Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/07/23 22:30:43 UTC

[hadoop] branch fgl updated (b1e2c07 -> 7598a68)

This is an automated email from the ASF dual-hosted git repository.

shv pushed a change to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    omit b1e2c07  HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)
    omit 1829b58  Add namespace key for INode. (shv)
    omit 0e7d216  INodeMap with PartitionedGSet and per-partition locking.
     add 6ed7670  HDFS-16067. Support Append API in NNThroughputBenchmark. Contributed by Renukaprasad C.
     add 0ac443b  YARN-10855. yarn logs cli fails to retrieve logs if any TFile is corrupt or empty. Contributed by Jim Brennan.
     add 17bf2fc  YARN-10858. [UI2] YARN-10826 breaks Queue view. (#3213)
     add e1d00ad  HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)
     add de41ce8  HDFS-16087. Fix stuck issue in rbfbalance tool (#3141).  Contributed by Eric Yin.
     add e634bf3  YARN-10630. [UI2] Ambiguous queue name resolution (#3214)
     add 0441efe  YARN-10860. Make max container per heartbeat configs refreshable. Contributed by Eric Badger.
     add dbd255f  HADOOP-17796. Upgrade jetty version to 9.4.43 (#3208)
     add 2da9b95  YARN-10657. We should make max application per queue to support node label. Contributed by Andras Gyori.
     add 98412ce  HADOOP-17813. Checkstyle - Allow line length: 100
     add 3a52bfc  HADOOP-17808. ipc.Client to set interrupt flag after catching InterruptedException (#3219)
     add aa1a5dd  YARN-10829. Support getApplications API in FederationClientInterceptor (#3135)
     add 63dfd84  HADOOP-17458. S3A to treat "SdkClientException: Data read has a different length than the expected" as EOFException (#3040)
     new 6c2b39e  INodeMap with PartitionedGSet and per-partition locking.
     new b784277  Add namespace key for INode. (shv)
     new d6efb60  HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)
     new 0a73bde  HDFS-16128. [FGL] Added support for saving/loading an FS Image for PartitionedGSet. Contributed by Xing Lin. (#3201)
     new 7598a68  HDFS-16130. [FGL] Implement CREATE File with FGL. Contributed by Renukaprasad C. (#3205)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (b1e2c07)
            \
             N -- N -- N   refs/heads/fgl (7598a68)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/resources/checkstyle/checkstyle.xml   |   4 +-
 .../apache/hadoop/fs/CommonConfigurationKeys.java  |   4 +-
 .../main/java/org/apache/hadoop/ipc/Client.java    |   8 +-
 .../org/apache/hadoop/ipc/DecayRpcScheduler.java   |   8 +-
 .../java/org/apache/hadoop/ipc/RpcScheduler.java   |   8 +-
 .../main/java/org/apache/hadoop/ipc/Server.java    |  10 +-
 .../org/apache/hadoop/ipc/metrics/RpcMetrics.java  |  38 +++++-
 .../org/apache/hadoop/util/PartitionedGSet.java    |  21 +++-
 .../src/main/resources/core-default.xml            |  15 +++
 .../src/site/markdown/Benchmarking.md              |   1 +
 .../hadoop-common/src/site/markdown/Metrics.md     |   2 +
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   |  69 ++++++++++-
 .../hdfs/rbfbalance/RouterDistCpProcedure.java     |   1 +
 .../hdfs/rbfbalance/TestRouterDistCpProcedure.java | 120 ++++++++++++++++++
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java  |  40 ++++--
 .../hdfs/server/namenode/FSDirWriteFileOp.java     |  94 +++++++++-----
 .../hadoop/hdfs/server/namenode/FSDirectory.java   |  83 +++++++++++++
 .../hadoop/hdfs/server/namenode/FSImage.java       |  12 ++
 .../hdfs/server/namenode/FSImageFormatPBINode.java |  11 +-
 .../hadoop/hdfs/server/namenode/INodeMap.java      |  77 +++++++++---
 .../server/namenode/NNThroughputBenchmark.java     |  52 ++++++++
 .../server/namenode/TestNNThroughputBenchmark.java |  46 +++++++
 hadoop-project/pom.xml                             |   2 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    |  16 ++-
 .../java/org/apache/hadoop/fs/s3a/TestInvoker.java |  36 ++++++
 .../hadoop/tools/fedbalance/DistCpProcedure.java   |   4 +-
 .../tools/fedbalance/TestDistCpProcedure.java      |   6 +-
 .../protocolrecords/GetApplicationsResponse.java   |  12 ++
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |   9 ++
 .../yarn/conf/TestYarnConfigurationFields.java     |   2 +
 .../apache/hadoop/yarn/client/cli/TestLogsCLI.java |  30 +++++
 .../yarn/logaggregation/AggregatedLogFormat.java   |   2 +-
 .../tfile/LogAggregationTFileController.java       |  33 ++---
 .../scheduler/capacity/AbstractCSQueue.java        |  46 +++----
 .../scheduler/capacity/CapacityScheduler.java      |  11 +-
 .../capacity/CapacitySchedulerQueueManager.java    |   5 +
 .../scheduler/capacity/LeafQueue.java              |   9 +-
 .../TestCapacitySchedulerAutoQueueCreation.java    |  10 +-
 .../scheduler/capacity/TestLeafQueue.java          |  39 ++++++
 .../clientrm/FederationClientInterceptor.java      |  48 +++++++-
 .../router/clientrm/RouterYarnClientUtils.java     | 137 +++++++++++++++++++++
 .../clientrm/TestFederationClientInterceptor.java  | 118 +++++++++++++++++-
 .../router/clientrm/TestRouterYarnClientUtils.java | 117 ++++++++++++++++++
 .../src/site/markdown/CapacityScheduler.md         |   4 +-
 .../main/webapp/app/components/tree-selector.js    |  14 +--
 .../webapp/app/models/yarn-queue/capacity-queue.js |   2 +-
 .../app/serializers/yarn-queue/capacity-queue.js   |  14 +--
 .../hadoop-yarn-ui/src/main/webapp/bower.json      |   2 +-
 48 files changed, 1269 insertions(+), 183 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/rbfbalance/TestRouterDistCpProcedure.java



[hadoop] 05/05: HDFS-16130. [FGL] Implement CREATE File with FGL. Contributed by Renukaprasad C. (#3205)

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7598a68b3bcb539a199c595491a03f64cd777e4e
Author: Renukaprasad C <pr...@yahoo.co.in>
AuthorDate: Fri Jul 23 15:24:34 2021 -0700

    HDFS-16130. [FGL] Implement CREATE File with FGL. Contributed by Renukaprasad C. (#3205)
---
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java  | 36 ++++++---
 .../hdfs/server/namenode/FSDirWriteFileOp.java     | 94 +++++++++++++++-------
 2 files changed, 87 insertions(+), 43 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index 1c979e5..4a15fd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -70,18 +70,7 @@ class FSDirMkdirOp {
         // create multiple inodes.
         fsn.checkFsObjectLimit();
 
-        // create all missing directories along the path,
-        // but don't add them to the INodeMap yet
-        permissions = addImplicitUwx(permissions, permissions); // SHV !!!
-        INode[] missing = createPathDirectories(fsd, iip, permissions);
-        iip = iip.getExistingINodes();
-        // switch the locks
-        fsd.getINodeMap().latchWriteLock(iip, missing);
-        // Add missing inodes to the INodeMap
-        for(INode dir : missing) {
-          iip = addSingleDirectory(fsd, iip, dir, permissions);
-          assert iip != null : "iip should not be null";
-        }
+        iip = createMissingDirs(fsd, iip, permissions);
       }
       return fsd.getAuditFileInfo(iip);
     } finally {
@@ -89,6 +78,26 @@ class FSDirMkdirOp {
     }
   }
 
+  static INodesInPath createMissingDirs(FSDirectory fsd,
+      INodesInPath iip, PermissionStatus permissions) throws IOException {
+    // create all missing directories along the path,
+    // but don't add them to the INodeMap yet
+    permissions = addImplicitUwx(permissions, permissions); // SHV !!!
+    INode[] missing = createPathDirectories(fsd, iip, permissions);
+    iip = iip.getExistingINodes();
+    if (missing.length == 0) {
+      return iip;
+    }
+    // switch the locks
+    fsd.getINodeMap().latchWriteLock(iip, missing);
+    // Add missing inodes to the INodeMap
+    for (INode dir : missing) {
+      iip = addSingleDirectory(fsd, iip, dir, permissions);
+      assert iip != null : "iip should not be null";
+    }
+    return iip;
+  }
+
   /**
    * For a given absolute path, create all ancestors as directories along the
    * path. All ancestors inherit their parent's permission plus an implicit
@@ -253,6 +262,9 @@ class FSDirMkdirOp {
     return dir;
   }
 
+  /**
+   * Find-out missing iNodes for the current mkdir OP.
+   */
   private static INode[] createPathDirectories(FSDirectory fsd,
       INodesInPath iip, PermissionStatus perm)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 0d9c6ae..f2cca7b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -228,6 +228,13 @@ class FSDirWriteFileOp {
     // while chooseTarget() was executing.
     LocatedBlock[] onRetryBlock = new LocatedBlock[1];
     INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
+
+    INode[] missing = new INode[]{iip.getLastINode()};
+    INodesInPath existing = iip.getParentINodesInPath();
+    FSDirectory fsd = fsn.getFSDirectory();
+    // switch the locks
+    fsd.getINodeMap().latchWriteLock(existing, missing);
+
     FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
                                            previous, onRetryBlock);
     final INodeFile pendingFile = fileState.inode;
@@ -392,8 +399,8 @@ class FSDirWriteFileOp {
     }
     fsn.checkFsObjectLimit();
     INodeFile newNode = null;
-    INodesInPath parent =
-        FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
+    INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd,
+        iip.getParentINodesInPath(), permissions);
     if (parent != null) {
       iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
           replication, blockSize, holder, clientMachine, shouldReplicate,
@@ -541,41 +548,22 @@ class FSDirWriteFileOp {
       FSDirectory fsd, INodesInPath existing, byte[] localName,
       PermissionStatus permissions, short replication, long preferredBlockSize,
       String clientName, String clientMachine, boolean shouldReplicate,
-      String ecPolicyName, String storagePolicy) throws IOException {
+      String ecPolicyName, String storagePolicy)
+      throws IOException {
 
     Preconditions.checkNotNull(existing);
     long modTime = now();
     INodesInPath newiip;
     fsd.writeLock();
     try {
-      boolean isStriped = false;
-      ErasureCodingPolicy ecPolicy = null;
-      byte storagepolicyid = 0;
-      if (storagePolicy != null && !storagePolicy.isEmpty()) {
-        BlockStoragePolicy policy =
-            fsd.getBlockManager().getStoragePolicy(storagePolicy);
-        if (policy == null) {
-          throw new HadoopIllegalArgumentException(
-              "Cannot find a block policy with the name " + storagePolicy);
-        }
-        storagepolicyid = policy.getId();
-      }
-      if (!shouldReplicate) {
-        ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
-            fsd.getFSNamesystem(), ecPolicyName, existing);
-        if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
-          isStriped = true;
-        }
-      }
-      final BlockType blockType = isStriped ?
-          BlockType.STRIPED : BlockType.CONTIGUOUS;
-      final Short replicationFactor = (!isStriped ? replication : null);
-      final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
-      INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
-          modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
-          storagepolicyid, blockType);
-      newNode.setLocalName(localName);
-      newNode.toUnderConstruction(clientName, clientMachine);
+      INodeFile newNode = createINodeFile(fsd, existing, localName,
+          permissions, replication, preferredBlockSize, clientName,
+          clientMachine, shouldReplicate, ecPolicyName, storagePolicy, modTime);
+
+      INode[] missing = new INode[] {newNode};
+      // switch the locks
+      fsd.getINodeMap().latchWriteLock(existing, missing);
+
       newiip = fsd.addINode(existing, newNode, permissions.getPermission());
     } finally {
       fsd.writeUnlock();
@@ -593,6 +581,42 @@ class FSDirWriteFileOp {
     return newiip;
   }
 
+  private static INodeFile createINodeFile(FSDirectory fsd,
+      INodesInPath existing, byte[] localName, PermissionStatus permissions,
+      short replication, long preferredBlockSize, String clientName,
+      String clientMachine, boolean shouldReplicate, String ecPolicyName,
+      String storagePolicy, long modTime) throws IOException {
+    boolean isStriped = false;
+    ErasureCodingPolicy ecPolicy = null;
+    byte storagepolicyid = 0;
+    if (storagePolicy != null && !storagePolicy.isEmpty()) {
+      BlockStoragePolicy policy =
+          fsd.getBlockManager().getStoragePolicy(storagePolicy);
+      if (policy == null) {
+        throw new HadoopIllegalArgumentException(
+            "Cannot find a block policy with the name " + storagePolicy);
+      }
+      storagepolicyid = policy.getId();
+    }
+    if (!shouldReplicate) {
+      ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
+          fsd.getFSNamesystem(), ecPolicyName, existing);
+      if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
+        isStriped = true;
+      }
+    }
+    final BlockType blockType = isStriped ?
+        BlockType.STRIPED : BlockType.CONTIGUOUS;
+    final Short replicationFactor = (!isStriped ? replication : null);
+    final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
+    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
+        modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
+        storagepolicyid, blockType);
+    newNode.setLocalName(localName);
+    newNode.toUnderConstruction(clientName, clientMachine);
+    return newNode;
+  }
+
   private static FileState analyzeFileState(
       FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@@ -687,6 +711,14 @@ class FSDirWriteFileOp {
     }
     checkBlock(fsn, last);
     INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
+
+    assert (iip.getLastINode() instanceof INodeFile);
+    INode[] missing = new INode[] {iip.getLastINode()};
+    INodesInPath existing = iip.getParentINodesInPath();
+    // switch the locks
+    FSDirectory fsd = fsn.getFSDirectory();
+    fsd.getINodeMap().latchWriteLock(existing, missing);
+
     return completeFileInternal(fsn, iip, holder,
         ExtendedBlock.getLocalBlock(last), fileId);
   }



[hadoop] 04/05: HDFS-16128. [FGL] Added support for saving/loading an FS Image for PartitionedGSet. Contributed by Xing Lin. (#3201)

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 0a73bde9a0051060943fdd2d7461008f471ce30d
Author: Xing Lin <xi...@linkedin.com>
AuthorDate: Fri Jul 23 12:58:31 2021 -0700

    HDFS-16128. [FGL] Added support for saving/loading an FS Image for PartitionedGSet. Contributed by Xing Lin. (#3201)
---
 .../org/apache/hadoop/util/PartitionedGSet.java    | 21 ++++--
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java  |  4 +-
 .../hadoop/hdfs/server/namenode/FSDirectory.java   | 83 ++++++++++++++++++++++
 .../hadoop/hdfs/server/namenode/FSImage.java       | 12 ++++
 .../hdfs/server/namenode/FSImageFormatPBINode.java | 11 +--
 .../hadoop/hdfs/server/namenode/INodeMap.java      | 77 ++++++++++++++++----
 6 files changed, 182 insertions(+), 26 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
index f3569cc..5fe50ab 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -68,7 +68,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
    * Consists of a hash table {@link LightWeightGSet} and a lock, which
    * controls access to this partition independently on the other ones.
    */
-  private class PartitionEntry extends LightWeightGSet<K, E> {
+  public class PartitionEntry extends LightWeightGSet<K, E> {
     private final LatchLock<?> partLock;
 
     PartitionEntry(int defaultPartitionCapacity) {
@@ -121,7 +121,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     return size;
   }
 
-  protected PartitionEntry getPartition(final K key) {
+  public PartitionEntry getPartition(final K key) {
     Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
     if(partEntry == null) {
       return null;
@@ -174,6 +174,10 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     E result = part.put(element);
     if(result == null) {  // new element
       size++;
+      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
+    } else {
+      LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
+          key, size);
     }
     return result;
   }
@@ -230,15 +234,18 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
       try {
         long[] key = (long[]) inodeClass.
             getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
-        long[] firstKey = new long[0];
+        long[] firstKey = new long[key.length];
         if(part.iterator().hasNext()) {
           Object first = part.iterator().next();
-          firstKey = (long[]) inodeClass.getMethod(
+          long[] firstKeyRef = (long[]) inodeClass.getMethod(
             "getNamespaceKey", int.class).invoke(first, 2);
           Object parent = inodeClass.
               getMethod("getParent").invoke(first);
           long parentId = (parent == null ? 0L :
             (long) inodeClass.getMethod("getId").invoke(parent));
+          for (int j=0; j < key.length; j++) {
+            firstKey[j] = firstKeyRef[j];
+          }
           firstKey[0] = parentId;
         }
         LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
@@ -250,8 +257,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     partSizeAvg = (int) (totalSize / parts.size());
     LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
         partSizeMin, partSizeAvg, partSizeMax, totalSize);
-    LOG.error("Number of partitions: empty = {}, full = {}",
-        numEmptyPartitions, numFullPartitions);
+    LOG.error("Number of partitions: empty = {}, in-use = {}, full = {}",
+        numEmptyPartitions, parts.size()-numEmptyPartitions, numFullPartitions);
   }
 
   @Override
@@ -277,6 +284,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
     private Iterator<K> keyIterator;
     private Iterator<E> partitionIterator;
 
+    // Set partitionIterator to point to the first partition, or set it to null
+    // when there is no partitions created for this PartitionedGSet.
     public EntryIterator() {
       keyIterator = partitions.keySet().iterator();
  
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index 5a40906..1c979e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.security.AccessControlException;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import static org.apache.hadoop.util.Time.now;
 
@@ -146,7 +147,8 @@ class FSDirMkdirOp {
         existing = createSingleDirectory(fsd, existing, component, perm);
         if(existing == null) {
           FSNamesystem.LOG.error("unprotectedMkdir returned null for "
-              + iip.getPath() + " for " + new String(component) + " i = " + i);
+              + iip.getPath() + " for "
+              + Arrays.toString(component) + " i = " + i);
           // Somebody already created the parent. Recalculate existing
           existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
           i = existing.length() - 1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index b5e68a0..3d32e7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Iterator;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -160,6 +164,8 @@ public class FSDirectory implements Closeable {
   private final int contentCountLimit; // max content summary counts per run
   private final long contentSleepMicroSec;
   private final INodeMap inodeMap; // Synchronized by dirLock
+  // Temp InodeMap used when loading an FS image.
+  private final GSet<INode, INodeWithAdditionalFields> inodeMapTemp;
   private long yieldCount = 0; // keep track of lock yield count.
   private int quotaInitThreads;
 
@@ -318,6 +324,11 @@ public class FSDirectory implements Closeable {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
     inodeMap = INodeMap.newInstance(rootDir, ns);
+    inodeMapTemp = new LightWeightGSet<INode, INodeWithAdditionalFields>(1000);
+
+    // add rootDir to inodeMapTemp.
+    inodeMapTemp.put(rootDir);
+
     this.isPermissionEnabled = conf.getBoolean(
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
@@ -1475,6 +1486,26 @@ public class FSDirectory implements Closeable {
   public INodeMap getINodeMap() {
     return inodeMap;
   }
+  public GSet<INode, INodeWithAdditionalFields> getTempINodeMap() {
+    return inodeMapTemp;
+  }
+
+  public final void addToTempInodeMap(INode inode) {
+    if (inode instanceof INodeWithAdditionalFields) {
+      LOG.debug("addToTempInodeMap: id={}, inodeMapTemp.size={}",
+          inode.getId(), inodeMapTemp.size());
+      inodeMapTemp.put((INodeWithAdditionalFields) inode);
+      if (!inode.isSymlink()) {
+        final XAttrFeature xaf = inode.getXAttrFeature();
+        addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
+        StoragePolicySatisfyManager spsManager =
+            namesystem.getBlockManager().getSPSManager();
+        if (spsManager != null && spsManager.isEnabled()) {
+          addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+        }
+      }
+    }
+  }
 
   /**
    * This method is always called with writeLock of FSDirectory held.
@@ -1544,6 +1575,43 @@ public class FSDirectory implements Closeable {
   }
 
   /**
+   * After the inodes are set properly (set the parent for each inode), we move
+   * them from INodeMapTemp to INodeMap.
+   */
+  void moveInodes() throws IOException {
+    long count=0, inodeNum = inodeMapTemp.size();
+    LOG.debug("inodeMapTemp={}", inodeMapTemp);
+
+    /**
+     * Note:
+     * LightweightGSet uses linked lists, to implement a map. Thus, to move an
+     * Inode from one LightweightGSet (inodeMapTemp) to another (inodeMap),
+     * we need to first remove it from its original LightweightGSet and then
+     * add it to the new LightweightGSet.
+     */
+    Iterator<INodeWithAdditionalFields> iter = inodeMapTemp.iterator();
+    while (iter.hasNext()) {
+      INodeWithAdditionalFields n = iter.next();
+      iter.remove();
+
+      LOG.debug("populate {}-th inode: id={}, fullpath={}",
+          count, n.getId(), n.getFullPathName());
+
+      inodeMap.put(n);
+      count++;
+    }
+
+    if (count != inodeNum) {
+      String msg = String.format("moveInodes: expected to move %l inodes, " +
+          "but moved %l inodes", inodeNum, count);
+      throw new IOException(msg);
+    }
+
+    //inodeMap.show();
+    inodeMapTemp.clear();
+  }
+
+  /**
    * This method is always called with writeLock of FSDirectory held.
    */
   public final void removeFromInodeMap(List<? extends INode> inodes) {
@@ -1860,6 +1928,21 @@ public class FSDirectory implements Closeable {
     }
   }
 
+  public INode getInode(INode inode) {
+    return inodeMap.get(inode);
+  }
+  public INode getInodeFromTempINodeMap(long id) {
+    LOG.debug("getInodeFromTempINodeMap: id={}, TempINodeMap.size={}",
+        id, inodeMapTemp.size());
+    /*
+     * Convert a long inode id into an INode object. We only need to compare
+     * two inodes by inode id. So, it can be any type of INode object.
+     */
+    INode inode = new INodeDirectory(id, null,
+        new PermissionStatus("", "", new FsPermission((short) 0)), 0);
+
+    return inodeMapTemp.get(inode);
+  }
   @VisibleForTesting
   FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup,
       UserGroupInformation ugi) throws AccessControlException {
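
The note inside moveInodes() above is the key constraint: LightWeightGSet links elements into its own hash buckets, so an element must be unlinked from the source set before it can be put into the destination set. A small self-contained sketch of that remove-then-put order (the Item class and the capacities are invented for illustration):

import java.util.Iterator;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;

class Item implements LinkedElement {
  final long id;
  private LinkedElement next;
  Item(long id) { this.id = id; }
  @Override public void setNext(LinkedElement n) { next = n; }
  @Override public LinkedElement getNext() { return next; }
  @Override public boolean equals(Object o) { return o instanceof Item && ((Item) o).id == id; }
  @Override public int hashCode() { return Long.hashCode(id); }
}

public class MoveBetweenGSets {
  public static void main(String[] args) {
    GSet<Item, Item> temp = new LightWeightGSet<>(1024);    // stands in for inodeMapTemp
    GSet<Item, Item> target = new LightWeightGSet<>(1024);  // stands in for inodeMap
    temp.put(new Item(1));
    temp.put(new Item(2));

    Iterator<Item> iter = temp.iterator();
    while (iter.hasNext()) {
      Item n = iter.next();
      iter.remove();      // unlink from the source set first ...
      target.put(n);      // ... then add to the destination set
    }
    System.out.println(temp.size() + " / " + target.size()); // prints: 0 / 2
  }
}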
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 1305438..0f0024e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -761,6 +761,16 @@ public class FSImage implements Closeable {
           "above for more info.");
     }
     prog.endPhase(Phase.LOADING_FSIMAGE);
+
+    /*
+     * loadEdits always sets the parent of an inode before adding the inode to
+     * inodeMap. So, it is safe to move inodes from inodeMapTemp to inodeMap
+     * before loadEdits.
+     */
+    FSDirectory dir = target.getFSDirectory();
+    dir.moveInodes();
+    LOG.info("LOADING_FSIMAGE: loaded {} inodes into inodeMap",
+        dir.getINodeMap().size());
     
     if (!rollingRollback) {
       prog.beginPhase(Phase.LOADING_EDITS);
@@ -776,6 +786,8 @@ public class FSImage implements Closeable {
       needToSave = false;
     }
     editLog.setNextTxId(lastAppliedTxId + 1);
+    LOG.info("LOADING_EDITS: loaded {} inodes into inodeMap",
+        dir.getINodeMap().size());
     return needToSave;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 0a69c99..fe37b82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -276,9 +276,10 @@ public final class FSImageFormatPBINode {
         if (e == null) {
           break;
         }
-        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
+        INodeDirectory p =
+            dir.getInodeFromTempINodeMap(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
-          INode child = dir.getInode(id);
+          INode child = dir.getInodeFromTempINodeMap(id);
           if (!addToParent(p, child)) {
             LOG.warn("Failed to add the inode {} to the directory {}",
                 child.getId(), p.getId());
@@ -382,6 +383,7 @@ public final class FSImageFormatPBINode {
         if (p == null) {
           break;
         }
+        LOG.debug("loadINodesInSection: cntr={}, inode={}", cntr, p.getId());
         if (p.getId() == INodeId.ROOT_INODE_ID) {
           synchronized(this) {
             loadRootINode(p);
@@ -389,7 +391,7 @@ public final class FSImageFormatPBINode {
         } else {
           INode n = loadINode(p);
           synchronized(this) {
-            dir.addToInodeMap(n);
+            dir.addToTempInodeMap(n);
           }
           fillUpInodeList(inodeList, n);
         }
@@ -761,7 +763,7 @@ public final class FSImageFormatPBINode {
               DirEntry.newBuilder().setParent(n.getId());
           for (INode inode : children) {
             // Error if the child inode doesn't exist in inodeMap
-            if (dir.getInode(inode.getId()) == null) {
+            if (dir.getInode(inode) == null) {
               FSImage.LOG.error(
                   "FSImageFormatPBINode#serializeINodeDirectorySection: " +
                       "Dangling child pointer found. Missing INode in " +
@@ -812,6 +814,7 @@ public final class FSImageFormatPBINode {
       Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
       while (iter.hasNext()) {
         INodeWithAdditionalFields n = iter.next();
+        LOG.debug("i = {}, save inode: {}", i, n);
         save(out, n);
         ++i;
         if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index a0253b6..4c34954 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -35,12 +36,12 @@ import org.apache.hadoop.util.PartitionedGSet;
  * and INode.  
  */
 public class INodeMap {
-  static final int NAMESPACE_KEY_DEBTH = 2;
+  static final int NAMESPACE_KEY_DEPTH = 2;
   static final int NUM_RANGES_STATIC = 256;  // power of 2
 
   public static class INodeKeyComparator implements Comparator<INode> {
     INodeKeyComparator() {
-      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
     }
 
     @Override
@@ -48,9 +49,9 @@ public class INodeMap {
       if (i1 == null || i2 == null) {
         throw new NullPointerException("Cannot compare null INodes");
       }
-      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      for(int l = 0; l < NAMESPACE_KEY_DEBTH; l++) {
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      for(int l = 0; l < NAMESPACE_KEY_DEPTH; l++) {
         if(key1[l] == key2[l]) continue;
         return (key1[l] < key2[l] ? -1 : 1);
       }
@@ -64,7 +65,7 @@ public class INodeMap {
    */
   public static class HPINodeKeyComparator implements Comparator<INode> {
     HPINodeKeyComparator() {
-      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+      FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH);
     }
 
     @Override
@@ -72,13 +73,13 @@ public class INodeMap {
       if (i1 == null || i2 == null) {
         throw new NullPointerException("Cannot compare null INodes");
       }
-      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
-      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH);
       long key1_0 = INode.indexOf(key1);
       long key2_0 = INode.indexOf(key2);
       if(key1_0 != key2_0)
         return (key1_0 < key2_0 ? -1 : 1);
-      for(int l = 1; l < NAMESPACE_KEY_DEBTH; l++) {
+      for(int l = 1; l < NAMESPACE_KEY_DEPTH; l++) {
         if(key1[l] == key2[l]) continue;
         return (key1[l] < key2[l] ? -1 : 1);
       }
@@ -202,8 +203,8 @@ public class INodeMap {
     PermissionStatus perm = new PermissionStatus(
         "", "", new FsPermission((short) 0));
     for(int p = 0; p < NUM_RANGES_STATIC; p++) {
-      INodeDirectory key = new INodeDirectory(
-          INodeId.ROOT_INODE_ID, "range key".getBytes(), perm, 0);
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
       key.setParent(new INodeDirectory((long)p, null, perm, 0));
       pgs.addNewPartition(key);
     }
@@ -244,8 +245,11 @@ public class INodeMap {
    *         such {@link INode} in the map.
    */
   public INode get(long id) {
-    INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
-        "", "", new FsPermission((short) 0)), 0, 0) {
+    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
+        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
+
+    INode inode = new INodeWithAdditionalFields(id, null,
+        new PermissionStatus("", "", new FsPermission((short) 0)), 0, 0) {
       
       @Override
       void recordModification(int latestSnapshotId) {
@@ -275,7 +279,7 @@ public class INodeMap {
       }
 
       @Override
-      public byte getStoragePolicyID(){
+      public byte getStoragePolicyID() {
         return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
       }
 
@@ -284,8 +288,51 @@ public class INodeMap {
         return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
       }
     };
+
+    /*
+     * Iterate all partitions of PGSet and return the INode.
+     * Just for fallback.
+     */
+    PermissionStatus perm =
+        new PermissionStatus("", "", new FsPermission((short) 0));
+    // TODO: create a static array, to avoid creation of keys each time.
+    for (int p = 0; p < NUM_RANGES_STATIC; p++) {
+      INodeDirectory key = new INodeDirectory(
+          INodeId.ROOT_INODE_ID, "range key".getBytes(), perm, 0);
+      key.setParent(new INodeDirectory((long)p, null, perm, 0));
+      PartitionedGSet<INode, INodeWithAdditionalFields>.PartitionEntry e =
+          pgs.getPartition(key);
       
-    return map.get(inode);
+      if (e.contains(inode)) {
+        return (INode) e.get(inode);
+      }
+    }
+
+    return null;
+  }
+
+  public INode get(INode inode) {
+
+    /*
+     * Check whether the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent
+     * dirs
+     */
+    int i = NAMESPACE_KEY_DEPTH - 1;
+    INode tmpInode = inode;
+    while (i > 0 && tmpInode.getParent() != null) {
+      tmpInode = tmpInode.getParent();
+      i--;
+    }
+
+    /*
+     * If the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent dirs,
+     * use map.get(); else, fall back to get INode based on Inode ID.
+     */
+    if (i == 0) {
+      return map.get(inode);
+    } else {
+      return get(inode.getId());
+    }
   }
   
   /**
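
INodeKeyComparator above orders inodes lexicographically by a fixed-depth namespace key (NAMESPACE_KEY_DEPTH = 2), which is what lets PartitionedGSet keep an inode and its siblings within the same key range. A tiny standalone illustration of that ordering rule, with the key values invented:

import java.util.Comparator;

public class NamespaceKeyOrder {
  static final int NAMESPACE_KEY_DEPTH = 2;

  // Same comparison rule as INodeKeyComparator, applied to raw long[] keys.
  static final Comparator<long[]> BY_KEY = (k1, k2) -> {
    for (int l = 0; l < NAMESPACE_KEY_DEPTH; l++) {
      if (k1[l] == k2[l]) {
        continue;
      }
      return k1[l] < k2[l] ? -1 : 1;
    }
    return 0;
  };

  public static void main(String[] args) {
    long[] a = {16386L, 16387L};  // {parent component, inode id} -- invented values
    long[] b = {16386L, 16390L};  // same parent component, larger inode id
    long[] c = {16400L, 16385L};  // the first key component dominates the comparison
    System.out.println(BY_KEY.compare(a, b)); // -1
    System.out.println(BY_KEY.compare(c, a)); // 1
  }
}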



[hadoop] 01/05: INodeMap with PartitionedGSet and per-partition locking.

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 6c2b39e8e418d4d54932506752828a59e6e92feb
Author: Konstantin V Shvachko <sh...@apache.org>
AuthorDate: Fri May 7 17:47:37 2021 -0700

    INodeMap with PartitionedGSet and per-partition locking.
---
 .../java/org/apache/hadoop/util/LatchLock.java     |  64 +++++
 .../org/apache/hadoop/util/PartitionedGSet.java    | 263 +++++++++++++++++++++
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java  |  92 ++++++-
 .../hadoop/hdfs/server/namenode/FSDirectory.java   |   2 +-
 .../hadoop/hdfs/server/namenode/FSImage.java       |  29 ++-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |   9 +-
 .../hdfs/server/namenode/FSNamesystemLock.java     |  96 +++++++-
 .../hadoop/hdfs/server/namenode/INodeMap.java      | 148 ++++++++++--
 .../java/org/apache/hadoop/hdfs/DFSTestUtil.java   |   2 +
 .../hadoop/hdfs/server/namenode/TestINodeFile.java |  39 ++-
 10 files changed, 682 insertions(+), 62 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
new file mode 100644
index 0000000..41e33da
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * LatchLock controls two hierarchical Read/Write locks:
+ * the topLock and the childLock.
+ * Typically an operation starts with the topLock already acquired.
+ * To acquire child lock LatchLock will
+ * first acquire the childLock, and then release the topLock.
+ */
+public abstract class LatchLock<C> {
+  // Interfaces methods to be defined for subclasses
+  /** @return true topLock is locked for read by any thread */
+  protected abstract boolean isReadTopLocked();
+  /** @return true topLock is locked for write by any thread */
+  protected abstract boolean isWriteTopLocked();
+  protected abstract void readTopdUnlock();
+  protected abstract void writeTopUnlock();
+
+  protected abstract boolean hasReadChildLock();
+  protected abstract void readChildLock();
+  protected abstract void readChildUnlock();
+
+  protected abstract boolean hasWriteChildLock();
+  protected abstract void writeChildLock();
+  protected abstract void writeChildUnlock();
+
+  protected abstract LatchLock<C> clone();
+
+  // Public APIs to use with the class
+  public void readLock() {
+    readChildLock();
+    readTopdUnlock();
+  }
+
+  public void readUnlock() {
+    readChildUnlock();
+  }
+
+  public void writeLock() {
+    writeChildLock();
+    writeTopUnlock();
+  }
+
+  public void writeUnlock() {
+    writeChildUnlock();
+  }
+}
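
LatchLock only defines the protocol: take the child (partition) lock, then let go of the top (global) lock. The commit wires this into FSNamesystemLock and PartitionedGSet below; purely for illustration, a minimal subclass backed by two ReentrantReadWriteLocks could look like the following (the class name and the choice of lock are assumptions, not part of the patch):

import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.LatchLock;

class SimpleLatchLock extends LatchLock<ReentrantReadWriteLock> {
  private final ReentrantReadWriteLock topLock;    // shared global lock
  private final ReentrantReadWriteLock childLock;  // one per partition

  SimpleLatchLock(ReentrantReadWriteLock topLock, ReentrantReadWriteLock childLock) {
    this.topLock = topLock;
    this.childLock = childLock;
  }

  // In this toy version write access is treated as implying read access.
  @Override protected boolean isReadTopLocked() {
    return topLock.getReadLockCount() > 0 || topLock.isWriteLocked();
  }
  @Override protected boolean isWriteTopLocked() { return topLock.isWriteLocked(); }
  @Override protected void readTopdUnlock() { topLock.readLock().unlock(); }
  @Override protected void writeTopUnlock() { topLock.writeLock().unlock(); }

  @Override protected boolean hasReadChildLock() {
    return childLock.getReadHoldCount() > 0 || childLock.isWriteLockedByCurrentThread();
  }
  @Override protected void readChildLock() { childLock.readLock().lock(); }
  @Override protected void readChildUnlock() { childLock.readLock().unlock(); }

  @Override protected boolean hasWriteChildLock() {
    return childLock.isWriteLockedByCurrentThread();
  }
  @Override protected void writeChildLock() { childLock.writeLock().lock(); }
  @Override protected void writeChildUnlock() { childLock.writeLock().unlock(); }

  // Each new partition gets its own child lock but shares the same top lock.
  @Override protected SimpleLatchLock clone() {
    return new SimpleLatchLock(topLock, new ReentrantReadWriteLock());
  }
}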
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
new file mode 100644
index 0000000..4b0cdc9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+
+/**
+ * An implementation of {@link GSet}, which splits a collection of elements
+ * into partitions each corresponding to a range of keys.
+ *
+ * This class does not support null element.
+ *
+ * This class is backed up by LatchLock for hierarchical synchronization.
+ *
+ * @param <K> Key type for looking up the elements
+ * @param <E> Element type, which must be
+ *       (1) a subclass of K, and
+ *       (2) implementing {@link LinkedElement} interface.
+ */
+@InterfaceAudience.Private
+public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
+
+  private static final int DEFAULT_PARTITION_CAPACITY = 2027;
+
+  /**
+   * An ordered map of contiguous segments of elements.
+   * Each key in the map represent the smallest key in the mapped segment,
+   * so that all elements in this segment are >= the mapping key,
+   * but are smaller then the next key in the map.
+   * Elements within a partition do not need to be ordered.
+   */
+  private final NavigableMap<K, PartitionEntry> partitions;
+  private LatchLock<?> latchLock;
+
+  /**
+   * The number of elements in the set.
+   */
+  protected volatile int size;
+
+  /**
+   * A single partition of the {@link PartitionedGSet}.
+   * Consists of a hash table {@link LightWeightGSet} and a lock, which
+   * controls access to this partition independently on the other ones.
+   */
+  private class PartitionEntry extends LightWeightGSet<K, E> {
+    private final LatchLock<?> partLock;
+
+    PartitionEntry(int defaultPartitionCapacity) {
+      super(defaultPartitionCapacity);
+      this.partLock = latchLock.clone();
+    }
+  }
+
+  public PartitionedGSet(final int capacity,
+      final Comparator<? super K> comparator,
+      final LatchLock<?> latchLock,
+      final E rootKey) {
+    this.partitions = new TreeMap<K, PartitionEntry>(comparator);
+    this.latchLock = latchLock;
+    addNewPartition(rootKey).put(rootKey);
+    this.size = 1;
+  }
+
+  /**
+   * Creates new empty partition.
+   * @param key
+   * @return
+   */
+  private PartitionEntry addNewPartition(final K key) {
+    PartitionEntry lastPart = null;
+    if(size > 0)
+      lastPart = partitions.lastEntry().getValue();
+
+    PartitionEntry newPart =
+        new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
+    // assert size == 0 || newPart.partLock.isWriteTopLocked() :
+    //      "Must hold write Lock: key = " + key;
+    partitions.put(key, newPart);
+
+    LOG.debug("Total GSet size = {}", size);
+    LOG.debug("Number of partitions = {}", partitions.size());
+    LOG.debug("Previous partition size = {}",
+        lastPart == null ? 0 : lastPart.size());
+
+    return newPart;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  protected PartitionEntry getPartition(final K key) {
+    Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
+    if(partEntry == null) {
+      return null;
+    }
+    PartitionEntry part = partEntry.getValue();
+    if(part == null) {
+      throw new IllegalStateException("Null partition for key: " + key);
+    }
+    assert size == 0 || part.partLock.isReadTopLocked() ||
+        part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
+    return part;
+  }
+
+  @Override
+  public boolean contains(final K key) {
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      return false;
+    }
+    return part.contains(key);
+  }
+
+  @Override
+  public E get(final K key) {
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      return null;
+    }
+    LOG.debug("get key: {}", key);
+    // part.partLock.readLock();
+    return part.get(key);
+  }
+
+  @Override
+  public E put(final E element) {
+    K key = element;
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      throw new HadoopIllegalArgumentException("Illegal key: " + key);
+    }
+    assert size == 0 || part.partLock.isWriteTopLocked() ||
+        part.partLock.hasWriteChildLock() :
+          "Must hold write Lock: key = " + key;
+    LOG.debug("put key: {}", key);
+    PartitionEntry newPart = addNewPartitionIfNeeded(part, key);
+    if(newPart != part) {
+      newPart.partLock.writeChildLock();
+      part = newPart;
+    }
+    E result = part.put(element);
+    if(result == null) {  // new element
+      size++;
+    }
+    return result;
+  }
+
+  private PartitionEntry addNewPartitionIfNeeded(
+      PartitionEntry curPart, K key) {
+    if(curPart.size() < DEFAULT_PARTITION_CAPACITY * 1.1
+        || curPart.contains(key)) {
+      return curPart;
+    }
+    return addNewPartition(key);
+  }
+
+  @Override
+  public E remove(final K key) {
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      return null;
+    }
+    E result = part.remove(key);
+    if(result != null) {
+      size--;
+    }
+    return result;
+  }
+
+  @Override
+  public void clear() {
+    LOG.error("Total GSet size = {}", size);
+    LOG.error("Number of partitions = {}", partitions.size());
+    // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
+    // SHV May need to clear all partitions?
+    partitions.clear();
+    size = 0;
+  }
+
+  @Override
+  public Collection<E> values() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    return new EntryIterator();
+  }
+
+  /**
+   * Iterator over the elements in the set.
+   * Iterates first by keys, then inside the partition
+   * corresponding to the key.
+   *
+   * Modifications are tracked by the underlying collections. We allow
+   * modifying other partitions, while iterating through the current one.
+   */
+  private class EntryIterator implements Iterator<E> {
+    private final Iterator<K> keyIterator;
+    private Iterator<E> partitionIterator;
+
+    public EntryIterator() {
+      keyIterator = partitions.keySet().iterator();
+      K curKey = partitions.firstKey();
+      partitionIterator = getPartition(curKey).iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if(partitionIterator.hasNext()) {
+        return true;
+      }
+      return keyIterator.hasNext();
+    }
+
+    @Override
+    public E next() {
+      if(!partitionIterator.hasNext()) {
+        K curKey = keyIterator.next();
+        partitionIterator = getPartition(curKey).iterator();
+      }
+      return partitionIterator.next();
+    }
+  }
+
+  public void latchWriteLock(K[] keys) {
+    // getPartition(parent).partLock.writeChildLock();
+    LatchLock<?> pLock = null;
+    for(K key : keys) {
+      pLock = getPartition(key).partLock;
+      pLock.writeChildLock();
+    }
+    assert pLock != null : "pLock is null";
+    pLock.writeTopUnlock();
+  }
+}
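
To make the put/get/latch flow concrete, here is a small usage sketch of PartitionedGSet that reuses the SimpleLatchLock toy from above; the element type, names and capacities are invented, while the real caller in this branch is INodeMap:

import java.util.Comparator;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
import org.apache.hadoop.util.PartitionedGSet;

class LongElement implements LinkedElement {
  final long id;
  private LinkedElement next;
  LongElement(long id) { this.id = id; }
  @Override public void setNext(LinkedElement n) { next = n; }
  @Override public LinkedElement getNext() { return next; }
  @Override public boolean equals(Object o) {
    return o instanceof LongElement && ((LongElement) o).id == id;
  }
  @Override public int hashCode() { return Long.hashCode(id); }
  @Override public String toString() { return "element-" + id; }
}

public class PartitionedGSetDemo {
  public static void main(String[] args) {
    ReentrantReadWriteLock top = new ReentrantReadWriteLock();
    SimpleLatchLock latch = new SimpleLatchLock(top, new ReentrantReadWriteLock());
    Comparator<LongElement> byId = (a, b) -> Long.compare(a.id, b.id);

    // Operations are expected to start with the top lock already held.
    top.writeLock().lock();
    PartitionedGSet<LongElement, LongElement> set =
        new PartitionedGSet<>(1024, byId, latch, new LongElement(0L));

    // Latch down to the partition owning key 42 (this releases the top lock),
    // then mutate under that partition's write lock only.
    LongElement e = new LongElement(42L);
    set.latchWriteLock(new LongElement[] {e});
    set.put(e);

    System.out.println(set.get(new LongElement(42L)));  // element-42
    System.out.println(set.size());                     // 2: the root key plus the new element
  }
}

put() expects the caller to hold either the top write lock or the owning partition's write lock; latchWriteLock() establishes the latter while releasing the former, which is the same pattern the mkdir and create paths follow in the FSDirMkdirOp and FSDirWriteFileOp changes on this branch.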
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index da324fb..c8c6277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -69,18 +69,18 @@ class FSDirMkdirOp {
         // create multiple inodes.
         fsn.checkFsObjectLimit();
 
-        // Ensure that the user can traversal the path by adding implicit
-        // u+wx permission to all ancestor directories.
-        INodesInPath existing =
-            createParentDirectories(fsd, iip, permissions, false);
-        if (existing != null) {
-          existing = createSingleDirectory(
-              fsd, existing, iip.getLastLocalName(), permissions);
+        // create all missing directories along the path,
+        // but don't add them to the INodeMap yet
+        permissions = addImplicitUwx(permissions, permissions); // SHV !!!
+        INode[] missing = createPathDirectories(fsd, iip, permissions);
+        iip = iip.getExistingINodes();
+        // switch the locks
+        fsd.getINodeMap().latchWriteLock(iip, missing);
+        // Add missing inodes to the INodeMap
+        for(INode dir : missing) {
+          iip = addSingleDirectory(fsd, iip, dir, permissions);
+          assert iip != null : "iip should not be null";
         }
-        if (existing == null) {
-          throw new IOException("Failed to create directory: " + src);
-        }
-        iip = existing;
       }
       return fsd.getAuditFileInfo(iip);
     } finally {
@@ -132,6 +132,7 @@ class FSDirMkdirOp {
     if (missing == 0) {  // full path exists, return parents.
       existing = iip.getParentINodesInPath();
     } else if (missing > 1) { // need to create at least one ancestor dir.
+      FSNamesystem.LOG.error("missing =  " + missing);
       // Ensure that the user can traversal the path by adding implicit
       // u+wx permission to all ancestor directories.
       PermissionStatus basePerm = inheritPerms
@@ -143,6 +144,13 @@ class FSDirMkdirOp {
       for (int i = existing.length(); existing != null && i <= last; i++) {
         byte[] component = iip.getPathComponent(i);
         existing = createSingleDirectory(fsd, existing, component, perm);
+        if(existing == null) {
+          FSNamesystem.LOG.error("unprotectedMkdir returned null for "
+              + iip.getPath() + " for " + new String(component) + " i = " + i);
+          // Somebody already created the parent. Recalculate existing
+          existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
+          i = existing.length() - 1;
+        }
       }
     }
     return existing;
@@ -228,5 +236,67 @@ class FSDirMkdirOp {
     }
     return iip;
   }
+
+  private static INode createDirectoryINode(FSDirectory fsd,
+      INodesInPath parent, byte[] name, PermissionStatus permission)
+      throws FileAlreadyExistsException {
+    assert fsd.hasReadLock();
+    assert parent.getLastINode() != null;
+    if (!parent.getLastINode().isDirectory()) {
+      throw new FileAlreadyExistsException("Parent path is not a directory: " +
+          parent.getPath() + " " + DFSUtil.bytes2String(name));
+    }
+    final INodeDirectory dir = new INodeDirectory(
+        fsd.allocateNewInodeId(), name, permission, now());
+    return dir;
+  }
+
+  private static INode[] createPathDirectories(FSDirectory fsd,
+      INodesInPath iip, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath existing = iip.getExistingINodes();
+    assert existing != null : "existing should not be null";
+    int numMissing = iip.length() - existing.length();
+    if (numMissing == 0) {  // full path exists
+      return new INode[0];
+    }
+
+    // create the missing directories along the path
+    INode[] missing = new INode[numMissing];
+    final int last = iip.length();
+    for (int i = existing.length();  i < last; i++) {
+      byte[] component = iip.getPathComponent(i);
+      missing[i - existing.length()] =
+          createDirectoryINode(fsd, existing, component, perm);
+    }
+    return missing;
+  }
+
+  private static INodesInPath addSingleDirectory(FSDirectory fsd,
+      INodesInPath existing, INode dir, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.addLastINode(existing, dir, perm.getPermission(), true);
+    if (iip == null) {
+      FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath());
+      final INodeDirectory parent = existing.getLastINode().asDirectory();
+      dir = parent.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
+      return INodesInPath.append(existing, dir, dir.getLocalNameBytes());
+    }
+    existing = iip;
+    assert dir.equals(existing.getLastINode()) : "dir is not the last INode";
+
+    // Directory creation also count towards FilesCreated
+    // to match count of FilesDeleted metric.
+    NameNode.getNameNodeMetrics().incrFilesCreated();
+
+    assert dir.getPermissionStatus().getGroupName() != null :
+      "GroupName is null for " + existing.getPath();
+    String cur = existing.getPath();
+    fsd.getEditLog().logMkDir(cur, dir);
+    NameNode.stateChangeLog.debug("mkdirs: created directory {}", cur);
+    return existing;
+  }
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 497aa84..b5e68a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -317,7 +317,7 @@ public class FSDirectory implements Closeable {
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
-    inodeMap = INodeMap.newInstance(rootDir);
+    inodeMap = INodeMap.newInstance(rootDir, ns);
     this.isPermissionEnabled = conf.getBoolean(
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index f7749ce..1305438 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -177,18 +177,23 @@ public class FSImage implements Closeable {
  
   void format(FSNamesystem fsn, String clusterId, boolean force)
       throws IOException {
-    long fileCount = fsn.getFilesTotal();
-    // Expect 1 file, which is the root inode
-    Preconditions.checkState(fileCount == 1,
-        "FSImage.format should be called with an uninitialized namesystem, has " +
-        fileCount + " files");
-    NamespaceInfo ns = NNStorage.newNamespaceInfo();
-    LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
-    ns.clusterID = clusterId;
-    
-    storage.format(ns);
-    editLog.formatNonFileJournals(ns, force);
-    saveFSImageInAllDirs(fsn, 0);
+    fsn.readLock();
+    try {
+      long fileCount = fsn.getFilesTotal();
+      // Expect 1 file, which is the root inode
+      Preconditions.checkState(fileCount == 1,
+          "FSImage.format should be called with an uninitialized namesystem, has " +
+              fileCount + " files");
+      NamespaceInfo ns = NNStorage.newNamespaceInfo();
+      LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
+      ns.clusterID = clusterId;
+
+      storage.format(ns);
+      editLog.formatNonFileJournals(ns, force);
+      saveFSImageInAllDirs(fsn, 0);
+    } finally {
+      fsn.readUnlock();
+    }
   }
   
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7ccaae9..a8fb490 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1753,7 +1753,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+    this.fsLock.readUnlock(opName, lockReportInfoSupplier, true);
   }
 
   @Override
@@ -1786,7 +1786,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   @Override
   public boolean hasWriteLock() {
-    return this.fsLock.isWriteLockedByCurrentThread();
+    return this.fsLock.isWriteLockedByCurrentThread() ||
+        fsLock.hasWriteChildLock();
   }
   @Override
   public boolean hasReadLock() {
@@ -1801,6 +1802,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return this.fsLock.getWriteHoldCount();
   }
 
+  public FSNamesystemLock getFSLock() {
+    return this.fsLock;
+  }
+
   /** Lock the checkpoint lock */
   public void cpLock() {
     this.cpLock.lock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index b4f479f..f53f10d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,6 +32,7 @@ import java.util.function.Supplier;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
@@ -129,6 +133,32 @@ class FSNamesystemLock {
 
   private static final String OVERALL_METRIC_NAME = "Overall";
 
+  private final ThreadLocal<Collection<INodeMapLock>> partitionLocks =
+      new ThreadLocal<Collection<INodeMapLock>>() {
+        @Override
+        public Collection<INodeMapLock> initialValue() {
+          return new ArrayList<INodeMapLock>();
+        }
+      };
+
+  void addChildLock(INodeMapLock lock) {
+    partitionLocks.get().add(lock);
+  }
+
+  boolean removeChildLock(INodeMapLock lock) {
+    return partitionLocks.get().remove(lock);
+  }
+
+  boolean hasWriteChildLock() {
+    Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+    // FSNamesystem.LOG.debug("partitionLocks.size = {}", partitionLocks.get().size());
+    while(iter.hasNext()) {
+      if(iter.next().hasWriteChildLock())
+        return true;
+    }
+    return false;
+  }
+
   FSNamesystemLock(Configuration conf,
       MutableRatesWithAggregation detailedHoldTimeMetrics) {
     this(conf, detailedHoldTimeMetrics, new Timer());
@@ -180,11 +210,29 @@ class FSNamesystemLock {
 
   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
+    readUnlock(opName, lockReportInfoSupplier, true);
+  }
+
+  public void readUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = coarseLock.getReadHoldCount() == 1;
     final long readLockIntervalNanos =
         timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
     final long currentTimeMs = timer.now();
-    coarseLock.readLock().unlock();
+
+    if(getReadHoldCount() > 0) {  // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.readLock().unlock();
+    }
+
+    if(unlockChildren) {  // Also unlock and remove children locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().readChildUnlock();
+        iter.remove();
+      }
+    }
 
     if (needReport) {
       addMetric(opName, readLockIntervalNanos, false);
@@ -252,7 +300,7 @@ class FSNamesystemLock {
    * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
    */
   public void writeUnlock() {
-    writeUnlock(OP_NAME_OTHER, false, null);
+    writeUnlock(OP_NAME_OTHER, false, null, true);
   }
 
   /**
@@ -262,7 +310,7 @@ class FSNamesystemLock {
    * @param opName Operation name.
    */
   public void writeUnlock(String opName) {
-    writeUnlock(opName, false, null);
+    writeUnlock(opName, false, null, true);
   }
 
   /**
@@ -274,7 +322,7 @@ class FSNamesystemLock {
    */
   public void writeUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    writeUnlock(opName, false, lockReportInfoSupplier);
+    writeUnlock(opName, false, lockReportInfoSupplier, true);
   }
 
   /**
@@ -286,7 +334,7 @@ class FSNamesystemLock {
    * for long time will be logged in logs and metrics.
    */
   public void writeUnlock(String opName, boolean suppressWriteLockReport) {
-    writeUnlock(opName, suppressWriteLockReport, null);
+    writeUnlock(opName, suppressWriteLockReport, null, true);
   }
 
   /**
@@ -297,8 +345,9 @@ class FSNamesystemLock {
    * for long time will be logged in logs and metrics.
    * @param lockReportInfoSupplier The info shown in the lock report
    */
-  private void writeUnlock(String opName, boolean suppressWriteLockReport,
-      Supplier<String> lockReportInfoSupplier) {
+  public void writeUnlock(String opName, boolean suppressWriteLockReport,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = !suppressWriteLockReport && coarseLock
         .getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
     final long writeLockIntervalNanos =
@@ -329,7 +378,18 @@ class FSNamesystemLock {
       longestWriteLockHeldInfo = new LockHeldInfo();
     }
 
-    coarseLock.writeLock().unlock();
+    if(this.isWriteLockedByCurrentThread()) {  // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.writeLock().unlock();
+    }
+
+    if(unlockChildren) {  // Unlock and remove children locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().writeChildUnlock();
+        iter.remove();
+      }
+    }
 
     if (needReport) {
       addMetric(opName, writeLockIntervalNanos, true);
@@ -355,7 +415,25 @@ class FSNamesystemLock {
   public int getWriteHoldCount() {
     return coarseLock.getWriteHoldCount();
   }
-  
+
+  /**
+   * Queries if the read lock is held by any thread.
+   * @return {@code true} if any thread holds the read or the write lock and
+   *         {@code false} otherwise
+   */
+  public boolean isReadLocked() {
+    return coarseLock.getReadLockCount() > 0 || isWriteLocked();
+  }
+
+  /**
+   * Queries if the write lock is held by any thread.
+   * @return {@code true} if any thread holds the write lock and
+   *         {@code false} otherwise
+   */
+  public boolean isWriteLocked() {
+    return coarseLock.isWriteLocked();
+  }
+
   public boolean isWriteLockedByCurrentThread() {
     return coarseLock.isWriteLockedByCurrentThread();
   }
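A minimal sketch of the intended life cycle of the thread-local child locks added above. The operation name is illustrative, and fsnLock stands for the FSNamesystemLock instance.

  // Sketch only: a read operation that ends up latching a partition lock.
  fsnLock.readLock();                        // take the top-level lock
  try {
    // ... path resolution latches an INodeMap partition; INodeMapLock calls
    // fsnLock.addChildLock(this) and drops the top-level read lock ...
  } finally {
    // unlockChildren == true: release the top-level read lock if this thread
    // still holds it, then release and remove every child lock registered in
    // the thread-local partitionLocks collection.
    fsnLock.readUnlock("exampleOp", null, true);
  }
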
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index f35949f..88c3233 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -17,44 +17,139 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LatchLock;
 import org.apache.hadoop.util.LightWeightGSet;
-
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.PartitionedGSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Storing all the {@link INode}s and maintaining the mapping between INode ID
  * and INode.  
  */
 public class INodeMap {
-  
-  static INodeMap newInstance(INodeDirectory rootDir) {
-    // Compute the map capacity by allocating 1% of total memory
-    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
-    GSet<INode, INodeWithAdditionalFields> map =
-        new LightWeightGSet<>(capacity);
-    map.put(rootDir);
-    return new INodeMap(map);
+  public static class INodeIdComparator implements Comparator<INode> {
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodesl");
+      }
+      long id1 = i1.getId();
+      long id2 = i2.getId();
+      return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+    }
+  }
+
+  public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
+    Logger LOG = LoggerFactory.getLogger(INodeMapLock.class);
+
+    private ReentrantReadWriteLock childLock;
+
+    INodeMapLock() {
+      this(null);
+    }
+
+    private INodeMapLock(ReentrantReadWriteLock childLock) {
+      assert namesystem != null : "namesystem is null";
+      this.childLock = childLock;
+    }
+
+    @Override
+    protected boolean isReadTopLocked() {
+      return namesystem.getFSLock().isReadLocked();
+    }
+
+    @Override
+    protected boolean isWriteTopLocked() {
+      return namesystem.getFSLock().isWriteLocked();
+    }
+
+    @Override
+    protected void readTopdUnlock() {
+      namesystem.getFSLock().readUnlock("INodeMap", null, false);
+    }
+
+    @Override
+    protected void writeTopUnlock() {
+      namesystem.getFSLock().writeUnlock("INodeMap", false, null, false);
+    }
+
+    @Override
+    protected boolean hasReadChildLock() {
+      return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock();
+    }
+
+    @Override
+    protected void readChildLock() {
+      // LOG.info("readChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().lock();
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("readChildLock: done");
+    }
+
+    @Override
+    protected void readChildUnlock() {
+      // LOG.info("readChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().unlock();
+      // LOG.info("readChildUnlock: done");
+    }
+
+    @Override
+    protected boolean hasWriteChildLock() {
+      return this.childLock.isWriteLockedByCurrentThread();
+    }
+
+    @Override
+    protected void writeChildLock() {
+      // LOG.info("writeChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().lock();
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("writeChildLock: done");
+    }
+
+    @Override
+    protected void writeChildUnlock() {
+      // LOG.info("writeChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().unlock();
+      // LOG.info("writeChildUnlock: done");
+    }
+
+    @Override
+    protected LatchLock<ReentrantReadWriteLock> clone() {
+      return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair
+    }
+  }
+
+  static INodeMap newInstance(INodeDirectory rootDir,
+      FSNamesystem ns) {
+    return new INodeMap(rootDir, ns);
   }
 
   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
-  
+  private FSNamesystem namesystem;
+
   public Iterator<INodeWithAdditionalFields> getMapIterator() {
     return map.iterator();
   }
 
-  private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
-    Preconditions.checkArgument(map != null);
-    this.map = map;
+  private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
+    this.namesystem = ns;
+    // Compute the map capacity by allocating 1% of total memory
+    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
+    this.map = new PartitionedGSet<>(capacity, new INodeIdComparator(),
+            new INodeMapLock(), rootDir);
   }
-  
+
   /**
    * Add an {@link INode} into the {@link INode} map. Replace the old value if 
    * necessary. 
@@ -138,4 +233,27 @@ public class INodeMap {
   public void clear() {
     map.clear();
   }
+
+  public void latchWriteLock(INodesInPath iip, INode[] missing) {
+    assert namesystem.hasReadLock() : "must have the namesystem read lock";
+    assert iip.length() > 0 : "INodesInPath has 0 length";
+    if(!(map instanceof PartitionedGSet)) {
+      return;
+    }
+    // Locks partitions along the path starting from the first existing parent
+    // Locking is in the hierarchical order
+    INode[] allINodes = new INode[Math.min(1, iip.length()) + missing.length];
+    allINodes[0] = iip.getLastINode();
+    System.arraycopy(missing, 0, allINodes, 1, missing.length);
+    /*
+    // Locks all the partitions along the path in the hierarchical order
+    INode[] allINodes = new INode[iip.length() + missing.length];
+    INode[] existing = iip.getINodesArray();
+    System.arraycopy(existing, 0, allINodes, 0, existing.length);
+    System.arraycopy(missing, 0, allINodes, existing.length, missing.length);
+    */
+
+    ((PartitionedGSet<INode, INodeWithAdditionalFields>)
+        map).latchWriteLock(allINodes);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d813375..4982119 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -186,6 +186,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -1997,6 +1998,7 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.LOG, level);
     GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+    GenericTestUtils.setLogLevel(GSet.LOG, level);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index b32f8fe..0b21032 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -1017,23 +1017,38 @@ public class TestINodeFile {
 
       final Path dir = new Path("/dir");
       hdfs.mkdirs(dir);
-      INodeDirectory dirNode = getDir(fsdir, dir);
-      INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       // set quota to dir, which leads to node replacement
       hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
-      dirNode = getDir(fsdir, dir);
-      assertTrue(dirNode.isWithQuota());
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        assertTrue(dirNode.isWithQuota());
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       hdfs.setQuota(dir, -1, -1);
-      dirNode = getDir(fsdir, dir);
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
     } finally {
       if (cluster != null) {
         cluster.shutdown();

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 03/05: HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit d6efb60f498f81befc2e88c6f9f1929dd765e951
Author: Xing Lin <xi...@linkedin.com>
AuthorDate: Fri Jul 16 13:04:59 2021 -0700

    HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)
---
 .../java/org/apache/hadoop/util/LatchLock.java     |   4 +-
 .../org/apache/hadoop/util/PartitionedGSet.java    |  35 ++-
 .../apache/hadoop/util/TestPartitionedGSet.java    | 270 +++++++++++++++++++++
 .../hadoop/hdfs/server/namenode/INodeMap.java      |   4 +-
 4 files changed, 300 insertions(+), 13 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
index 41e33da..fd98391 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
@@ -30,7 +30,7 @@ public abstract class LatchLock<C> {
   protected abstract boolean isReadTopLocked();
   /** @return true topLock is locked for write by any thread */
   protected abstract boolean isWriteTopLocked();
-  protected abstract void readTopdUnlock();
+  protected abstract void readTopUnlock();
   protected abstract void writeTopUnlock();
 
   protected abstract boolean hasReadChildLock();
@@ -46,7 +46,7 @@ public abstract class LatchLock<C> {
   // Public APIs to use with the class
   public void readLock() {
     readChildLock();
-    readTopdUnlock();
+    readTopUnlock();
   }
 
   public void readUnlock() {
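Besides the typo fix (readTopdUnlock -> readTopUnlock), the method above shows the latch hand-off, sketched below from the caller's side. readUnlock() is expected to release the child lock; names other than the LatchLock API are illustrative.

  // Sketch only: the child (partition) lock is taken before the top-level
  // lock is dropped, so there is no window with neither lock held.
  topLock.readLock().lock();     // caller already holds the top-level lock
  latch.readLock();              // = readChildLock(); readTopUnlock();
  try {
    // ... operate on the single partition guarded by the child lock ...
  } finally {
    latch.readUnlock();          // release the child (partition) lock
  }
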
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
index 7ebb1b3..f3569cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -24,7 +24,7 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
-
+import java.util.NoSuchElementException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
@@ -79,8 +79,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
 
   public PartitionedGSet(final int capacity,
       final Comparator<? super K> comparator,
-      final LatchLock<?> latchLock,
-      final E rootKey) {
+      final LatchLock<?> latchLock) {
     this.partitions = new TreeMap<K, PartitionEntry>(comparator);
     this.latchLock = latchLock;
     // addNewPartition(rootKey).put(rootKey);
@@ -275,17 +274,36 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
    * modifying other partitions, while iterating through the current one.
    */
   private class EntryIterator implements Iterator<E> {
-    private final Iterator<K> keyIterator;
+    private Iterator<K> keyIterator;
     private Iterator<E> partitionIterator;
 
     public EntryIterator() {
       keyIterator = partitions.keySet().iterator();
-      K curKey = partitions.firstKey();
-      partitionIterator = getPartition(curKey).iterator();
+ 
+      if (!keyIterator.hasNext()) {
+        partitionIterator = null;
+        return;
+      }
+
+      K firstKey = keyIterator.next();
+      partitionIterator = partitions.get(firstKey).iterator();
     }
 
     @Override
     public boolean hasNext() {
+
+      // Special case: an iterator was created for an empty PartitionedGSet.
+      // Check whether new partitions have been added since then.
+      if (partitionIterator == null) {
+        if (partitions.size() == 0) {
+          return false;
+        } else {
+          keyIterator = partitions.keySet().iterator();
+          K nextKey = keyIterator.next();
+          partitionIterator = partitions.get(nextKey).iterator();
+        }
+      }
+
       while(!partitionIterator.hasNext()) {
         if(!keyIterator.hasNext()) {
           return false;
@@ -298,9 +316,8 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
 
     @Override
     public E next() {
-      while(!partitionIterator.hasNext()) {
-        K curKey = keyIterator.next();
-        partitionIterator = getPartition(curKey).iterator();
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more elements in this set.");
       }
       return partitionIterator.next();
     }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java
new file mode 100644
index 0000000..9ae772c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Testing {@link PartitionedGSet} */
+public class TestPartitionedGSet {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestPartitionedGSet.class);
+  private static final int ELEMENT_NUM = 100;
+
+  /**
+   * Generate positive random numbers for testing. We want to use only positive
+   * numbers because the smallest partition used in testing is 0.
+   *
+   * @param length
+   *    number of random numbers to be generated.
+   *
+   * @param randomSeed
+   *    seed to be used for random number generator.
+   *
+   * @return
+   *    An array of Integers
+   */
+  private static ArrayList<Integer> getRandomList(int length, int randomSeed) {
+    Random random = new Random(randomSeed);
+    ArrayList<Integer> list = new ArrayList<Integer>(length);
+    for (int i = 0; i < length; i++) {
+      list.add(random.nextInt(Integer.MAX_VALUE));
+    }
+    return list;
+  }
+
+  private static class TestElement implements LinkedElement {
+    private final int val;
+    private LinkedElement next;
+
+    TestElement(int val) {
+      this.val = val;
+      this.next = null;
+    }
+
+    public int getVal() {
+      return val;
+    }
+
+    @Override
+    public void setNext(LinkedElement next) {
+      this.next = next;
+    }
+
+    @Override
+    public LinkedElement getNext() {
+      return next;
+    }
+  }
+
+  private static class TestElementComparator implements Comparator<TestElement>
+  {
+    @Override
+    public int compare(TestElement e1, TestElement e2) {
+      if (e1 == null || e2 == null) {
+        throw new NullPointerException("Cannot compare null elements");
+      }
+
+      return e1.getVal() - e2.getVal();
+    }
+  }
+
+  protected ReentrantReadWriteLock topLock =
+      new ReentrantReadWriteLock(false);
+  /**
+   * We are NOT testing any concurrent access to a PartitionedGSet here.
+   */
+  private class NoOpLock extends LatchLock<ReentrantReadWriteLock> {
+    private ReentrantReadWriteLock childLock;
+
+    public NoOpLock() {
+      childLock = new ReentrantReadWriteLock(false);
+    }
+
+    @Override
+    protected boolean isReadTopLocked() {
+      return topLock.getReadLockCount() > 0 || isWriteTopLocked();
+    }
+
+    @Override
+    protected boolean isWriteTopLocked() {
+      return topLock.isWriteLocked();
+    }
+
+    @Override
+    protected void readTopUnlock() {
+      topLock.readLock().unlock();
+    }
+
+    @Override
+    protected void writeTopUnlock() {
+      topLock.writeLock().unlock();
+    }
+
+    @Override
+    protected boolean hasReadChildLock() {
+      return childLock.getReadLockCount() > 0 || hasWriteChildLock();
+    }
+
+    @Override
+    protected void readChildLock() {
+      childLock.readLock().lock();
+    }
+
+    @Override
+    protected void readChildUnlock() {
+      childLock.readLock().unlock();
+    }
+
+    @Override
+    protected boolean hasWriteChildLock() {
+      return childLock.isWriteLockedByCurrentThread();
+    }
+
+    @Override
+    protected void writeChildLock() {
+      childLock.writeLock().lock();
+    }
+
+    @Override
+    protected void writeChildUnlock() {
+      childLock.writeLock().unlock();
+    }
+
+    @Override
+    protected LatchLock<ReentrantReadWriteLock> clone() {
+      return new NoOpLock();
+    }
+  }
+
+  /**
+   * Test iterator for a PartitionedGSet with no partitions.
+   */
+  @Test(timeout=60000)
+  public void testIteratorForNoPartition() {
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    topLock.readLock().lock();
+    int count = 0;
+    Iterator<TestElement> iter = set.iterator();
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(0, count);
+  }
+
+  /**
+   * Test iterator for a PartitionedGSet with empty partitions.
+   */
+  @Test(timeout=60000)
+  public void testIteratorForEmptyPartitions() {
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    set.addNewPartition(new TestElement(0));
+    set.addNewPartition(new TestElement(1000));
+    set.addNewPartition(new TestElement(2000));
+
+    topLock.readLock().lock();
+    int count = 0;
+    Iterator<TestElement> iter = set.iterator();
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(0, count);
+  }
+
+  /**
+   * Test whether the iterator can return the same number of elements as stored
+   * into the PartitionedGSet.
+   */
+  @Test(timeout=60000)
+  public void testIteratorCountElements() {
+    ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123);
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    set.addNewPartition(new TestElement(0));
+    set.addNewPartition(new TestElement(1000));
+    set.addNewPartition(new TestElement(2000));
+
+    topLock.writeLock().lock();
+    for (Integer i : list) {
+      set.put(new TestElement(i));
+    }
+    topLock.writeLock().unlock();
+
+    topLock.readLock().lock();
+    int count = 0;
+    Iterator<TestElement> iter = set.iterator();
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(ELEMENT_NUM, count);
+  }
+
+  /**
+   * Test iterator when it is created before partitions/elements are
+   * added to the PartitionedGSet.
+   */
+  @Test(timeout=60000)
+  public void testIteratorAddElementsAfterIteratorCreation() {
+    PartitionedGSet<TestElement, TestElement> set =
+        new PartitionedGSet<TestElement, TestElement>(
+            16, new TestElementComparator(), new NoOpLock());
+
+    // Create the iterator before partitions are added.
+    Iterator<TestElement> iter = set.iterator();
+
+    set.addNewPartition(new TestElement(0));
+    set.addNewPartition(new TestElement(1000));
+    set.addNewPartition(new TestElement(2000));
+
+    // Added one element
+    topLock.writeLock().lock();
+    set.put(new TestElement(2500));
+    topLock.writeLock().unlock();
+
+    topLock.readLock().lock();
+    int count = 0;
+    while( iter.hasNext() ) {
+      iter.next();
+      count ++;
+    }
+    topLock.readLock().unlock();
+    Assert.assertEquals(1, count);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 3b07dce..a0253b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -121,7 +121,7 @@ public class INodeMap {
     }
 
     @Override
-    protected void readTopdUnlock() {
+    protected void readTopUnlock() {
       namesystem.getFSLock().readUnlock("INodeMap", null, false);
     }
 
@@ -194,7 +194,7 @@ public class INodeMap {
     // Compute the map capacity by allocating 1% of total memory
     int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
     this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
-            new INodeMapLock(), rootDir);
+            new INodeMapLock());
 
     // Pre-populate initial empty partitions
     PartitionedGSet<INode, INodeWithAdditionalFields> pgs =

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/05: Add namespace key for INode. (shv)

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b78427744d964760c36167d1dd92408aa73a5955
Author: Konstantin V Shvachko <sh...@apache.org>
AuthorDate: Fri May 7 17:51:58 2021 -0700

    Add namespace key for INode. (shv)
---
 .../org/apache/hadoop/util/PartitionedGSet.java    | 80 ++++++++++++++++++----
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java  |  3 +
 .../apache/hadoop/hdfs/server/namenode/INode.java  | 40 ++++++++++-
 .../hadoop/hdfs/server/namenode/INodeMap.java      | 71 +++++++++++++++++--
 4 files changed, 176 insertions(+), 18 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
index 4b0cdc9..7ebb1b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -44,7 +45,8 @@ import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
 @InterfaceAudience.Private
 public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
 
-  private static final int DEFAULT_PARTITION_CAPACITY = 2027;
+  private static final int DEFAULT_PARTITION_CAPACITY = 65536; // 4096; // 5120; // 2048; // 1027;
+  private static final float DEFAULT_PARTITION_OVERFLOW = 1.8f;
 
   /**
    * An ordered map of contiguous segments of elements.
@@ -81,8 +83,11 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
       final E rootKey) {
     this.partitions = new TreeMap<K, PartitionEntry>(comparator);
     this.latchLock = latchLock;
-    addNewPartition(rootKey).put(rootKey);
-    this.size = 1;
+    // addNewPartition(rootKey).put(rootKey);
+    // this.size = 1;
+    this.size = 0;
+    LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
+    LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
   }
 
   /**
@@ -90,16 +95,19 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
    * @param key
    * @return
    */
-  private PartitionEntry addNewPartition(final K key) {
+  public PartitionEntry addNewPartition(final K key) {
+    Entry<K, PartitionEntry> lastEntry = partitions.lastEntry();
     PartitionEntry lastPart = null;
-    if(size > 0)
-      lastPart = partitions.lastEntry().getValue();
+    if(lastEntry != null)
+      lastPart = lastEntry.getValue();
 
     PartitionEntry newPart =
         new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
     // assert size == 0 || newPart.partLock.isWriteTopLocked() :
     //      "Must hold write Lock: key = " + key;
-    partitions.put(key, newPart);
+    PartitionEntry oldPart = partitions.put(key, newPart);
+    assert oldPart == null :
+      "RangeMap already has a partition associated with " + key;
 
     LOG.debug("Total GSet size = {}", size);
     LOG.debug("Number of partitions = {}", partitions.size());
@@ -173,7 +181,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
 
   private PartitionEntry addNewPartitionIfNeeded(
       PartitionEntry curPart, K key) {
-    if(curPart.size() < DEFAULT_PARTITION_CAPACITY * 1.1
+    if(curPart.size() < DEFAULT_PARTITION_CAPACITY * DEFAULT_PARTITION_OVERFLOW
         || curPart.contains(key)) {
       return curPart;
     }
@@ -197,12 +205,56 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
   public void clear() {
     LOG.error("Total GSet size = {}", size);
     LOG.error("Number of partitions = {}", partitions.size());
+    printStats();
     // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
     // SHV May need to clear all partitions?
     partitions.clear();
     size = 0;
   }
 
+  private void printStats() {
+    int partSizeMin = Integer.MAX_VALUE, partSizeAvg = 0, partSizeMax = 0;
+    long totalSize = 0;
+    int numEmptyPartitions = 0, numFullPartitions = 0;
+    Collection<PartitionEntry> parts = partitions.values();
+    Set<Entry<K, PartitionEntry>> entries = partitions.entrySet();
+    int i = 0;
+    for(Entry<K, PartitionEntry> e : entries) {
+      PartitionEntry part = e.getValue();
+      int s = part.size;
+      if(s == 0) numEmptyPartitions++;
+      if(s > DEFAULT_PARTITION_CAPACITY) numFullPartitions++;
+      totalSize += s;
+      partSizeMin = (s < partSizeMin ? s : partSizeMin);
+      partSizeMax = (partSizeMax < s ? s : partSizeMax);
+      Class<?> inodeClass = e.getKey().getClass();
+      try {
+        long[] key = (long[]) inodeClass.
+            getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2);
+        long[] firstKey = new long[0];
+        if(part.iterator().hasNext()) {
+          Object first = part.iterator().next();
+          firstKey = (long[]) inodeClass.getMethod(
+            "getNamespaceKey", int.class).invoke(first, 2);
+          Object parent = inodeClass.
+              getMethod("getParent").invoke(first);
+          long parentId = (parent == null ? 0L :
+            (long) inodeClass.getMethod("getId").invoke(parent));
+          firstKey[0] = parentId;
+        }
+        LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}",
+            i++, key, s, firstKey);  // SHV should be info
+      } catch (Exception ex) {
+        LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass);
+      }
+    }
+    partSizeAvg = (int) (totalSize / parts.size());
+    LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}",
+        partSizeMin, partSizeAvg, partSizeMax, totalSize);
+    LOG.error("Number of partitions: empty = {}, full = {}",
+        numEmptyPartitions, numFullPartitions);
+  }
+
   @Override
   public Collection<E> values() {
     // TODO Auto-generated method stub
@@ -234,15 +286,19 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
 
     @Override
     public boolean hasNext() {
-      if(partitionIterator.hasNext()) {
-        return true;
+      while(!partitionIterator.hasNext()) {
+        if(!keyIterator.hasNext()) {
+          return false;
+        }
+        K curKey = keyIterator.next();
+        partitionIterator = getPartition(curKey).iterator();
       }
-      return keyIterator.hasNext();
+      return partitionIterator.hasNext();
     }
 
     @Override
     public E next() {
-      if(!partitionIterator.hasNext()) {
+      while(!partitionIterator.hasNext()) {
         K curKey = keyIterator.next();
         partitionIterator = getPartition(curKey).iterator();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index c8c6277..5a40906 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -265,10 +265,13 @@ class FSDirMkdirOp {
     // create the missing directories along the path
     INode[] missing = new INode[numMissing];
     final int last = iip.length();
+    INode parent = existing.getLastINode();
     for (int i = existing.length();  i < last; i++) {
       byte[] component = iip.getPathComponent(i);
       missing[i - existing.length()] =
           createDirectoryINode(fsd, existing, component, perm);
+      missing[i - existing.length()].setParent(parent.asDirectory());
+      parent = missing[i - existing.length()];
     }
     return missing;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index daff95c..42e462e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -577,6 +578,43 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
     return name == null? null: DFSUtil.bytes2String(name);
   }
 
+  private long[] namespaceKey;
+
+  /**
+   * Key of an INode.
+   * Defines partitioning of INodes in the INodeMap.
+   *
+   * @param level how many ancestor levels to include in the key
+   * @return the ids of this inode and its ancestors, top entry hashed by indexOf()
+   */
+  public long[] getNamespaceKey(int level) {
+    if(namespaceKey == null) {  // generate the namespace key
+      long[] buf = new long[level];
+      INode cur = this;
+      for(int l = 0; l < level; l++) {
+        long curId = (cur == null) ? 0L : cur.getId();
+        buf[level - l - 1] = curId;
+        cur =  (cur == null) ? null : cur.parent;
+      }
+      buf[0] = indexOf(buf);
+      namespaceKey = buf;
+    }
+    return namespaceKey;
+  }
+
+  private final static long LARGE_PRIME = 512927357;
+  public static long indexOf(long[] key) {
+    if(key[key.length-1] == INodeId.ROOT_INODE_ID) {
+      return key[0];
+    }
+    long idx = LARGE_PRIME * key[0];
+    idx = (idx ^ (idx >> 32)) & (INodeMap.NUM_RANGES_STATIC -1);
+    return idx;
+  }
+
+  /**
+   * Key of a snapshot Diff Element
+   */
   @Override
   public final byte[] getKey() {
     return getLocalNameBytes();
@@ -636,7 +674,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
 
   @Override
   public String toString() {
-    return getLocalName();
+    return getLocalName() + ": " + Arrays.toString(namespaceKey);
   }
 
   @VisibleForTesting
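A small self-contained sketch (made-up ids) of what getNamespaceKey(2) produces for an ordinary inode, mirroring the arithmetic of indexOf() above: the parent id is folded into one of INodeMap.NUM_RANGES_STATIC = 256 static ranges and becomes the first element of the key.

  // Sketch only: two-level namespace key for an inode with made-up ids.
  long parentId = 16392L, inodeId = 16393L;
  long idx = 512927357L * parentId;            // LARGE_PRIME * key[0]
  idx = (idx ^ (idx >> 32)) & (256 - 1);       // fold and mask into [0, 255]
  long[] namespaceKey = { idx, inodeId };      // what getNamespaceKey(2) caches
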
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 88c3233..3b07dce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -29,14 +29,63 @@ import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LatchLock;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.PartitionedGSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Storing all the {@link INode}s and maintaining the mapping between INode ID
  * and INode.  
  */
 public class INodeMap {
+  static final int NAMESPACE_KEY_DEBTH = 2;
+  static final int NUM_RANGES_STATIC = 256;  // power of 2
+
+  public static class INodeKeyComparator implements Comparator<INode> {
+    INodeKeyComparator() {
+      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+    }
+
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodes");
+      }
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
+      for(int l = 0; l < NAMESPACE_KEY_DEBTH; l++) {
+        if(key1[l] == key2[l]) continue;
+        return (key1[l] < key2[l] ? -1 : 1);
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * INodeKeyComparator with Hashed Parent
+   *
+   */
+  public static class HPINodeKeyComparator implements Comparator<INode> {
+    HPINodeKeyComparator() {
+      FSDirectory.LOG.info("Namespace key debth = {}", NAMESPACE_KEY_DEBTH);
+    }
+
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodes");
+      }
+      long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEBTH);
+      long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEBTH);
+      long key1_0 = INode.indexOf(key1);
+      long key2_0 = INode.indexOf(key2);
+      if(key1_0 != key2_0)
+        return (key1_0 < key2_0 ? -1 : 1);
+      for(int l = 1; l < NAMESPACE_KEY_DEBTH; l++) {
+        if(key1[l] == key2[l]) continue;
+        return (key1[l] < key2[l] ? -1 : 1);
+      }
+      return 0;
+    }
+  }
+
   public static class INodeIdComparator implements Comparator<INode> {
     @Override
     public int compare(INode i1, INode i2) {
@@ -50,8 +99,6 @@ public class INodeMap {
   }
 
   public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
-    Logger LOG = LoggerFactory.getLogger(INodeMapLock.class);
-
     private ReentrantReadWriteLock childLock;
 
     INodeMapLock() {
@@ -146,8 +193,22 @@ public class INodeMap {
     this.namesystem = ns;
     // Compute the map capacity by allocating 1% of total memory
     int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
-    this.map = new PartitionedGSet<>(capacity, new INodeIdComparator(),
+    this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
             new INodeMapLock(), rootDir);
+
+    // Pre-populate initial empty partitions
+    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
+        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
+    PermissionStatus perm = new PermissionStatus(
+        "", "", new FsPermission((short) 0));
+    for(int p = 0; p < NUM_RANGES_STATIC; p++) {
+      INodeDirectory key = new INodeDirectory(
+          INodeId.ROOT_INODE_ID, "range key".getBytes(), perm, 0);
+      key.setParent(new INodeDirectory((long)p, null, perm, 0));
+      pgs.addNewPartition(key);
+    }
+
+    map.put(rootDir);
   }
 
   /**

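For orientation, a minimal sketch (ids are made up, except ROOT_INODE_ID, which is 16385 in HDFS) of why an inode whose parent hashes to range p sorts between the pre-created partition keys for p and p+1 under INodeKeyComparator; the comparator below just restates the two-level comparison on raw long[] keys.

  // Sketch only: ordering of an inode key relative to two range keys.
  java.util.Comparator<long[]> cmp = (k1, k2) -> {
    for (int l = 0; l < 2; l++) {
      if (k1[l] != k2[l]) {
        return k1[l] < k2[l] ? -1 : 1;
      }
    }
    return 0;
  };
  long[] rangeKeyP      = { 42L, 16385L };   // pre-created key for range 42
  long[] inodeKey       = { 42L, 27001L };   // inode whose parent hashes to 42
  long[] rangeKeyPplus1 = { 43L, 16385L };
  assert cmp.compare(rangeKeyP, inodeKey) < 0;      // at or above range key 42
  assert cmp.compare(inodeKey, rangeKeyPplus1) < 0; // below range key 43
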
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org