You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/07/26 02:02:01 UTC
svn commit: r1613539 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project: ./ hadoop-hdfs/
hadoop-hdfs/src/main/java/
hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/
hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanod...
Author: cmccabe
Date: Sat Jul 26 00:02:01 2014
New Revision: 1613539
URL: http://svn.apache.org/r1613539
Log:
HDFS-6750. The DataNode should use its shared memory segment to mark short-circuit replicas that have been unlinked as stale (cmccabe)
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project:r1613537
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1613537
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1613539&r1=1613538&r2=1613539&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Jul 26 00:02:01 2014
@@ -66,6 +66,9 @@ Release 2.6.0 - UNRELEASED
DFSOutputStream#close gives up its attempt to contact the namenode
(mitdesai21 via cmccabe)
+ HDFS-6750. The DataNode should use its shared memory segment to mark
+ short-circuit replicas that have been unlinked as stale (cmccabe)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1613537
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java?rev=1613539&r1=1613538&r2=1613539&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java Sat Jul 26 00:02:01 2014
@@ -74,7 +74,7 @@ import com.google.common.collect.HashMul
* DN also marks the block's slots as "unanchorable" to prevent additional
* clients from initiating these operations in the future.
*
- * The counterpart fo this class on the client is {@link DfsClientShmManager}.
+ * The counterpart of this class on the client is {@link DfsClientShmManager}.
*/
public class ShortCircuitRegistry {
public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
@@ -217,7 +217,32 @@ public class ShortCircuitRegistry {
}
return allowMunlock;
}
-
+
+ /**
+ * Invalidate any slot associated with a blockId that we are invalidating
+ * (deleting) from this DataNode. When a slot is invalid, the DFSClient will
+ * not use the corresponding replica for new read or mmap operations (although
+ * existing, ongoing read or mmap operations will complete.)
+ *
+ * @param blockId The block ID.
+ */
+ public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
+ if (!enabled) return;
+ final Set<Slot> affectedSlots = slots.get(blockId);
+ if (!affectedSlots.isEmpty()) {
+ final StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ bld.append("Block ").append(blockId).append(" has been invalidated. ").
+ append("Marking short-circuit slots as invalid: ");
+ for (Slot slot : affectedSlots) {
+ slot.makeInvalid();
+ bld.append(prefix).append(slot.toString());
+ prefix = ", ";
+ }
+ LOG.info(bld.toString());
+ }
+ }
+
public static class NewShmInfo implements Closeable {
public final ShmId shmId;
public final FileInputStream stream;
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1613539&r1=1613538&r2=1613539&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Sat Jul 26 00:02:01 2014
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -1232,8 +1233,15 @@ class FsDatasetImpl implements FsDataset
}
volumeMap.remove(bpid, invalidBlks[i]);
}
+
+ // If a DFSClient has the replica in its cache of short-circuit file
+ // descriptors (and the client is using ShortCircuitShm), invalidate it.
+ datanode.getShortCircuitRegistry().processBlockInvalidation(
+ new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
+
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
+
// Delete the block asynchronously to make sure we can do it fast enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java?rev=1613539&r1=1613538&r2=1613539&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java Sat Jul 26 00:02:01 2014
@@ -32,11 +32,16 @@ import com.google.common.base.Preconditi
* DfsClientShm is a subclass of ShortCircuitShm which is used by the
* DfsClient.
* When the UNIX domain socket associated with this shared memory segment
- * closes unexpectedly, we mark the slots inside this segment as stale.
- * ShortCircuitReplica objects that contain stale slots are themselves stale,
+ * closes unexpectedly, we mark the slots inside this segment as disconnected.
+ * ShortCircuitReplica objects that contain disconnected slots are stale,
* and will not be used to service new reads or mmap operations.
* However, in-progress read or mmap operations will continue to proceed.
* Once the last slot is deallocated, the segment can be safely munmapped.
+ *
+ * Slots may also become stale because the associated replica has been deleted
+ * on the DataNode. In this case, the DataNode will clear the 'valid' bit.
+ * The client will then see these slots as stale (see
+ * #{ShortCircuitReplica#isStale}).
*/
public class DfsClientShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
@@ -58,7 +63,7 @@ public class DfsClientShm extends ShortC
*
* {@link DfsClientShm#handle} sets this to true.
*/
- private boolean stale = false;
+ private boolean disconnected = false;
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
DomainPeer peer) throws IOException {
@@ -76,14 +81,14 @@ public class DfsClientShm extends ShortC
}
/**
- * Determine if the shared memory segment is stale.
+ * Determine if the shared memory segment is disconnected from the DataNode.
*
* This must be called with the DfsClientShmManager lock held.
*
* @return True if the shared memory segment is stale.
*/
- public synchronized boolean isStale() {
- return stale;
+ public synchronized boolean isDisconnected() {
+ return disconnected;
}
/**
@@ -97,8 +102,8 @@ public class DfsClientShm extends ShortC
public boolean handle(DomainSocket sock) {
manager.unregisterShm(getShmId());
synchronized (this) {
- Preconditions.checkState(!stale);
- stale = true;
+ Preconditions.checkState(!disconnected);
+ disconnected = true;
boolean hadSlots = false;
for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
Slot slot = iter.next();
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java?rev=1613539&r1=1613538&r2=1613539&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java Sat Jul 26 00:02:01 2014
@@ -271,12 +271,12 @@ public class DfsClientShmManager impleme
loading = false;
finishedLoading.signalAll();
}
- if (shm.isStale()) {
+ if (shm.isDisconnected()) {
// If the peer closed immediately after the shared memory segment
// was created, the DomainSocketWatcher callback might already have
- // fired and marked the shm as stale. In this case, we obviously
- // don't want to add the SharedMemorySegment to our list of valid
- // not-full segments.
+ // fired and marked the shm as disconnected. In this case, we
+ // obviously don't want to add the SharedMemorySegment to our list
+ // of valid not-full segments.
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": the UNIX domain socket associated with " +
"this short-circuit memory closed before we could make " +
@@ -299,7 +299,7 @@ public class DfsClientShmManager impleme
void freeSlot(Slot slot) {
DfsClientShm shm = (DfsClientShm)slot.getShm();
shm.unregisterSlot(slot.getSlotIdx());
- if (shm.isStale()) {
+ if (shm.isDisconnected()) {
// Stale shared memory segments should not be tracked here.
Preconditions.checkState(!full.containsKey(shm.getShmId()));
Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java?rev=1613539&r1=1613538&r2=1613539&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java Sat Jul 26 00:02:01 2014
@@ -306,6 +306,13 @@ public class ShortCircuitShm {
(slotAddress - baseAddress) / BYTES_PER_SLOT);
}
+ /**
+ * Clear the slot.
+ */
+ void clear() {
+ unsafe.putLongVolatile(null, this.slotAddress, 0);
+ }
+
private boolean isSet(long flag) {
long prev = unsafe.getLongVolatile(null, this.slotAddress);
return (prev & flag) != 0;
@@ -535,6 +542,7 @@ public class ShortCircuitShm {
}
allocatedSlots.set(idx, true);
Slot slot = new Slot(calculateSlotAddress(idx), blockId);
+ slot.clear();
slot.makeValid();
slots[idx] = slot;
if (LOG.isTraceEnabled()) {
@@ -583,7 +591,7 @@ public class ShortCircuitShm {
Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
if (!slot.isValid()) {
throw new InvalidRequestException(this + ": slot " + slotIdx +
- " has not been allocated.");
+ " is not marked as valid.");
}
slots[slotIdx] = slot;
allocatedSlots.set(slotIdx, true);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java?rev=1613539&r1=1613538&r2=1613539&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java Sat Jul 26 00:02:01 2014
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.DataOutputStream;
@@ -30,7 +31,9 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableBoolean;
@@ -462,6 +465,7 @@ public class TestShortCircuitCache {
}
}, 10, 60000);
cluster.shutdown();
+ sockDir.close();
}
@Test(timeout=60000)
@@ -516,4 +520,98 @@ public class TestShortCircuitCache {
});
cluster.shutdown();
}
+
+ /**
+ * Test unlinking a file whose blocks we are caching in the DFSClient.
+ * The DataNode will notify the DFSClient that the replica is stale via the
+ * ShortCircuitShm.
+ */
+ @Test(timeout=60000)
+ public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
+ BlockReaderTestUtil.enableShortCircuitShmTracing();
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+ Configuration conf = createShortCircuitConf(
+ "testUnlinkingReplicasInFileDescriptorCache", sockDir);
+ // We don't want the CacheCleaner to time out short-circuit shared memory
+ // segments during the test, so set the timeout really high.
+ conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+ 1000000000L);
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ final ShortCircuitCache cache =
+ fs.getClient().getClientContext().getShortCircuitCache();
+ cache.getDfsClientShmManager().visit(new Visitor() {
+ @Override
+ public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+ throws IOException {
+ // The ClientShmManager starts off empty.
+ Assert.assertEquals(0, info.size());
+ }
+ });
+ final Path TEST_PATH = new Path("/test_file");
+ final int TEST_FILE_LEN = 8193;
+ final int SEED = 0xFADE0;
+ DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LEN,
+ (short)1, SEED);
+ byte contents[] = DFSTestUtil.readFileBuffer(fs, TEST_PATH);
+ byte expected[] = DFSTestUtil.
+ calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
+ Assert.assertTrue(Arrays.equals(contents, expected));
+ // Loading this file brought the ShortCircuitReplica into our local
+ // replica cache.
+ final DatanodeInfo datanode =
+ new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
+ cache.getDfsClientShmManager().visit(new Visitor() {
+ @Override
+ public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+ throws IOException {
+ Assert.assertTrue(info.get(datanode).full.isEmpty());
+ Assert.assertFalse(info.get(datanode).disabled);
+ Assert.assertEquals(1, info.get(datanode).notFull.values().size());
+ DfsClientShm shm =
+ info.get(datanode).notFull.values().iterator().next();
+ Assert.assertFalse(shm.isDisconnected());
+ }
+ });
+ // Remove the file whose blocks we just read.
+ fs.delete(TEST_PATH, false);
+
+ // Wait for the replica to be purged from the DFSClient's cache.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ MutableBoolean done = new MutableBoolean(true);
+ @Override
+ public Boolean get() {
+ try {
+ done.setValue(true);
+ cache.getDfsClientShmManager().visit(new Visitor() {
+ @Override
+ public void visit(HashMap<DatanodeInfo,
+ PerDatanodeVisitorInfo> info) throws IOException {
+ Assert.assertTrue(info.get(datanode).full.isEmpty());
+ Assert.assertFalse(info.get(datanode).disabled);
+ Assert.assertEquals(1,
+ info.get(datanode).notFull.values().size());
+ DfsClientShm shm = info.get(datanode).notFull.values().
+ iterator().next();
+ // Check that all slots have been invalidated.
+ for (Iterator<Slot> iter = shm.slotIterator();
+ iter.hasNext(); ) {
+ Slot slot = iter.next();
+ if (slot.isValid()) {
+ done.setValue(false);
+ }
+ }
+ }
+ });
+ } catch (IOException e) {
+ LOG.error("error running visitor", e);
+ }
+ return done.booleanValue();
+ }
+ }, 10, 60000);
+ cluster.shutdown();
+ sockDir.close();
+ }
}