You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/07/16 21:30:53 UTC
[hadoop] 03/03: HDFS-16125. [FGL] Fix the iterator for
PartitionedGSet. Contributed by Xing Lin. (#3197)
This is an automated email from the ASF dual-hosted git repository.
shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit b1e2c07379e7d46686f570220621a5e94d05e795
Author: Xing Lin <xi...@linkedin.com>
AuthorDate: Fri Jul 16 13:04:59 2021 -0700
HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)
---
.../java/org/apache/hadoop/util/LatchLock.java | 4 +-
.../org/apache/hadoop/util/PartitionedGSet.java | 35 ++-
.../apache/hadoop/util/TestPartitionedGSet.java | 270 +++++++++++++++++++++
.../hadoop/hdfs/server/namenode/INodeMap.java | 4 +-
4 files changed, 300 insertions(+), 13 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
index 41e33da..fd98391 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
@@ -30,7 +30,7 @@ public abstract class LatchLock<C> {
protected abstract boolean isReadTopLocked();
/** @return true topLock is locked for write by any thread */
protected abstract boolean isWriteTopLocked();
- protected abstract void readTopdUnlock();
+ protected abstract void readTopUnlock();
protected abstract void writeTopUnlock();
protected abstract boolean hasReadChildLock();
@@ -46,7 +46,7 @@ public abstract class LatchLock<C> {
// Public APIs to use with the class
public void readLock() {
readChildLock();
- readTopdUnlock();
+ readTopUnlock();
}
public void readUnlock() {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
index 7ebb1b3..f3569cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -24,7 +24,7 @@ import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
-
+import java.util.NoSuchElementException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
@@ -79,8 +79,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
public PartitionedGSet(final int capacity,
final Comparator<? super K> comparator,
- final LatchLock<?> latchLock,
- final E rootKey) {
+ final LatchLock<?> latchLock) {
this.partitions = new TreeMap<K, PartitionEntry>(comparator);
this.latchLock = latchLock;
// addNewPartition(rootKey).put(rootKey);
@@ -275,17 +274,36 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
* modifying other partitions, while iterating through the current one.
*/
private class EntryIterator implements Iterator<E> {
- private final Iterator<K> keyIterator;
+ private Iterator<K> keyIterator;
private Iterator<E> partitionIterator;
public EntryIterator() {
keyIterator = partitions.keySet().iterator();
- K curKey = partitions.firstKey();
- partitionIterator = getPartition(curKey).iterator();
+
+ if (!keyIterator.hasNext()) {
+ partitionIterator = null;
+ return;
+ }
+
+ K firstKey = keyIterator.next();
+ partitionIterator = partitions.get(firstKey).iterator();
}
@Override
public boolean hasNext() {
+
+ // Special case: an iterator was created for an empty PartitionedGSet.
+ // Check whether new partitions have been added since then.
+ if (partitionIterator == null) {
+ if (partitions.size() == 0) {
+ return false;
+ } else {
+ keyIterator = partitions.keySet().iterator();
+ K nextKey = keyIterator.next();
+ partitionIterator = partitions.get(nextKey).iterator();
+ }
+ }
+
while(!partitionIterator.hasNext()) {
if(!keyIterator.hasNext()) {
return false;
@@ -298,9 +316,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
@Override
public E next() {
- while(!partitionIterator.hasNext()) {
- K curKey = keyIterator.next();
- partitionIterator = getPartition(curKey).iterator();
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements in this set.");
}
return partitionIterator.next();
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java
new file mode 100644
index 0000000..9ae772c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Testing {@link PartitionedGSet} */
+public class TestPartitionedGSet {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestPartitionedGSet.class);
+ private static final int ELEMENT_NUM = 100;
+
+ /**
+ * Generate non-negative random numbers for testing. We want to use only
+ * non-negative numbers because the smallest partition used in testing is 0.
+ *
+ * @param length
+ * number of random numbers to be generated.
+ *
+ * @param randomSeed
+ * seed to be used for random number generator.
+ *
+ * @return
+ * An ArrayList of random Integers
+ */
+ private static ArrayList<Integer> getRandomList(int length, int randomSeed) {
+ Random random = new Random(randomSeed);
+ ArrayList<Integer> list = new ArrayList<Integer>(length);
+ for (int i = 0; i < length; i++) {
+ list.add(random.nextInt(Integer.MAX_VALUE));
+ }
+ return list;
+ }
+
+ private static class TestElement implements LinkedElement {
+ private final int val;
+ private LinkedElement next;
+
+ TestElement(int val) {
+ this.val = val;
+ this.next = null;
+ }
+
+ public int getVal() {
+ return val;
+ }
+
+ @Override
+ public void setNext(LinkedElement next) {
+ this.next = next;
+ }
+
+ @Override
+ public LinkedElement getNext() {
+ return next;
+ }
+ }
+
+ private static class TestElementComparator implements Comparator<TestElement>
+ {
+ @Override
+ public int compare(TestElement e1, TestElement e2) {
+ if (e1 == null || e2 == null) {
+ throw new NullPointerException("Cannot compare null elements");
+ }
+
+ return e1.getVal() - e2.getVal();
+ }
+ }
+
+ protected ReentrantReadWriteLock topLock =
+ new ReentrantReadWriteLock(false);
+ /**
+ * We are NOT testing any concurrent access to a PartitionedGSet here.
+ */
+ private class NoOpLock extends LatchLock<ReentrantReadWriteLock> {
+ private ReentrantReadWriteLock childLock;
+
+ public NoOpLock() {
+ childLock = new ReentrantReadWriteLock(false);
+ }
+
+ @Override
+ protected boolean isReadTopLocked() {
+ return topLock.getReadLockCount() > 0 || isWriteTopLocked();
+ }
+
+ @Override
+ protected boolean isWriteTopLocked() {
+ return topLock.isWriteLocked();
+ }
+
+ @Override
+ protected void readTopUnlock() {
+ topLock.readLock().unlock();
+ }
+
+ @Override
+ protected void writeTopUnlock() {
+ topLock.writeLock().unlock();
+ }
+
+ @Override
+ protected boolean hasReadChildLock() {
+ return childLock.getReadLockCount() > 0 || hasWriteChildLock();
+ }
+
+ @Override
+ protected void readChildLock() {
+ childLock.readLock().lock();
+ }
+
+ @Override
+ protected void readChildUnlock() {
+ childLock.readLock().unlock();
+ }
+
+ @Override
+ protected boolean hasWriteChildLock() {
+ return childLock.isWriteLockedByCurrentThread();
+ }
+
+ @Override
+ protected void writeChildLock() {
+ childLock.writeLock().lock();
+ }
+
+ @Override
+ protected void writeChildUnlock() {
+ childLock.writeLock().unlock();
+ }
+
+ @Override
+ protected LatchLock<ReentrantReadWriteLock> clone() {
+ return new NoOpLock();
+ }
+ }
+
+ /**
+ * Test iterator for a PartitionedGSet with no partitions.
+ */
+ @Test(timeout=60000)
+ public void testIteratorForNoPartition() {
+ PartitionedGSet<TestElement, TestElement> set =
+ new PartitionedGSet<TestElement, TestElement>(
+ 16, new TestElementComparator(), new NoOpLock());
+
+ topLock.readLock().lock();
+ int count = 0;
+ Iterator<TestElement> iter = set.iterator();
+ while( iter.hasNext() ) {
+ iter.next();
+ count ++;
+ }
+ topLock.readLock().unlock();
+ Assert.assertEquals(0, count);
+ }
+
+ /**
+ * Test iterator for a PartitionedGSet with empty partitions.
+ */
+ @Test(timeout=60000)
+ public void testIteratorForEmptyPartitions() {
+ PartitionedGSet<TestElement, TestElement> set =
+ new PartitionedGSet<TestElement, TestElement>(
+ 16, new TestElementComparator(), new NoOpLock());
+
+ set.addNewPartition(new TestElement(0));
+ set.addNewPartition(new TestElement(1000));
+ set.addNewPartition(new TestElement(2000));
+
+ topLock.readLock().lock();
+ int count = 0;
+ Iterator<TestElement> iter = set.iterator();
+ while( iter.hasNext() ) {
+ iter.next();
+ count ++;
+ }
+ topLock.readLock().unlock();
+ Assert.assertEquals(0, count);
+ }
+
+ /**
+ * Test whether the iterator can return the same number of elements as stored
+ * into the PartitionedGSet.
+ */
+ @Test(timeout=60000)
+ public void testIteratorCountElements() {
+ ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123);
+ PartitionedGSet<TestElement, TestElement> set =
+ new PartitionedGSet<TestElement, TestElement>(
+ 16, new TestElementComparator(), new NoOpLock());
+
+ set.addNewPartition(new TestElement(0));
+ set.addNewPartition(new TestElement(1000));
+ set.addNewPartition(new TestElement(2000));
+
+ topLock.writeLock().lock();
+ for (Integer i : list) {
+ set.put(new TestElement(i));
+ }
+ topLock.writeLock().unlock();
+
+ topLock.readLock().lock();
+ int count = 0;
+ Iterator<TestElement> iter = set.iterator();
+ while( iter.hasNext() ) {
+ iter.next();
+ count ++;
+ }
+ topLock.readLock().unlock();
+ Assert.assertEquals(ELEMENT_NUM, count);
+ }
+
+ /**
+ * Test iterator when it is created before partitions/elements are
+ * added to the PartitionedGSet.
+ */
+ @Test(timeout=60000)
+ public void testIteratorAddElementsAfterIteratorCreation() {
+ PartitionedGSet<TestElement, TestElement> set =
+ new PartitionedGSet<TestElement, TestElement>(
+ 16, new TestElementComparator(), new NoOpLock());
+
+ // Create the iterator before partitions are added.
+ Iterator<TestElement> iter = set.iterator();
+
+ set.addNewPartition(new TestElement(0));
+ set.addNewPartition(new TestElement(1000));
+ set.addNewPartition(new TestElement(2000));
+
+ // Add one element.
+ topLock.writeLock().lock();
+ set.put(new TestElement(2500));
+ topLock.writeLock().unlock();
+
+ topLock.readLock().lock();
+ int count = 0;
+ while( iter.hasNext() ) {
+ iter.next();
+ count ++;
+ }
+ topLock.readLock().unlock();
+ Assert.assertEquals(1, count);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 3b07dce..a0253b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -121,7 +121,7 @@ public class INodeMap {
}
@Override
- protected void readTopdUnlock() {
+ protected void readTopUnlock() {
namesystem.getFSLock().readUnlock("INodeMap", null, false);
}
@@ -194,7 +194,7 @@ public class INodeMap {
// Compute the map capacity by allocating 1% of total memory
int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
- new INodeMapLock(), rootDir);
+ new INodeMapLock());
// Pre-populate initial empty partitions
PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org