Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/05/10 18:18:05 UTC
[hadoop] 01/02: INodeMap with PartitionedGSet and per-partition locking.
This is an automated email from the ASF dual-hosted git repository.
shv pushed a commit to branch fgl
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 1f1a0c45fe44c3da0db96999978417c4ff397a93
Author: Konstantin V Shvachko <sh...@apache.org>
AuthorDate: Fri May 7 17:47:37 2021 -0700
INodeMap with PartitionedGSet and per-partition locking.
---
.../java/org/apache/hadoop/util/LatchLock.java | 64 +++++
.../org/apache/hadoop/util/PartitionedGSet.java | 263 +++++++++++++++++++++
.../hadoop/hdfs/server/namenode/FSDirMkdirOp.java | 92 ++++++-
.../hadoop/hdfs/server/namenode/FSDirectory.java | 2 +-
.../hadoop/hdfs/server/namenode/FSImage.java | 29 ++-
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 9 +-
.../hdfs/server/namenode/FSNamesystemLock.java | 96 +++++++-
.../hadoop/hdfs/server/namenode/INodeMap.java | 148 ++++++++++--
.../java/org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +
.../hadoop/hdfs/server/namenode/TestINodeFile.java | 39 ++-
10 files changed, 682 insertions(+), 62 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
new file mode 100644
index 0000000..41e33da
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * LatchLock controls two hierarchical Read/Write locks:
+ * the topLock and the childLock.
+ * Typically an operation starts with the topLock already acquired.
+ * To acquire the child lock, LatchLock will
+ * first acquire the childLock and then release the topLock.
+ */
+public abstract class LatchLock<C> {
+ // Interface methods to be defined by subclasses
+ /** @return true if the topLock is locked for read by any thread. */
+ protected abstract boolean isReadTopLocked();
+ /** @return true if the topLock is locked for write by any thread. */
+ protected abstract boolean isWriteTopLocked();
+ protected abstract void readTopUnlock();
+ protected abstract void writeTopUnlock();
+
+ protected abstract boolean hasReadChildLock();
+ protected abstract void readChildLock();
+ protected abstract void readChildUnlock();
+
+ protected abstract boolean hasWriteChildLock();
+ protected abstract void writeChildLock();
+ protected abstract void writeChildUnlock();
+
+ protected abstract LatchLock<C> clone();
+
+ // Public APIs to use with the class
+ public void readLock() {
+ readChildLock();
+ readTopUnlock();
+ }
+
+ public void readUnlock() {
+ readChildUnlock();
+ }
+
+ public void writeLock() {
+ writeChildLock();
+ writeTopUnlock();
+ }
+
+ public void writeUnlock() {
+ writeChildUnlock();
+ }
+}
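For illustration, here is a minimal concrete LatchLock assembled from two plain ReentrantReadWriteLocks. SimpleLatchLock and its field names are hypothetical, not part of this commit; the sketch only makes the latch protocol above concrete: the caller enters holding the top lock, readLock()/writeLock() acquire the child lock and then release the top lock, and the matching unlock releases the child lock only.

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical subclass: one top lock shared by all partitions,
    // one child lock private to a single partition.
    class SimpleLatchLock extends LatchLock<ReentrantReadWriteLock> {
      private final ReentrantReadWriteLock top;
      private final ReentrantReadWriteLock child;

      SimpleLatchLock(ReentrantReadWriteLock top,
                      ReentrantReadWriteLock child) {
        this.top = top;
        this.child = child;
      }

      @Override
      protected boolean isReadTopLocked() {
        return top.getReadLockCount() > 0 || isWriteTopLocked();
      }
      @Override
      protected boolean isWriteTopLocked() {
        return top.isWriteLocked();
      }
      @Override
      protected void readTopUnlock() { top.readLock().unlock(); }
      @Override
      protected void writeTopUnlock() { top.writeLock().unlock(); }

      @Override
      protected boolean hasReadChildLock() {
        return child.getReadHoldCount() > 0;
      }
      @Override
      protected void readChildLock() { child.readLock().lock(); }
      @Override
      protected void readChildUnlock() { child.readLock().unlock(); }

      @Override
      protected boolean hasWriteChildLock() {
        return child.isWriteLockedByCurrentThread();
      }
      @Override
      protected void writeChildLock() { child.writeLock().lock(); }
      @Override
      protected void writeChildUnlock() { child.writeLock().unlock(); }

      // Each partition clones the lock to get its own child lock
      // under the same shared top lock.
      @Override
      protected LatchLock<ReentrantReadWriteLock> clone() {
        return new SimpleLatchLock(top, new ReentrantReadWriteLock());
      }
    }

A typical latched sequence then looks like this, assuming the thread already holds the top lock:

    top.writeLock().lock();   // coarse lock, taken by the caller
    latch.writeLock();        // child acquired, then top released
    // ... mutate the partition under its private lock only ...
    latch.writeUnlock();      // child released; top stays released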
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
new file mode 100644
index 0000000..4b0cdc9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+
+/**
+ * An implementation of {@link GSet}, which splits a collection of elements
+ * into partitions, each corresponding to a range of keys.
+ *
+ * This class does not support null elements.
+ *
+ * This class is backed by a LatchLock for hierarchical synchronization.
+ *
+ * @param <K> Key type for looking up the elements
+ * @param <E> Element type, which must be
+ * (1) a subclass of K, and
+ * (2) implementing {@link LinkedElement} interface.
+ */
+@InterfaceAudience.Private
+public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
+
+ private static final int DEFAULT_PARTITION_CAPACITY = 2027;
+
+ /**
+ * An ordered map of contiguous segments of elements.
+ * Each key in the map represents the smallest key in the mapped segment,
+ * so that all elements in this segment are >= the mapping key,
+ * but are smaller than the next key in the map.
+ * Elements within a partition do not need to be ordered.
+ */
+ private final NavigableMap<K, PartitionEntry> partitions;
+ private LatchLock<?> latchLock;
+
+ /**
+ * The number of elements in the set.
+ */
+ protected volatile int size;
+
+ /**
+ * A single partition of the {@link PartitionedGSet}.
+ * Consists of a hash table {@link LightWeightGSet} and a lock, which
+ * controls access to this partition independently of the others.
+ */
+ private class PartitionEntry extends LightWeightGSet<K, E> {
+ private final LatchLock<?> partLock;
+
+ PartitionEntry(int defaultPartitionCapacity) {
+ super(defaultPartitionCapacity);
+ this.partLock = latchLock.clone();
+ }
+ }
+
+ public PartitionedGSet(final int capacity,
+ final Comparator<? super K> comparator,
+ final LatchLock<?> latchLock,
+ final E rootKey) {
+ this.partitions = new TreeMap<K, PartitionEntry>(comparator);
+ this.latchLock = latchLock;
+ addNewPartition(rootKey).put(rootKey);
+ this.size = 1;
+ }
+
+ /**
+ * Creates a new empty partition.
+ * @param key the smallest key that will map into the new partition
+ * @return the newly created partition
+ */
+ private PartitionEntry addNewPartition(final K key) {
+ PartitionEntry lastPart = null;
+ if(size > 0)
+ lastPart = partitions.lastEntry().getValue();
+
+ PartitionEntry newPart =
+ new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
+ // assert size == 0 || newPart.partLock.isWriteTopLocked() :
+ // "Must hold write Lock: key = " + key;
+ partitions.put(key, newPart);
+
+ LOG.debug("Total GSet size = {}", size);
+ LOG.debug("Number of partitions = {}", partitions.size());
+ LOG.debug("Previous partition size = {}",
+ lastPart == null ? 0 : lastPart.size());
+
+ return newPart;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ protected PartitionEntry getPartition(final K key) {
+ Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
+ if(partEntry == null) {
+ return null;
+ }
+ PartitionEntry part = partEntry.getValue();
+ if(part == null) {
+ throw new IllegalStateException("Null partition for key: " + key);
+ }
+ assert size == 0 || part.partLock.isReadTopLocked() ||
+ part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
+ return part;
+ }
+
+ @Override
+ public boolean contains(final K key) {
+ PartitionEntry part = getPartition(key);
+ if(part == null) {
+ return false;
+ }
+ return part.contains(key);
+ }
+
+ @Override
+ public E get(final K key) {
+ PartitionEntry part = getPartition(key);
+ if(part == null) {
+ return null;
+ }
+ LOG.debug("get key: {}", key);
+ // part.partLock.readLock();
+ return part.get(key);
+ }
+
+ @Override
+ public E put(final E element) {
+ K key = element;
+ PartitionEntry part = getPartition(key);
+ if(part == null) {
+ throw new HadoopIllegalArgumentException("Illegal key: " + key);
+ }
+ assert size == 0 || part.partLock.isWriteTopLocked() ||
+ part.partLock.hasWriteChildLock() :
+ "Must hold write Lock: key = " + key;
+ LOG.debug("put key: {}", key);
+ PartitionEntry newPart = addNewPartitionIfNeeded(part, key);
+ if(newPart != part) {
+ newPart.partLock.writeChildLock();
+ part = newPart;
+ }
+ E result = part.put(element);
+ if(result == null) { // new element
+ size++;
+ }
+ return result;
+ }
+
+ private PartitionEntry addNewPartitionIfNeeded(
+ PartitionEntry curPart, K key) {
+ if(curPart.size() < DEFAULT_PARTITION_CAPACITY * 1.1
+ || curPart.contains(key)) {
+ return curPart;
+ }
+ return addNewPartition(key);
+ }
+
+ @Override
+ public E remove(final K key) {
+ PartitionEntry part = getPartition(key);
+ if(part == null) {
+ return null;
+ }
+ E result = part.remove(key);
+ if(result != null) {
+ size--;
+ }
+ return result;
+ }
+
+ @Override
+ public void clear() {
+ LOG.error("Total GSet size = {}", size);
+ LOG.error("Number of partitions = {}", partitions.size());
+ // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
+ // SHV May need to clear all partitions?
+ partitions.clear();
+ size = 0;
+ }
+
+ @Override
+ public Collection<E> values() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return new EntryIterator();
+ }
+
+ /**
+ * Iterator over the elements in the set.
+ * Iterates first by keys, then inside the partition
+ * corresponding to the key.
+ *
+ * Modifications are tracked by the underlying collections. We allow
+ * modifying other partitions while iterating through the current one.
+ */
+ private class EntryIterator implements Iterator<E> {
+ private final Iterator<K> keyIterator;
+ private Iterator<E> partitionIterator;
+
+ public EntryIterator() {
+ keyIterator = partitions.keySet().iterator();
+ K curKey = partitions.firstKey();
+ partitionIterator = getPartition(curKey).iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if(partitionIterator.hasNext()) {
+ return true;
+ }
+ return keyIterator.hasNext();
+ }
+
+ @Override
+ public E next() {
+ if(!partitionIterator.hasNext()) {
+ K curKey = keyIterator.next();
+ partitionIterator = getPartition(curKey).iterator();
+ }
+ return partitionIterator.next();
+ }
+ }
+
+ public void latchWriteLock(K[] keys) {
+ // getPartition(parent).partLock.writeChildLock();
+ LatchLock<?> pLock = null;
+ for(K key : keys) {
+ pLock = getPartition(key).partLock;
+ pLock.writeChildLock();
+ }
+ assert pLock != null : "pLock is null";
+ pLock.writeTopUnlock();
+ }
+}
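The range semantics of getPartition() come directly from TreeMap.floorEntry: a key belongs to the partition with the greatest starting key that is less than or equal to it. A self-contained illustration with Long keys (class name and values are made up for the example):

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class FloorEntryDemo {
      public static void main(String[] args) {
        NavigableMap<Long, String> partitions = new TreeMap<>();
        partitions.put(1L, "part-1");     // covers keys >= 1 and < 1000
        partitions.put(1000L, "part-2");  // covers keys >= 1000

        System.out.println(partitions.floorEntry(512L).getValue());  // part-1
        System.out.println(partitions.floorEntry(4096L).getValue()); // part-2
        System.out.println(partitions.floorEntry(0L));               // null
      }
    }

A key below the first partition has no floor entry, which is why getPartition() returns null in that case.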
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index da324fb..c8c6277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -69,18 +69,18 @@ class FSDirMkdirOp {
// create multiple inodes.
fsn.checkFsObjectLimit();
- // Ensure that the user can traversal the path by adding implicit
- // u+wx permission to all ancestor directories.
- INodesInPath existing =
- createParentDirectories(fsd, iip, permissions, false);
- if (existing != null) {
- existing = createSingleDirectory(
- fsd, existing, iip.getLastLocalName(), permissions);
+ // create all missing directories along the path,
+ // but don't add them to the INodeMap yet
+ permissions = addImplicitUwx(permissions, permissions); // SHV !!!
+ INode[] missing = createPathDirectories(fsd, iip, permissions);
+ iip = iip.getExistingINodes();
+ // switch the locks
+ fsd.getINodeMap().latchWriteLock(iip, missing);
+ // Add missing inodes to the INodeMap
+ for(INode dir : missing) {
+ iip = addSingleDirectory(fsd, iip, dir, permissions);
+ assert iip != null : "iip should not be null";
}
- if (existing == null) {
- throw new IOException("Failed to create directory: " + src);
- }
- iip = existing;
}
return fsd.getAuditFileInfo(iip);
} finally {
@@ -132,6 +132,7 @@ class FSDirMkdirOp {
if (missing == 0) { // full path exists, return parents.
existing = iip.getParentINodesInPath();
} else if (missing > 1) { // need to create at least one ancestor dir.
+ FSNamesystem.LOG.error("missing = " + missing);
// Ensure that the user can traversal the path by adding implicit
// u+wx permission to all ancestor directories.
PermissionStatus basePerm = inheritPerms
@@ -143,6 +144,13 @@ class FSDirMkdirOp {
for (int i = existing.length(); existing != null && i <= last; i++) {
byte[] component = iip.getPathComponent(i);
existing = createSingleDirectory(fsd, existing, component, perm);
+ if(existing == null) {
+ FSNamesystem.LOG.error("unprotectedMkdir returned null for "
+ + iip.getPath() + " for " + new String(component) + " i = " + i);
+ // Somebody already created the parent. Recalculate existing
+ existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
+ i = existing.length() - 1;
+ }
}
}
return existing;
@@ -228,5 +236,67 @@ class FSDirMkdirOp {
}
return iip;
}
+
+ private static INode createDirectoryINode(FSDirectory fsd,
+ INodesInPath parent, byte[] name, PermissionStatus permission)
+ throws FileAlreadyExistsException {
+ assert fsd.hasReadLock();
+ assert parent.getLastINode() != null;
+ if (!parent.getLastINode().isDirectory()) {
+ throw new FileAlreadyExistsException("Parent path is not a directory: " +
+ parent.getPath() + " " + DFSUtil.bytes2String(name));
+ }
+ final INodeDirectory dir = new INodeDirectory(
+ fsd.allocateNewInodeId(), name, permission, now());
+ return dir;
+ }
+
+ private static INode[] createPathDirectories(FSDirectory fsd,
+ INodesInPath iip, PermissionStatus perm)
+ throws IOException {
+ assert fsd.hasWriteLock();
+ INodesInPath existing = iip.getExistingINodes();
+ assert existing != null : "existing should not be null";
+ int numMissing = iip.length() - existing.length();
+ if (numMissing == 0) { // full path exists
+ return new INode[0];
+ }
+
+ // create the missing directories along the path
+ INode[] missing = new INode[numMissing];
+ final int last = iip.length();
+ for (int i = existing.length(); i < last; i++) {
+ byte[] component = iip.getPathComponent(i);
+ missing[i - existing.length()] =
+ createDirectoryINode(fsd, existing, component, perm);
+ }
+ return missing;
+ }
+
+ private static INodesInPath addSingleDirectory(FSDirectory fsd,
+ INodesInPath existing, INode dir, PermissionStatus perm)
+ throws IOException {
+ assert fsd.hasWriteLock();
+ INodesInPath iip = fsd.addLastINode(existing, dir, perm.getPermission(), true);
+ if (iip == null) {
+ FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath());
+ final INodeDirectory parent = existing.getLastINode().asDirectory();
+ dir = parent.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
+ return INodesInPath.append(existing, dir, dir.getLocalNameBytes());
+ }
+ existing = iip;
+ assert dir.equals(existing.getLastINode()) : "dir is not the last INode";
+
+ // Directory creation also counts towards FilesCreated
+ // to match the count of the FilesDeleted metric.
+ NameNode.getNameNodeMetrics().incrFilesCreated();
+
+ assert dir.getPermissionStatus().getGroupName() != null :
+ "GroupName is null for " + existing.getPath();
+ String cur = existing.getPath();
+ fsd.getEditLog().logMkDir(cur, dir);
+ NameNode.stateChangeLog.debug("mkdirs: created directory {}", cur);
+ return existing;
+ }
}
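Taken together, the reworked mkdirs body above runs in three phases. This condensed outline reuses the method names from the patch with error handling elided; it is a reading aid, not additional code:

    // Phase 1: build the missing INodeDirectory objects in memory,
    // without publishing them to the INodeMap yet.
    INode[] missing = createPathDirectories(fsd, iip, permissions);
    iip = iip.getExistingINodes();

    // Phase 2: latch from the global namesystem lock to the partition
    // locks covering the existing parent and the missing directories.
    fsd.getINodeMap().latchWriteLock(iip, missing);

    // Phase 3: publish the new directories under the partition locks;
    // operations touching other partitions are no longer blocked.
    for (INode dir : missing) {
      iip = addSingleDirectory(fsd, iip, dir, permissions);
    }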
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 497aa84..b5e68a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -317,7 +317,7 @@ public class FSDirectory implements Closeable {
FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
this.inodeId = new INodeId();
rootDir = createRoot(ns);
- inodeMap = INodeMap.newInstance(rootDir);
+ inodeMap = INodeMap.newInstance(rootDir, ns);
this.isPermissionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 86b4150..acc5a6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -177,18 +177,23 @@ public class FSImage implements Closeable {
void format(FSNamesystem fsn, String clusterId, boolean force)
throws IOException {
- long fileCount = fsn.getFilesTotal();
- // Expect 1 file, which is the root inode
- Preconditions.checkState(fileCount == 1,
- "FSImage.format should be called with an uninitialized namesystem, has " +
- fileCount + " files");
- NamespaceInfo ns = NNStorage.newNamespaceInfo();
- LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
- ns.clusterID = clusterId;
-
- storage.format(ns);
- editLog.formatNonFileJournals(ns, force);
- saveFSImageInAllDirs(fsn, 0);
+ fsn.readLock();
+ try {
+ long fileCount = fsn.getFilesTotal();
+ // Expect 1 file, which is the root inode
+ Preconditions.checkState(fileCount == 1,
+ "FSImage.format should be called with an uninitialized namesystem, has " +
+ fileCount + " files");
+ NamespaceInfo ns = NNStorage.newNamespaceInfo();
+ LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
+ ns.clusterID = clusterId;
+
+ storage.format(ns);
+ editLog.formatNonFileJournals(ns, force);
+ saveFSImageInAllDirs(fsn, 0);
+ } finally {
+ fsn.readUnlock();
+ }
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 095959b..d1fe412 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1752,7 +1752,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
public void readUnlock(String opName,
Supplier<String> lockReportInfoSupplier) {
- this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+ this.fsLock.readUnlock(opName, lockReportInfoSupplier, true);
}
@Override
@@ -1785,7 +1785,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Override
public boolean hasWriteLock() {
- return this.fsLock.isWriteLockedByCurrentThread();
+ return this.fsLock.isWriteLockedByCurrentThread() ||
+ fsLock.hasWriteChildLock();
}
@Override
public boolean hasReadLock() {
@@ -1800,6 +1801,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return this.fsLock.getWriteHoldCount();
}
+ public FSNamesystemLock getFSLock() {
+ return this.fsLock;
+ }
+
/** Lock the checkpoint lock */
public void cpLock() {
this.cpLock.lock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index b4f479f..f53f10d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.namenode;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,6 +32,7 @@ import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
@@ -129,6 +133,32 @@ class FSNamesystemLock {
private static final String OVERALL_METRIC_NAME = "Overall";
+ private final ThreadLocal<Collection<INodeMapLock>> partitionLocks =
+ new ThreadLocal<Collection<INodeMapLock>>() {
+ @Override
+ public Collection<INodeMapLock> initialValue() {
+ return new ArrayList<INodeMapLock>();
+ }
+ };
+
+ void addChildLock(INodeMapLock lock) {
+ partitionLocks.get().add(lock);
+ }
+
+ boolean removeChildLock(INodeMapLock lock) {
+ return partitionLocks.get().remove(lock);
+ }
+
+ boolean hasWriteChildLock() {
+ Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+ // FSNamesystem.LOG.debug("partitionLocks.size = {}", partitionLocks.get().size());
+ while(iter.hasNext()) {
+ if(iter.next().hasWriteChildLock())
+ return true;
+ }
+ return false;
+ }
+
FSNamesystemLock(Configuration conf,
MutableRatesWithAggregation detailedHoldTimeMetrics) {
this(conf, detailedHoldTimeMetrics, new Timer());
@@ -180,11 +210,29 @@ class FSNamesystemLock {
public void readUnlock(String opName,
Supplier<String> lockReportInfoSupplier) {
+ readUnlock(opName, lockReportInfoSupplier, true);
+ }
+
+ public void readUnlock(String opName,
+ Supplier<String> lockReportInfoSupplier,
+ boolean unlockChildren) {
final boolean needReport = coarseLock.getReadHoldCount() == 1;
final long readLockIntervalNanos =
timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
final long currentTimeMs = timer.now();
- coarseLock.readLock().unlock();
+
+ if(getReadHoldCount() > 0) { // Current thread holds the lock
+ // Unlock the top FSNamesystemLock
+ coarseLock.readLock().unlock();
+ }
+
+ if(unlockChildren) { // Also unlock and remove children locks
+ Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+ while(iter.hasNext()) {
+ iter.next().readChildUnlock();
+ iter.remove();
+ }
+ }
if (needReport) {
addMetric(opName, readLockIntervalNanos, false);
@@ -252,7 +300,7 @@ class FSNamesystemLock {
* FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
*/
public void writeUnlock() {
- writeUnlock(OP_NAME_OTHER, false, null);
+ writeUnlock(OP_NAME_OTHER, false, null, true);
}
/**
@@ -262,7 +310,7 @@ class FSNamesystemLock {
* @param opName Operation name.
*/
public void writeUnlock(String opName) {
- writeUnlock(opName, false, null);
+ writeUnlock(opName, false, null, true);
}
/**
@@ -274,7 +322,7 @@ class FSNamesystemLock {
*/
public void writeUnlock(String opName,
Supplier<String> lockReportInfoSupplier) {
- writeUnlock(opName, false, lockReportInfoSupplier);
+ writeUnlock(opName, false, lockReportInfoSupplier, true);
}
/**
@@ -286,7 +334,7 @@ class FSNamesystemLock {
* for long time will be logged in logs and metrics.
*/
public void writeUnlock(String opName, boolean suppressWriteLockReport) {
- writeUnlock(opName, suppressWriteLockReport, null);
+ writeUnlock(opName, suppressWriteLockReport, null, true);
}
/**
@@ -297,8 +345,9 @@ class FSNamesystemLock {
* for long time will be logged in logs and metrics.
* @param lockReportInfoSupplier The info shown in the lock report
*/
- private void writeUnlock(String opName, boolean suppressWriteLockReport,
- Supplier<String> lockReportInfoSupplier) {
+ public void writeUnlock(String opName, boolean suppressWriteLockReport,
+ Supplier<String> lockReportInfoSupplier,
+ boolean unlockChildren) {
final boolean needReport = !suppressWriteLockReport && coarseLock
.getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
final long writeLockIntervalNanos =
@@ -329,7 +378,18 @@ class FSNamesystemLock {
longestWriteLockHeldInfo = new LockHeldInfo();
}
- coarseLock.writeLock().unlock();
+ if(this.isWriteLockedByCurrentThread()) { // Current thread holds the lock
+ // Unlock the top FSNamesystemLock
+ coarseLock.writeLock().unlock();
+ }
+
+ if(unlockChildren) { // Unlock and remove children locks
+ Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+ while(iter.hasNext()) {
+ iter.next().writeChildUnlock();
+ iter.remove();
+ }
+ }
if (needReport) {
addMetric(opName, writeLockIntervalNanos, true);
@@ -355,7 +415,25 @@ class FSNamesystemLock {
public int getWriteHoldCount() {
return coarseLock.getWriteHoldCount();
}
-
+
+ /**
+ * Queries if the read lock is held by any thread.
+ * @return {@code true} if any thread holds the read or the write lock
+ * and {@code false} otherwise
+ */
+ public boolean isReadLocked() {
+ return coarseLock.getReadLockCount() > 0 || isWriteLocked();
+ }
+
+ /**
+ * Queries if the write lock is held by any thread.
+ * @return {@code true} if any thread holds the write lock and
+ * {@code false} otherwise
+ */
+ public boolean isWriteLocked() {
+ return coarseLock.isWriteLocked();
+ }
+
public boolean isWriteLockedByCurrentThread() {
return coarseLock.isWriteLockedByCurrentThread();
}
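The ThreadLocal bookkeeping above is what lets a latched operation finish with an ordinary unlock call. A hedged sketch of the resulting lifecycle from the operation's point of view (fsn and inodeMap stand in for the usual FSNamesystem and INodeMap references):

    fsn.writeLock();                        // take the top FSNamesystemLock
    inodeMap.latchWriteLock(iip, missing);  // partition write locks acquired
                                            // and registered via addChildLock();
                                            // the latch releases the top lock
    // ... mutate the latched partitions ...
    fsn.writeUnlock("mkdirs");              // unlockChildren == true: releases
                                            // the top lock if this thread still
                                            // holds it, then every registered
                                            // partition lock

Because the child locks are tracked per thread, hasWriteChildLock() also lets FSNamesystem.hasWriteLock() report true for a thread that has latched down to partition locks, keeping existing assertions valid.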
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index f35949f..88c3233 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -17,44 +17,139 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.util.Comparator;
import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LatchLock;
import org.apache.hadoop.util.LightWeightGSet;
-
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.PartitionedGSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Storing all the {@link INode}s and maintaining the mapping between INode ID
* and INode.
*/
public class INodeMap {
-
- static INodeMap newInstance(INodeDirectory rootDir) {
- // Compute the map capacity by allocating 1% of total memory
- int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
- GSet<INode, INodeWithAdditionalFields> map =
- new LightWeightGSet<>(capacity);
- map.put(rootDir);
- return new INodeMap(map);
+ public static class INodeIdComparator implements Comparator<INode> {
+ @Override
+ public int compare(INode i1, INode i2) {
+ if (i1 == null || i2 == null) {
+ throw new NullPointerException("Cannot compare null INodes");
+ }
+ long id1 = i1.getId();
+ long id2 = i2.getId();
+ return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+ }
+ }
+
+ public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
+ Logger LOG = LoggerFactory.getLogger(INodeMapLock.class);
+
+ private ReentrantReadWriteLock childLock;
+
+ INodeMapLock() {
+ this(null);
+ }
+
+ private INodeMapLock(ReentrantReadWriteLock childLock) {
+ assert namesystem != null : "namesystem is null";
+ this.childLock = childLock;
+ }
+
+ @Override
+ protected boolean isReadTopLocked() {
+ return namesystem.getFSLock().isReadLocked();
+ }
+
+ @Override
+ protected boolean isWriteTopLocked() {
+ return namesystem.getFSLock().isWriteLocked();
+ }
+
+ @Override
+ protected void readTopUnlock() {
+ namesystem.getFSLock().readUnlock("INodeMap", null, false);
+ }
+
+ @Override
+ protected void writeTopUnlock() {
+ namesystem.getFSLock().writeUnlock("INodeMap", false, null, false);
+ }
+
+ @Override
+ protected boolean hasReadChildLock() {
+ return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock();
+ }
+
+ @Override
+ protected void readChildLock() {
+ // LOG.info("readChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+ this.childLock.readLock().lock();
+ namesystem.getFSLock().addChildLock(this);
+ // LOG.info("readChildLock: done");
+ }
+
+ @Override
+ protected void readChildUnlock() {
+ // LOG.info("readChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+ this.childLock.readLock().unlock();
+ // LOG.info("readChildUnlock: done");
+ }
+
+ @Override
+ protected boolean hasWriteChildLock() {
+ return this.childLock.isWriteLockedByCurrentThread();
+ }
+
+ @Override
+ protected void writeChildLock() {
+ // LOG.info("writeChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+ this.childLock.writeLock().lock();
+ namesystem.getFSLock().addChildLock(this);
+ // LOG.info("writeChildLock: done");
+ }
+
+ @Override
+ protected void writeChildUnlock() {
+ // LOG.info("writeChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+ this.childLock.writeLock().unlock();
+ // LOG.info("writeChildUnlock: done");
+ }
+
+ @Override
+ protected LatchLock<ReentrantReadWriteLock> clone() {
+ return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair
+ }
+ }
+
+ static INodeMap newInstance(INodeDirectory rootDir,
+ FSNamesystem ns) {
+ return new INodeMap(rootDir, ns);
}
/** Synchronized by external lock. */
private final GSet<INode, INodeWithAdditionalFields> map;
-
+ private FSNamesystem namesystem;
+
public Iterator<INodeWithAdditionalFields> getMapIterator() {
return map.iterator();
}
- private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
- Preconditions.checkArgument(map != null);
- this.map = map;
+ private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
+ this.namesystem = ns;
+ // Compute the map capacity by allocating 1% of total memory
+ int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
+ this.map = new PartitionedGSet<>(capacity, new INodeIdComparator(),
+ new INodeMapLock(), rootDir);
}
-
+
/**
* Add an {@link INode} into the {@link INode} map. Replace the old value if
* necessary.
@@ -138,4 +233,27 @@ public class INodeMap {
public void clear() {
map.clear();
}
+
+ public void latchWriteLock(INodesInPath iip, INode[] missing) {
+ assert namesystem.hasReadLock() : "must have namesystem lock";
+ assert iip.length() > 0 : "INodesInPath has 0 length";
+ if(!(map instanceof PartitionedGSet)) {
+ return;
+ }
+ // Locks partitions along the path starting from the first existing parent
+ // Locking is in the hierarchical order
+ INode[] allINodes = new INode[Math.min(1, iip.length()) + missing.length];
+ allINodes[0] = iip.getLastINode();
+ System.arraycopy(missing, 0, allINodes, 1, missing.length);
+ /*
+ // Locks all the partitions along the path in the hierarchical order
+ INode[] allINodes = new INode[iip.length() + missing.length];
+ INode[] existing = iip.getINodesArray();
+ System.arraycopy(existing, 0, allINodes, 0, existing.length);
+ System.arraycopy(missing, 0, allINodes, existing.length, missing.length);
+ */
+
+ ((PartitionedGSet<INode, INodeWithAdditionalFields>)
+ map).latchWriteLock(allINodes);
+ }
}
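Since partitions are keyed by inode id, INodeIdComparator is simply a total order on ids; apart from the explicit null check, which fails fast instead of surfacing as an opaque NullPointerException inside TreeMap, it is equivalent to this one-liner (a possible simplification, not part of the patch):

    Comparator<INode> byId = (i1, i2) -> Long.compare(i1.getId(), i2.getId());

Note also that latchWriteLock() locks only the partition of the last existing INode plus the partitions of the missing INodes; the commented-out block shows the stricter alternative of locking every existing ancestor along the path.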
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 7aa8959..1df95fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -186,6 +186,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -1990,6 +1991,7 @@ public class DFSTestUtil {
GenericTestUtils.setLogLevel(NameNode.LOG, level);
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+ GenericTestUtils.setLogLevel(GSet.LOG, level);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index b32f8fe..0b21032 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -1017,23 +1017,38 @@ public class TestINodeFile {
final Path dir = new Path("/dir");
hdfs.mkdirs(dir);
- INodeDirectory dirNode = getDir(fsdir, dir);
- INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
- assertSame(dirNode, dirNodeFromNode);
+ cluster.getNamesystem().readLock();
+ try {
+ INodeDirectory dirNode = getDir(fsdir, dir);
+ INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+ assertSame(dirNode, dirNodeFromNode);
+ } finally {
+ cluster.getNamesystem().readUnlock();
+ }
// set quota to dir, which leads to node replacement
hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
- dirNode = getDir(fsdir, dir);
- assertTrue(dirNode.isWithQuota());
- // the inode in inodeMap should also be replaced
- dirNodeFromNode = fsdir.getInode(dirNode.getId());
- assertSame(dirNode, dirNodeFromNode);
+ cluster.getNamesystem().readLock();
+ try {
+ INodeDirectory dirNode = getDir(fsdir, dir);
+ assertTrue(dirNode.isWithQuota());
+ // the inode in inodeMap should also be replaced
+ INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+ assertSame(dirNode, dirNodeFromNode);
+ } finally {
+ cluster.getNamesystem().readUnlock();
+ }
hdfs.setQuota(dir, -1, -1);
- dirNode = getDir(fsdir, dir);
- // the inode in inodeMap should also be replaced
- dirNodeFromNode = fsdir.getInode(dirNode.getId());
- assertSame(dirNode, dirNodeFromNode);
+ cluster.getNamesystem().readLock();
+ try {
+ INodeDirectory dirNode = getDir(fsdir, dir);
+ // the inode in inodeMap should also be replaced
+ INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+ assertSame(dirNode, dirNodeFromNode);
+ } finally {
+ cluster.getNamesystem().readUnlock();
+ }
} finally {
if (cluster != null) {
cluster.shutdown();
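The new readLock()/readUnlock() brackets in the test are needed because fsdir.getInode() now reads through the PartitionedGSet, whose getPartition() asserts that the caller holds the top read lock or a child lock. Any direct INodeMap read in a test now follows this pattern (a sketch, assuming a running MiniDFSCluster named cluster and a hypothetical inodeId):

    cluster.getNamesystem().readLock();
    try {
      INode inode = fsdir.getInode(inodeId);  // safe: top read lock held
      // ... assertions against the inode ...
    } finally {
      cluster.getNamesystem().readUnlock();
    }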