Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/07/16 21:30:53 UTC

[hadoop] 03/03: HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)

This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b1e2c07379e7d46686f570220621a5e94d05e795
Author: Xing Lin <xi...@linkedin.com>
AuthorDate: Fri Jul 16 13:04:59 2021 -0700

    HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)
---
 .../java/org/apache/hadoop/util/LatchLock.java     |   4 +-
 .../org/apache/hadoop/util/PartitionedGSet.java    |  35 ++-
 .../apache/hadoop/util/TestPartitionedGSet.java    | 270 +++++++++++++++++++++
 .../hadoop/hdfs/server/namenode/INodeMap.java      |   4 +-
 4 files changed, 300 insertions(+), 13 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
index 41e33da..fd98391 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
@@ -30,7 +30,7 @@ public abstract class LatchLock<C> {
   protected abstract boolean isReadTopLocked();
   /** @return true topLock is locked for write by any thread */
   protected abstract boolean isWriteTopLocked();
-  protected abstract void readTopdUnlock();
+  protected abstract void readTopUnlock();
   protected abstract void writeTopUnlock();
 
   protected abstract boolean hasReadChildLock();
@@ -46,7 +46,7 @@ public abstract class LatchLock<C> {
   // Public APIs to use with the class
   public void readLock() {
     readChildLock();
-    readTopdUnlock();
+    readTopUnlock();
   }
 
   public void readUnlock() {
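
For orientation, LatchLock pairs a top-level lock with a per-partition child lock: readLock() above takes the child lock first and only then releases the top lock, hand-over-hand. Below is a minimal, self-contained sketch of that idea, assuming plain ReentrantReadWriteLocks at both levels (the new test further down maps LatchLock onto the same lock type); the class and method names are illustrative only and are not part of this patch.

import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Illustrative only: hand-over-hand ("latch") locking across two lock levels. */
class LatchSketch {
  private final ReentrantReadWriteLock topLock = new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();

  /** Caller already holds topLock's read lock; swap it for the child's read lock. */
  void readLatch() {
    childLock.readLock().lock();   // acquire the partition-level lock first...
    topLock.readLock().unlock();   // ...then drop the global lock, never the reverse
  }

  /** Release the partition-level lock once the caller is done with that partition. */
  void readUnlatch() {
    childLock.readLock().unlock();
  }
}
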
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
index 7ebb1b3..f3569cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -24,7 +24,7 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
-
+import java.util.NoSuchElementException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
@@ -79,8 +79,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
 
   public PartitionedGSet(final int capacity,
       final Comparator<? super K> comparator,
-      final LatchLock<?> latchLock,
-      final E rootKey) {
+      final LatchLock<?> latchLock) {
     this.partitions = new TreeMap<K, PartitionEntry>(comparator);
     this.latchLock = latchLock;
     // addNewPartition(rootKey).put(rootKey);
@@ -275,17 +274,36 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
    * modifying other partitions, while iterating through the current one.
    */
   private class EntryIterator implements Iterator<E> {
-    private final Iterator<K> keyIterator;
+    private Iterator<K> keyIterator;
     private Iterator<E> partitionIterator;
 
     public EntryIterator() {
       keyIterator = partitions.keySet().iterator();
-      K curKey = partitions.firstKey();
-      partitionIterator = getPartition(curKey).iterator();
+ 
+      if (!keyIterator.hasNext()) {
+        partitionIterator = null;
+        return;
+      }
+
+      K firstKey = keyIterator.next();
+      partitionIterator = partitions.get(firstKey).iterator();
     }
 
     @Override
     public boolean hasNext() {
+
+      // Special case: an iterator was created for an empty PartitionedGSet.
+      // Check whether new partitions have been added since then.
+      if (partitionIterator == null) {
+        if (partitions.size() == 0) {
+          return false;
+        } else {
+          keyIterator = partitions.keySet().iterator();
+          K nextKey = keyIterator.next();
+          partitionIterator = partitions.get(nextKey).iterator();
+        }
+      }
+
       while(!partitionIterator.hasNext()) {
         if(!keyIterator.hasNext()) {
           return false;
@@ -298,9 +316,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
 
     @Override
     public E next() {
-      while(!partitionIterator.hasNext()) {
-        K curKey = keyIterator.next();
-        partitionIterator = getPartition(curKey).iterator();
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more elements in this set.");
       }
       return partitionIterator.next();
     }
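
The new EntryIterator logic is the heart of the fix: tolerate a set that has no partitions at construction time, re-check for partitions lazily in hasNext(), skip over empty partitions, and make next() throw NoSuchElementException instead of running off the end. Here is a minimal, self-contained sketch of the same pattern over a plain TreeMap of lists; the names are illustrative and this is not the Hadoop class itself.

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/** Illustrative only: iterate all values of a TreeMap of lists, partition by partition. */
class PartitionSketchIterator<E> implements Iterator<E> {
  private final TreeMap<Integer, List<E>> partitions;
  private Iterator<Integer> keyIterator;
  private Iterator<E> partitionIterator;

  PartitionSketchIterator(TreeMap<Integer, List<E>> partitions) {
    this.partitions = partitions;
    this.keyIterator = partitions.keySet().iterator();
    // An empty map leaves partitionIterator null, exactly the case the patch handles.
    this.partitionIterator = keyIterator.hasNext()
        ? partitions.get(keyIterator.next()).iterator() : null;
  }

  @Override
  public boolean hasNext() {
    // Late (re)initialization: partitions may have been added since construction.
    if (partitionIterator == null) {
      if (partitions.isEmpty()) {
        return false;
      }
      keyIterator = partitions.keySet().iterator();
      partitionIterator = partitions.get(keyIterator.next()).iterator();
    }
    // Skip over empty partitions until an element or the end of the map is reached.
    while (!partitionIterator.hasNext()) {
      if (!keyIterator.hasNext()) {
        return false;
      }
      partitionIterator = partitions.get(keyIterator.next()).iterator();
    }
    return true;
  }

  @Override
  public E next() {
    if (!hasNext()) {
      throw new NoSuchElementException("No more elements in this set.");
    }
    return partitionIterator.next();
  }
}

Having next() delegate to hasNext() keeps the advance-across-partitions logic in one place, which is the same simplification the patch applies to PartitionedGSet.
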
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java
new file mode 100644
index 0000000..9ae772c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Testing {@link PartitionedGSet} */
+public class TestPartitionedGSet {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestPartitionedGSet.class);
+  private static final int ELEMENT_NUM = 100;
+
+  /**
+   * Generate positive random numbers for testing. We want to use only positive
+   * numbers because the smallest partition used in testing is 0.
+   *
+   * @param length
+   *    number of random numbers to be generated.
+   *
+   * @param randomSeed
+   *    seed to be used for random number generator.
+   *
+   * @return
+   *    An array of Integers
+   */
+  private static ArrayList<Integer> getRandomList(int length, int randomSeed) {
+    Random random = new Random(randomSeed);
+    ArrayList<Integer> list = new ArrayList<Integer>(length);
+    for (int i = 0; i < length; i++) {
+      list.add(random.nextInt(Integer.MAX_VALUE));
+    }
+    return list;
+  }
+
+  private static class TestElement implements LinkedElement {
+    private final int val;
+    private LinkedElement next;
+
+    TestElement(int val) {
+      this.val = val;
+      this.next = null;
+    }
+
+    public int getVal() {
+      return val;
+    }
+
+    @Override
+    public void setNext(LinkedElement next) {
+      this.next = next;
+    }
+
+    @Override
+    public LinkedElement getNext() {
+      return next;
+    }
+  }
+
+  private static class TestElementComparator implements Comparator<TestElement>
+  {
+    @Override
+    public int compare(TestElement e1, TestElement e2) {
+      if (e1 == null || e2 == null) {
+        throw new NullPointerException("Cannot compare null elements");
+      }
+
+      return e1.getVal() - e2.getVal();
+    }
+  }
+
+  protected ReentrantReadWriteLock topLock =
+      new ReentrantReadWriteLock(false);
+  /**
+   * We are NOT testing any concurrent access to a PartitionedGSet here.
+   */
+  private class NoOpLock extends LatchLock<ReentrantReadWriteLock> {
+    private ReentrantReadWriteLock childLock;
+
+    public NoOpLock() {
+      childLock = new ReentrantReadWriteLock(false);
+    }
+
+    @Override
+    protected boolean isReadTopLocked() {
+      return topLock.getReadLockCount() > 0 || isWriteTopLocked();
+    }
+
+    @Override
+    protected boolean isWriteTopLocked() {
+      return topLock.isWriteLocked();
+    }
+
+    @Override
+    protected void readTopUnlock() {
+      topLock.readLock().unlock();
+    }
+
+    @Override
+    protected void writeTopUnlock() {
+      topLock.writeLock().unlock();
+    }
+
+    @Override
+    protected boolean hasReadChildLock() {
+      return childLock.getReadLockCount() > 0 || hasWriteChildLock();
+    }
+
+    @Override
+    protected void readChildLock() {
+      childLock.readLock().lock();
+    }
+
+    @Override
+    protected void readChildUnlock() {
+      childLock.readLock().unlock();
+    }
+
+    @Override
+    protected boolean hasWriteChildLock() {
+      return childLock.isWriteLockedByCurrentThread();
+    }
+
+    @Override
+    protected void writeChildLock() {
+      childLock.writeLock().lock();
+    }
+
+    @Override
+    protected void writeChildUnlock() {
+      childLock.writeLock().unlock();
+    }
+
+    @Override
+    protected LatchLock<ReentrantReadWriteLock> clone() {
+      return new NoOpLock();
+    }
+  }
+
+  /**
+   * Test iterator for a PartitionedGSet with no partitions.
+   */
+  @Test(timeout=60000)
+  public void testIteratorForNoPartition() {
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    topLock.readLock().lock();
+    int count = 0;
+    Iterator<TestElement> iter = set.iterator();
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(0, count);
+  }
+
+  /**
+   * Test iterator for a PartitionedGSet with empty partitions.
+   */
+  @Test(timeout=60000)
+  public void testIteratorForEmptyPartitions() {
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    set.addNewPartition(new TestElement(0));
+    set.addNewPartition(new TestElement(1000));
+    set.addNewPartition(new TestElement(2000));
+
+    topLock.readLock().lock();
+    int count = 0;
+    Iterator<TestElement> iter = set.iterator();
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(0, count);
+  }
+
+  /**
+   * Test whether the iterator can return the same number of elements as stored
+   * into the PartitionedGSet.
+   */
+  @Test(timeout=60000)
+  public void testIteratorCountElements() {
+    ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123);
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    set.addNewPartition(new TestElement(0));
+    set.addNewPartition(new TestElement(1000));
+    set.addNewPartition(new TestElement(2000));
+
+    topLock.writeLock().lock();
+    for (Integer i : list) {
+      set.put(new TestElement(i));
+    }
+    topLock.writeLock().unlock();
+
+    topLock.readLock().lock();
+    int count = 0;
+    Iterator<TestElement> iter = set.iterator();
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(ELEMENT_NUM, count);
+  }
+
+  /**
+   * Test iterator when it is created before partitions/elements are
+   * added to the PartitionedGSet.
+   */
+  @Test(timeout=60000)
+  public void testIteratorAddElementsAfterIteratorCreation() {
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    // Create the iterator before partitions are added.
+    Iterator<TestElement> iter = set.iterator();
+
+    set.addNewPartition(new TestElement(0));
+    set.addNewPartition(new TestElement(1000));
+    set.addNewPartition(new TestElement(2000));
+
+    // Added one element
+    topLock.writeLock().lock();
+    set.put(new TestElement(2500));
+    topLock.writeLock().unlock();
+
+    topLock.readLock().lock();
+    int count = 0;
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(1, count);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 3b07dce..a0253b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -121,7 +121,7 @@ public class INodeMap {
     }
 
     @Override
-    protected void readTopdUnlock() {
+    protected void readTopUnlock() {
       namesystem.getFSLock().readUnlock("INodeMap", null, false);
     }
 
@@ -194,7 +194,7 @@ public class INodeMap {
     // Compute the map capacity by allocating 1% of total memory
     int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
     this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
-            new INodeMapLock(), rootDir);
+            new INodeMapLock());
 
     // Pre-populate initial empty partitions
     PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
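
Since the constructor no longer takes a root key, callers create partitions explicitly once the set exists, which is what both INodeMap and the new test now do. A rough usage sketch follows, written as if it were another test method inside TestPartitionedGSet above, so it reuses that file's TestElement, TestElementComparator, NoOpLock and topLock helpers; it is illustrative only, not code from this patch.

    PartitionedGSet<TestElement, TestElement> set =
        new PartitionedGSet<TestElement, TestElement>(
            16, new TestElementComparator(), new NoOpLock());
    // Partitions are added explicitly after construction now.
    set.addNewPartition(new TestElement(0));
    set.addNewPartition(new TestElement(1000));

    // Elements go in under the write latch, as in the tests.
    topLock.writeLock().lock();
    set.put(new TestElement(42));
    topLock.writeLock().unlock();

    // Iteration under the read latch is safe even if some partitions are empty.
    topLock.readLock().lock();
    for (Iterator<TestElement> it = set.iterator(); it.hasNext(); ) {
      it.next();
    }
    topLock.readLock().unlock();
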

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org