You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/05/10 18:18:05 UTC

[hadoop] 01/02: INodeMap with PartitionedGSet and per-partition locking.

This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1f1a0c45fe44c3da0db96999978417c4ff397a93
Author: Konstantin V Shvachko <sh...@apache.org>
AuthorDate: Fri May 7 17:47:37 2021 -0700

    INodeMap with PartitionedGSet and per-partition locking.
---
 .../java/org/apache/hadoop/util/LatchLock.java     |  64 +++++
 .../org/apache/hadoop/util/PartitionedGSet.java    | 263 +++++++++++++++++++++
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java  |  92 ++++++-
 .../hadoop/hdfs/server/namenode/FSDirectory.java   |   2 +-
 .../hadoop/hdfs/server/namenode/FSImage.java       |  29 ++-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |   9 +-
 .../hdfs/server/namenode/FSNamesystemLock.java     |  96 +++++++-
 .../hadoop/hdfs/server/namenode/INodeMap.java      | 148 ++++++++++--
 .../java/org/apache/hadoop/hdfs/DFSTestUtil.java   |   2 +
 .../hadoop/hdfs/server/namenode/TestINodeFile.java |  39 ++-
 10 files changed, 682 insertions(+), 62 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
new file mode 100644
index 0000000..41e33da
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * LatchLock controls two hierarchical Read/Write locks:
+ * the topLock and the childLock.
+ * Typically an operation starts with the topLock already acquired.
+ * To acquire the child lock, LatchLock will
+ * first acquire the childLock, and then release the topLock.
+ *
+ * @param <C> type parameter chosen by the subclass; no member of this
+ *            class refers to it
+ */
+public abstract class LatchLock<C> {
+  // Interface methods to be defined by subclasses
+  /** @return true if the topLock is locked for read by any thread */
+  protected abstract boolean isReadTopLocked();
+  /** @return true if the topLock is locked for write by any thread */
+  protected abstract boolean isWriteTopLocked();
+  /** Releases the topLock held for read; invoked by {@link #readLock()}. */
+  protected abstract void readTopdUnlock();
+  /** Releases the topLock held for write; invoked by {@link #writeLock()}. */
+  protected abstract void writeTopUnlock();
+
+  /**
+   * @return true if the childLock is held for read; whether this refers to
+   *         the current thread or any thread is up to the subclass
+   */
+  protected abstract boolean hasReadChildLock();
+  /** Acquires the childLock for read. */
+  protected abstract void readChildLock();
+  /** Releases the childLock held for read. */
+  protected abstract void readChildUnlock();
+
+  /**
+   * @return true if the childLock is held for write; whether this refers to
+   *         the current thread or any thread is up to the subclass
+   */
+  protected abstract boolean hasWriteChildLock();
+  /** Acquires the childLock for write. */
+  protected abstract void writeChildLock();
+  /** Releases the childLock held for write. */
+  protected abstract void writeChildUnlock();
+
+  /**
+   * @return another LatchLock instance; what state it shares with this one
+   *         is decided by the subclass
+   */
+  protected abstract LatchLock<C> clone();
+
+  // Public APIs to use with the class
+  /**
+   * Latches from the topLock to the childLock for read: the childLock is
+   * acquired first and only then the topLock is released.
+   */
+  public void readLock() {
+    readChildLock();
+    readTopdUnlock();
+  }
+
+  /** Releases the childLock held for read; the topLock is not touched. */
+  public void readUnlock() {
+    readChildUnlock();
+  }
+
+  /**
+   * Latches from the topLock to the childLock for write: the childLock is
+   * acquired first and only then the topLock is released.
+   */
+  public void writeLock() {
+    writeChildLock();
+    writeTopUnlock();
+  }
+
+  /** Releases the childLock held for write; the topLock is not touched. */
+  public void writeUnlock() {
+    writeChildUnlock();
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
new file mode 100644
index 0000000..4b0cdc9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+
+/**
+ * An implementation of {@link GSet}, which splits a collection of elements
+ * into partitions each corresponding to a range of keys.
+ *
+ * This class does not support null element.
+ *
+ * This class is backed up by LatchLock for hierarchical synchronization.
+ *
+ * @param <K> Key type for looking up the elements
+ * @param <E> Element type, which must be
+ *       (1) a subclass of K, and
+ *       (2) implementing {@link LinkedElement} interface.
+ */
+@InterfaceAudience.Private
+public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
+
+  private static final int DEFAULT_PARTITION_CAPACITY = 2027;
+
+  /**
+   * An ordered map of contiguous segments of elements.
+   * Each key in the map represents the smallest key in the mapped segment,
+   * so that all elements in this segment are >= the mapping key,
+   * but are smaller than the next key in the map.
+   * Elements within a partition do not need to be ordered.
+   */
+  private final NavigableMap<K, PartitionEntry> partitions;
+  private LatchLock<?> latchLock;
+
+  /**
+   * The number of elements in the set.
+   */
+  protected volatile int size;
+
+  /**
+   * A single partition of the {@link PartitionedGSet}.
+   * Consists of a hash table {@link LightWeightGSet} and a lock, which
+   * controls access to this partition independently of the other ones.
+   */
+  private class PartitionEntry extends LightWeightGSet<K, E> {
+    // Lock of this partition, obtained by cloning the enclosing latchLock.
+    private final LatchLock<?> partLock;
+
+    /**
+     * @param defaultPartitionCapacity value handed to the
+     *        {@link LightWeightGSet} constructor
+     */
+    PartitionEntry(int defaultPartitionCapacity) {
+      super(defaultPartitionCapacity);
+      this.partLock = latchLock.clone();
+    }
+  }
+
+  /**
+   * Creates the set with one partition that is keyed by {@code rootKey}
+   * and contains {@code rootKey} as its only element.
+   *
+   * @param capacity not referenced by this constructor; partitions are
+   *        always created with {@code DEFAULT_PARTITION_CAPACITY}
+   * @param comparator ordering of the partition keys
+   * @param latchLock lock that is cloned for every partition
+   * @param rootKey key of the first partition and its first element
+   */
+  public PartitionedGSet(final int capacity,
+      final Comparator<? super K> comparator,
+      final LatchLock<?> latchLock,
+      final E rootKey) {
+    this.partitions = new TreeMap<K, PartitionEntry>(comparator);
+    // Must be assigned before addNewPartition(), which clones this lock.
+    this.latchLock = latchLock;
+    // The element is stored through the partition's own put(), so the
+    // counter is set explicitly on the next line.
+    addNewPartition(rootKey).put(rootKey);
+    this.size = 1;
+  }
+
+  /**
+   * Creates a new empty partition and registers it under the given key.
+   * @param key the key the new partition is mapped to
+   * @return the newly created partition
+   */
+  private PartitionEntry addNewPartition(final K key) {
+    // The currently last partition is only needed for the debug output.
+    final PartitionEntry prevLast =
+        size > 0 ? partitions.lastEntry().getValue() : null;
+
+    final PartitionEntry created =
+        new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
+    partitions.put(key, created);
+
+    LOG.debug("Total GSet size = {}", size);
+    LOG.debug("Number of partitions = {}", partitions.size());
+    final int prevLastSize = prevLast == null ? 0 : prevLast.size();
+    LOG.debug("Previous partition size = {}", prevLastSize);
+
+    return created;
+  }
+
+  /** @return the current value of the element counter {@code size} */
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Finds the partition responsible for the given key, i.e. the one mapped
+   * to the greatest partition key that is less than or equal to {@code key}.
+   *
+   * @param key the key to locate
+   * @return the partition, or null if no partition key is <= {@code key}
+   * @throws IllegalStateException if the map holds a null partition
+   */
+  protected PartitionEntry getPartition(final K key) {
+    final Entry<K, PartitionEntry> floor = partitions.floorEntry(key);
+    if(floor != null) {
+      final PartitionEntry found = floor.getValue();
+      if(found != null) {
+        assert size == 0 || found.partLock.isReadTopLocked() ||
+            found.partLock.hasReadChildLock() :
+              "Must hold read Lock: key = " + key;
+        return found;
+      }
+      throw new IllegalStateException("Null partition for key: " + key);
+    }
+    return null;
+  }
+
+  /**
+   * @param key the key to look for
+   * @return true if a partition exists for the key and contains it
+   */
+  @Override
+  public boolean contains(final K key) {
+    final PartitionEntry owner = getPartition(key);
+    return owner != null && owner.contains(key);
+  }
+
+  /**
+   * Looks up the element stored for the given key.
+   * @param key the key to look for
+   * @return the result of the partition lookup, or null if there is no
+   *         partition for the key
+   */
+  @Override
+  public E get(final K key) {
+    final PartitionEntry owner = getPartition(key);
+    if(owner != null) {
+      LOG.debug("get key: {}", key);
+      return owner.get(key);
+    }
+    return null;
+  }
+
+  /**
+   * Adds the element to the partition responsible for it. If that partition
+   * has outgrown its capacity threshold and does not yet contain the key,
+   * a new partition keyed by the element is created, write-locked and used
+   * instead.
+   *
+   * @param element the element to store; it also serves as the key
+   * @return the value returned by the partition's put(); null is treated
+   *         as "new element" and increments the counter
+   * @throws HadoopIllegalArgumentException if no partition exists for the key
+   */
+  @Override
+  public E put(final E element) {
+    K key = element;
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      throw new HadoopIllegalArgumentException("Illegal key: " + key);
+    }
+    assert size == 0 || part.partLock.isWriteTopLocked() ||
+        part.partLock.hasWriteChildLock() :
+          "Must hold write Lock: key = " + key;
+    LOG.debug("put key: {}", key);
+    PartitionEntry newPart = addNewPartitionIfNeeded(part, key);
+    if(newPart != part) {
+      // A fresh partition was created: take its child write lock before
+      // storing the element into it.
+      newPart.partLock.writeChildLock();
+      part = newPart;
+    }
+    E result = part.put(element);
+    if(result == null) {  // new element
+      // NOTE(review): size is a volatile int, so ++ is a non-atomic
+      // read-modify-write. Confirm that two threads holding different
+      // partition locks can never reach this line concurrently.
+      size++;
+    }
+    return result;
+  }
+
+  /**
+   * Decides whether {@code key} still goes into {@code curPart}.
+   * A new partition is created only when the current one has reached
+   * 1.1 times the default capacity and does not already contain the key.
+   *
+   * @param curPart the partition currently responsible for the key
+   * @param key the key about to be stored
+   * @return {@code curPart}, or a newly created partition keyed by
+   *         {@code key}
+   */
+  private PartitionEntry addNewPartitionIfNeeded(
+      PartitionEntry curPart, K key) {
+    final boolean overflowing =
+        curPart.size() >= DEFAULT_PARTITION_CAPACITY * 1.1;
+    if(overflowing && !curPart.contains(key)) {
+      return addNewPartition(key);
+    }
+    return curPart;
+  }
+
+  /**
+   * Removes the element stored for the given key.
+   * @param key the key to remove
+   * @return the removed element, or null if there is no partition for the
+   *         key or the partition removed nothing
+   */
+  @Override
+  public E remove(final K key) {
+    final PartitionEntry owner = getPartition(key);
+    final E removed = owner == null ? null : owner.remove(key);
+    if(removed != null) {
+      size--;
+    }
+    return removed;
+  }
+
+  /**
+   * Drops all partitions and resets the element counter to zero.
+   * NOTE(review): no partition is left afterwards, so a subsequent put()
+   * finds no partition for any key and throws; confirm callers expect this.
+   */
+  @Override
+  public void clear() {
+    // Size statistics are diagnostics, not an error condition; log them at
+    // debug level, the same level addNewPartition() uses for these values.
+    LOG.debug("Total GSet size = {}", size);
+    LOG.debug("Number of partitions = {}", partitions.size());
+    // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
+    // SHV May need to clear all partitions?
+    partitions.clear();
+    size = 0;
+  }
+
+  /**
+   * Collects the elements of all partitions, partition by partition in
+   * partition-key order.
+   *
+   * @return a newly allocated collection with the elements present at the
+   *         time of the call; it is a snapshot, so it does not track later
+   *         changes of the set and modifying it does not modify the set
+   */
+  @Override
+  public Collection<E> values() {
+    // The former stub returned null, which no caller can iterate over.
+    final Collection<E> all = new java.util.ArrayList<E>();
+    for(PartitionEntry part : partitions.values()) {
+      final Iterator<E> it = part.iterator();
+      while(it.hasNext()) {
+        all.add(it.next());
+      }
+    }
+    return all;
+  }
+
+  /** @return a new {@code EntryIterator} over the elements of this set */
+  @Override
+  public Iterator<E> iterator() {
+    return new EntryIterator();
+  }
+
+  /**
+   * Iterator over the elements in the set.
+   * Iterates first by keys, then inside the partition
+   * corresponding to the key. Every partition is visited exactly once and
+   * empty partitions are skipped.
+   *
+   * Modifications are tracked by the underlying collections. We allow
+   * modifying other partitions, while iterating through the current one.
+   */
+  private class EntryIterator implements Iterator<E> {
+    private final Iterator<K> keyIterator;
+    // Iterator of the partition being traversed; null until the first
+    // partition has been opened by hasNext().
+    private Iterator<E> partitionIterator;
+
+    public EntryIterator() {
+      // Partitions are opened lazily, one key at a time, by hasNext().
+      // Opening the first partition here without consuming its key made the
+      // first partition be traversed twice, and firstKey() failed on a set
+      // without partitions.
+      keyIterator = partitions.keySet().iterator();
+    }
+
+    /**
+     * @return true if the current or any later partition still has an
+     *         element
+     */
+    @Override
+    public boolean hasNext() {
+      // Advance past exhausted and empty partitions.
+      while(partitionIterator == null || !partitionIterator.hasNext()) {
+        if(!keyIterator.hasNext()) {
+          return false;
+        }
+        partitionIterator = getPartition(keyIterator.next()).iterator();
+      }
+      return true;
+    }
+
+    /**
+     * @return the next element
+     * @throws java.util.NoSuchElementException if no element is left
+     */
+    @Override
+    public E next() {
+      if(!hasNext()) {
+        throw new java.util.NoSuchElementException(
+            "No more elements in this set.");
+      }
+      return partitionIterator.next();
+    }
+  }
+
+  /**
+   * Write-locks the partition of every given key, in array order, and then
+   * releases the top lock held for write.
+   *
+   * @param keys keys whose partitions are to be locked; expected to be
+   *        non-empty (see the assertion below)
+   */
+  public void latchWriteLock(K[] keys) {
+    // getPartition(parent).partLock.writeChildLock();
+    LatchLock<?> pLock = null;
+    for(K key : keys) {
+      // NOTE(review): getPartition() returns null when no partition key is
+      // <= key, which would fail here with a NullPointerException.
+      pLock = getPartition(key).partLock;
+      pLock.writeChildLock();
+    }
+    assert pLock != null : "pLock is null";
+    // The top lock is released through the lock of the last partition.
+    pLock.writeTopUnlock();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index da324fb..c8c6277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -69,18 +69,18 @@ class FSDirMkdirOp {
         // create multiple inodes.
         fsn.checkFsObjectLimit();
 
-        // Ensure that the user can traversal the path by adding implicit
-        // u+wx permission to all ancestor directories.
-        INodesInPath existing =
-            createParentDirectories(fsd, iip, permissions, false);
-        if (existing != null) {
-          existing = createSingleDirectory(
-              fsd, existing, iip.getLastLocalName(), permissions);
+        // create all missing directories along the path,
+        // but don't add them to the INodeMap yet
+        permissions = addImplicitUwx(permissions, permissions); // SHV !!!
+        INode[] missing = createPathDirectories(fsd, iip, permissions);
+        iip = iip.getExistingINodes();
+        // switch the locks
+        fsd.getINodeMap().latchWriteLock(iip, missing);
+        // Add missing inodes to the INodeMap
+        for(INode dir : missing) {
+          iip = addSingleDirectory(fsd, iip, dir, permissions);
+          assert iip != null : "iip should not be null";
         }
-        if (existing == null) {
-          throw new IOException("Failed to create directory: " + src);
-        }
-        iip = existing;
       }
       return fsd.getAuditFileInfo(iip);
     } finally {
@@ -132,6 +132,7 @@ class FSDirMkdirOp {
     if (missing == 0) {  // full path exists, return parents.
       existing = iip.getParentINodesInPath();
     } else if (missing > 1) { // need to create at least one ancestor dir.
+      FSNamesystem.LOG.error("missing =  " + missing);
       // Ensure that the user can traversal the path by adding implicit
       // u+wx permission to all ancestor directories.
       PermissionStatus basePerm = inheritPerms
@@ -143,6 +144,13 @@ class FSDirMkdirOp {
       for (int i = existing.length(); existing != null && i <= last; i++) {
         byte[] component = iip.getPathComponent(i);
         existing = createSingleDirectory(fsd, existing, component, perm);
+        if(existing == null) {
+          FSNamesystem.LOG.error("unprotectedMkdir returned null for "
+              + iip.getPath() + " for " + new String(component) + " i = " + i);
+          // Somebody already created the parent. Recalculate existing
+          existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
+          i = existing.length() - 1;
+        }
       }
     }
     return existing;
@@ -228,5 +236,67 @@ class FSDirMkdirOp {
     }
     return iip;
   }
+
+  /**
+   * Builds a new directory inode with a freshly allocated inode id.
+   * The inode is only constructed here; this method does not add it to
+   * {@code parent}.
+   *
+   * @param fsd the directory used for the lock check and id allocation
+   * @param parent path whose last inode must be a directory
+   * @param name local name of the new directory
+   * @param permission permission status of the new directory
+   * @return the new directory inode
+   * @throws FileAlreadyExistsException if the last inode of {@code parent}
+   *         is not a directory
+   */
+  private static INode createDirectoryINode(FSDirectory fsd,
+      INodesInPath parent, byte[] name, PermissionStatus permission)
+      throws FileAlreadyExistsException {
+    assert fsd.hasReadLock();
+    final INode parentINode = parent.getLastINode();
+    assert parentINode != null;
+    if (parentINode.isDirectory()) {
+      return new INodeDirectory(
+          fsd.allocateNewInodeId(), name, permission, now());
+    }
+    throw new FileAlreadyExistsException("Parent path is not a directory: " +
+        parent.getPath() + " " + DFSUtil.bytes2String(name));
+  }
+
+  /**
+   * Creates directory inodes for all path components of {@code iip} that
+   * lie beyond its existing inodes.
+   *
+   * @param fsd the directory used for the lock check and inode creation
+   * @param iip the full path
+   * @param perm permission status given to every created inode
+   * @return the created inodes in path order; an empty array if the full
+   *         path already exists
+   * @throws IOException if creating an inode fails
+   */
+  private static INode[] createPathDirectories(FSDirectory fsd,
+      INodesInPath iip, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    final INodesInPath existing = iip.getExistingINodes();
+    assert existing != null : "existing should not be null";
+    final int firstMissing = existing.length();
+    final int numMissing = iip.length() - firstMissing;
+
+    // A fully existing path yields a zero-length array and no iterations.
+    final INode[] missing = new INode[numMissing];
+    for (int n = 0; n < numMissing; n++) {
+      missing[n] = createDirectoryINode(
+          fsd, existing, iip.getPathComponent(firstMissing + n), perm);
+    }
+    return missing;
+  }
+
+  /**
+   * Appends {@code dir} as the last inode of {@code existing}. On success
+   * the FilesCreated metric is incremented and the mkdir is written to the
+   * edit log. If the inode could not be added, the child with the same name
+   * is looked up in the parent directory and appended to the path instead.
+   *
+   * @param fsd the directory to modify
+   * @param existing path ending at the parent of {@code dir}
+   * @param dir the directory inode to add
+   * @param perm permission status whose permission is passed to
+   *        {@code addLastINode}
+   * @return {@code existing} extended by one inode
+   * @throws IOException if adding the inode fails
+   */
+  private static INodesInPath addSingleDirectory(FSDirectory fsd,
+      INodesInPath existing, INode dir, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.addLastINode(existing, dir, perm.getPermission(), true);
+    if (iip == null) {
+      FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath());
+      final INodeDirectory parent = existing.getLastINode().asDirectory();
+      // NOTE(review): if getChild() finds no child of that name, dir becomes
+      // null and the next line fails with a NullPointerException. Confirm
+      // that a null result of addLastINode always means the child exists.
+      dir = parent.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
+      return INodesInPath.append(existing, dir, dir.getLocalNameBytes());
+    }
+    existing = iip;
+    assert dir.equals(existing.getLastINode()) : "dir is not the last INode";
+
+    // Directory creation also counts towards FilesCreated
+    // to match the count of the FilesDeleted metric.
+    NameNode.getNameNodeMetrics().incrFilesCreated();
+
+    assert dir.getPermissionStatus().getGroupName() != null :
+      "GroupName is null for " + existing.getPath();
+    String cur = existing.getPath();
+    fsd.getEditLog().logMkDir(cur, dir);
+    NameNode.stateChangeLog.debug("mkdirs: created directory {}", cur);
+    return existing;
+  }
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 497aa84..b5e68a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -317,7 +317,7 @@ public class FSDirectory implements Closeable {
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
-    inodeMap = INodeMap.newInstance(rootDir);
+    inodeMap = INodeMap.newInstance(rootDir, ns);
     this.isPermissionEnabled = conf.getBoolean(
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
       DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 86b4150..acc5a6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -177,18 +177,23 @@ public class FSImage implements Closeable {
  
   void format(FSNamesystem fsn, String clusterId, boolean force)
       throws IOException {
-    long fileCount = fsn.getFilesTotal();
-    // Expect 1 file, which is the root inode
-    Preconditions.checkState(fileCount == 1,
-        "FSImage.format should be called with an uninitialized namesystem, has " +
-        fileCount + " files");
-    NamespaceInfo ns = NNStorage.newNamespaceInfo();
-    LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
-    ns.clusterID = clusterId;
-    
-    storage.format(ns);
-    editLog.formatNonFileJournals(ns, force);
-    saveFSImageInAllDirs(fsn, 0);
+    fsn.readLock();
+    try {
+      long fileCount = fsn.getFilesTotal();
+      // Expect 1 file, which is the root inode
+      Preconditions.checkState(fileCount == 1,
+          "FSImage.format should be called with an uninitialized namesystem, has " +
+              fileCount + " files");
+      NamespaceInfo ns = NNStorage.newNamespaceInfo();
+      LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
+      ns.clusterID = clusterId;
+
+      storage.format(ns);
+      editLog.formatNonFileJournals(ns, force);
+      saveFSImageInAllDirs(fsn, 0);
+    } finally {
+      fsn.readUnlock();
+    }
   }
   
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 095959b..d1fe412 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1752,7 +1752,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+    this.fsLock.readUnlock(opName, lockReportInfoSupplier, true);
   }
 
   @Override
@@ -1785,7 +1785,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   @Override
   public boolean hasWriteLock() {
-    return this.fsLock.isWriteLockedByCurrentThread();
+    return this.fsLock.isWriteLockedByCurrentThread() ||
+        fsLock.haswWriteChildLock();
   }
   @Override
   public boolean hasReadLock() {
@@ -1800,6 +1801,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return this.fsLock.getWriteHoldCount();
   }
 
+  /** @return the {@link FSNamesystemLock} instance held in {@code fsLock} */
+  public FSNamesystemLock getFSLock() {
+    return this.fsLock;
+  }
+
   /** Lock the checkpoint lock */
   public void cpLock() {
     this.cpLock.lock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index b4f479f..f53f10d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,6 +32,7 @@ import java.util.function.Supplier;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
@@ -129,6 +133,32 @@ class FSNamesystemLock {
 
   private static final String OVERALL_METRIC_NAME = "Overall";
 
+  /**
+   * Per-thread collection of the partition (child) locks registered through
+   * {@code addChildLock}; every thread starts with an empty list.
+   */
+  private final ThreadLocal<Collection<INodeMapLock>> partitionLocks =
+      ThreadLocal.withInitial(ArrayList::new);
+
+  /**
+   * Registers a partition lock in the current thread's collection.
+   * The same lock may be added more than once; no duplicate check is done.
+   */
+  void addChildLock(INodeMapLock lock) {
+    partitionLocks.get().add(lock);
+  }
+
+  /**
+   * Removes one occurrence of the lock from the current thread's collection.
+   * @return true if the collection contained the lock
+   */
+  boolean removeChildLock(INodeMapLock lock) {
+    return partitionLocks.get().remove(lock);
+  }
+
+  /**
+   * @return true if at least one partition lock registered by the current
+   *         thread reports {@code hasWriteChildLock()}
+   */
+  boolean haswWriteChildLock() {
+    for(INodeMapLock lock : partitionLocks.get()) {
+      if(lock.hasWriteChildLock()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   FSNamesystemLock(Configuration conf,
       MutableRatesWithAggregation detailedHoldTimeMetrics) {
     this(conf, detailedHoldTimeMetrics, new Timer());
@@ -180,11 +210,29 @@ class FSNamesystemLock {
 
   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
+    readUnlock(opName, lockReportInfoSupplier, true);
+  }
+
+  public void readUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = coarseLock.getReadHoldCount() == 1;
     final long readLockIntervalNanos =
         timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
     final long currentTimeMs = timer.now();
-    coarseLock.readLock().unlock();
+
+    if(getReadHoldCount() > 0) {  // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.readLock().unlock();
+    }
+
+    if(unlockChildren) {  // Also unlock and remove children locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().readChildUnlock();
+        iter.remove();
+      }
+    }
 
     if (needReport) {
       addMetric(opName, readLockIntervalNanos, false);
@@ -252,7 +300,7 @@ class FSNamesystemLock {
    * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
    */
   public void writeUnlock() {
-    writeUnlock(OP_NAME_OTHER, false, null);
+    writeUnlock(OP_NAME_OTHER, false, null, true);
   }
 
   /**
@@ -262,7 +310,7 @@ class FSNamesystemLock {
    * @param opName Operation name.
    */
   public void writeUnlock(String opName) {
-    writeUnlock(opName, false, null);
+    writeUnlock(opName, false, null, true);
   }
 
   /**
@@ -274,7 +322,7 @@ class FSNamesystemLock {
    */
   public void writeUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    writeUnlock(opName, false, lockReportInfoSupplier);
+    writeUnlock(opName, false, lockReportInfoSupplier, true);
   }
 
   /**
@@ -286,7 +334,7 @@ class FSNamesystemLock {
    * for long time will be logged in logs and metrics.
    */
   public void writeUnlock(String opName, boolean suppressWriteLockReport) {
-    writeUnlock(opName, suppressWriteLockReport, null);
+    writeUnlock(opName, suppressWriteLockReport, null, true);
   }
 
   /**
@@ -297,8 +345,9 @@ class FSNamesystemLock {
    * for long time will be logged in logs and metrics.
    * @param lockReportInfoSupplier The info shown in the lock report
    */
-  private void writeUnlock(String opName, boolean suppressWriteLockReport,
-      Supplier<String> lockReportInfoSupplier) {
+  public void writeUnlock(String opName, boolean suppressWriteLockReport,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = !suppressWriteLockReport && coarseLock
         .getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
     final long writeLockIntervalNanos =
@@ -329,7 +378,18 @@ class FSNamesystemLock {
       longestWriteLockHeldInfo = new LockHeldInfo();
     }
 
-    coarseLock.writeLock().unlock();
+    if(this.isWriteLockedByCurrentThread()) {  // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.writeLock().unlock();
+    }
+
+    if(unlockChildren) {  // Unlock and remove children locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().writeChildUnlock();
+        iter.remove();
+      }
+    }
 
     if (needReport) {
       addMetric(opName, writeLockIntervalNanos, true);
@@ -355,7 +415,25 @@ class FSNamesystemLock {
   public int getWriteHoldCount() {
     return coarseLock.getWriteHoldCount();
   }
-  
+
+  /**
+   * Queries if the read lock or the write lock is held by any thread.
+   * @return {@code true} if any thread holds the read or write lock and
+   *         {@code false} otherwise
+   */
+  public boolean isReadLocked() {
+    return coarseLock.getReadLockCount() > 0 || isWriteLocked();
+  }
+
+  /**
+   * Queries if the write lock is held by any thread.
+   * @return {@code true} if any thread holds the write lock and
+   *         {@code false} otherwise
+   */
+  public boolean isWriteLocked() {
+    return coarseLock.isWriteLocked();
+  }
+
   public boolean isWriteLockedByCurrentThread() {
     return coarseLock.isWriteLockedByCurrentThread();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index f35949f..88c3233 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -17,44 +17,139 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LatchLock;
 import org.apache.hadoop.util.LightWeightGSet;
-
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.PartitionedGSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Storing all the {@link INode}s and maintaining the mapping between INode ID
  * and INode.  
  */
 public class INodeMap {
-  
-  static INodeMap newInstance(INodeDirectory rootDir) {
-    // Compute the map capacity by allocating 1% of total memory
-    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
-    GSet<INode, INodeWithAdditionalFields> map =
-        new LightWeightGSet<>(capacity);
-    map.put(rootDir);
-    return new INodeMap(map);
+  /**
+   * Orders {@link INode}s by ascending inode id.
+   */
+  public static class INodeIdComparator implements Comparator<INode> {
+    /**
+     * @return a negative value, zero or a positive value if the id of
+     *         {@code i1} is smaller than, equal to or greater than the
+     *         id of {@code i2}
+     * @throws NullPointerException if either argument is null
+     */
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodes");
+      }
+      return Long.compare(i1.getId(), i2.getId());
+    }
+  }
+
+  /**
+   * {@link LatchLock} whose top lock is the lock of the enclosing map's
+   * namesystem and whose child lock is a {@link ReentrantReadWriteLock}.
+   * Acquiring the child lock also registers this object with the
+   * namesystem lock through {@code addChildLock}.
+   */
+  public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
+    // Instance field: one logger reference per lock object.
+    Logger LOG = LoggerFactory.getLogger(INodeMapLock.class);
+
+    private ReentrantReadWriteLock childLock;
+
+    /**
+     * Creates a lock without a child lock.
+     * NOTE(review): childLock is null here, so the child-lock methods would
+     * fail on such an instance; it appears to serve only as a template
+     * for {@link #clone()}.
+     */
+    INodeMapLock() {
+      this(null);
+    }
+
+    private INodeMapLock(ReentrantReadWriteLock childLock) {
+      assert namesystem != null : "namesystem is null";
+      this.childLock = childLock;
+    }
+
+    @Override
+    protected boolean isReadTopLocked() {
+      return namesystem.getFSLock().isReadLocked();
+    }
+
+    @Override
+    protected boolean isWriteTopLocked() {
+      return namesystem.getFSLock().isWriteLocked();
+    }
+
+    @Override
+    protected void readTopdUnlock() {
+      // The last argument is false: registered child locks are kept.
+      namesystem.getFSLock().readUnlock("INodeMap", null, false);
+    }
+
+    @Override
+    protected void writeTopUnlock() {
+      // The last argument is false: registered child locks are kept.
+      namesystem.getFSLock().writeUnlock("INodeMap", false, null, false);
+    }
+
+    /**
+     * @return true if the current thread holds the child lock for read
+     *         or for write
+     */
+    @Override
+    protected boolean hasReadChildLock() {
+      return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock();
+    }
+
+    @Override
+    protected void readChildLock() {
+      // LOG.info("readChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().lock();
+      // Register only after the lock has been acquired.
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("readChildLock: done");
+    }
+
+    /**
+     * Releases the child read lock. This method does not remove the lock
+     * from the collection it was registered in by readChildLock().
+     */
+    @Override
+    protected void readChildUnlock() {
+      // LOG.info("readChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().unlock();
+      // LOG.info("readChildUnlock: done");
+    }
+
+    /** @return true if the current thread holds the child lock for write */
+    @Override
+    protected boolean hasWriteChildLock() {
+      return this.childLock.isWriteLockedByCurrentThread();
+    }
+
+    @Override
+    protected void writeChildLock() {
+      // LOG.info("writeChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().lock();
+      // Register only after the lock has been acquired.
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("writeChildLock: done");
+    }
+
+    /**
+     * Releases the child write lock. This method does not remove the lock
+     * from the collection it was registered in by writeChildLock().
+     */
+    @Override
+    protected void writeChildUnlock() {
+      // LOG.info("writeChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().unlock();
+      // LOG.info("writeChildUnlock: done");
+    }
+
+    /**
+     * @return a new INodeMapLock with its own, non-fair
+     *         {@link ReentrantReadWriteLock} as child lock
+     */
+    @Override
+    protected LatchLock<ReentrantReadWriteLock> clone() {
+      return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair
+    }
+  }
+
+  /**
+   * Factory method; delegates to the private constructor.
+   * @param rootDir the root directory passed to the constructor
+   * @param ns the namesystem passed to the constructor
+   * @return a new INodeMap
+   */
+  static INodeMap newInstance(INodeDirectory rootDir,
+      FSNamesystem ns) {
+    return new INodeMap(rootDir, ns);
+  }
 
   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
-  
+  private FSNamesystem namesystem;
+
   public Iterator<INodeWithAdditionalFields> getMapIterator() {
     return map.iterator();
   }
 
-  private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
-    Preconditions.checkArgument(map != null);
-    this.map = map;
+  private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
+    this.namesystem = ns;
+    // Compute the map capacity by allocating 1% of total memory
+    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
+    this.map = new PartitionedGSet<>(capacity, new INodeIdComparator(),
+            new INodeMapLock(), rootDir);
   }
-  
+
   /**
    * Add an {@link INode} into the {@link INode} map. Replace the old value if 
    * necessary. 
@@ -138,4 +233,27 @@ public class INodeMap {
   public void clear() {
+    // Delegates to the backing GSet; the map field is documented as
+    // synchronized by an external lock, so no locking is done here.
     map.clear();
   }
+
+  /**
+   * Write-latches the partitions of the backing {@link PartitionedGSet} for
+   * the last inode of {@code iip} followed by all {@code missing} inodes, in
+   * that (hierarchical) order. Does nothing when the backing map is not a
+   * {@link PartitionedGSet}.
+   *
+   * @param iip resolved path; must contain at least one inode, the last of
+   *        which is the first existing parent to lock
+   * @param missing inodes below the last existing one that are to be locked
+   *        as well, in path order; may be empty but not {@code null}
+   */
+  public void latchWriteLock(INodesInPath iip, INode[] missing) {
+    assert namesystem.hasReadLock() : "must have namesystem lock";
+    assert iip.length() > 0 : "INodesInPath has 0 length";
+    if(!(map instanceof PartitionedGSet)) {
+      return;
+    }
+    // Locks partitions along the path starting from the first existing parent
+    // Locking is in the hierarchical order
+    // Slot 0 always holds the last existing inode, so the array needs exactly
+    // one element more than the number of missing inodes.
+    // (Alternative not used here: lock the partitions of every inode on the
+    // path, i.e. all existing inodes followed by the missing ones.)
+    INode[] allINodes = new INode[1 + missing.length];
+    allINodes[0] = iip.getLastINode();
+    System.arraycopy(missing, 0, allINodes, 1, missing.length);
+
+    ((PartitionedGSet<INode, INodeWithAdditionalFields>)
+        map).latchWriteLock(allINodes);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 7aa8959..1df95fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -186,6 +186,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -1990,6 +1991,7 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.LOG, level);
     GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+    GenericTestUtils.setLogLevel(GSet.LOG, level);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index b32f8fe..0b21032 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -1017,23 +1017,38 @@ public class TestINodeFile {
 
       final Path dir = new Path("/dir");
       hdfs.mkdirs(dir);
-      INodeDirectory dirNode = getDir(fsdir, dir);
-      INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       // set quota to dir, which leads to node replacement
       hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
-      dirNode = getDir(fsdir, dir);
-      assertTrue(dirNode.isWithQuota());
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        assertTrue(dirNode.isWithQuota());
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       hdfs.setQuota(dir, -1, -1);
-      dirNode = getDir(fsdir, dir);
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
     } finally {
       if (cluster != null) {
         cluster.shutdown();

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org