Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/06 21:23:08 UTC

[38/50] [abbrv] hadoop git commit: HDFS-7996. After swapping a volume, BlockReceiver reports ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)

HDFS-7996. After swapping a volume, BlockReceiver reports ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5864e886
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5864e886
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5864e886

Branch: refs/heads/YARN-2928
Commit: 5864e88653b444d44a808003ee5668d71d5be020
Parents: fb13303
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Apr 3 14:19:46 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Apr 6 12:08:16 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../hdfs/server/datanode/BlockReceiver.java     | 51 ++++++++++++--------
 .../datanode/TestDataNodeHotSwapVolumes.java    | 30 +++++++++++-
 3 files changed, 64 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
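
The fix closes a race between volume hot-swap and block finalization: if
BlockReceiver released its volume reference before finalizeBlock(), a
concurrent removal of that volume could invalidate the replica and surface as
a ReplicaNotFoundException. The new code claims the ReplicaHandler and holds
it in a try-with-resources block across close() and finalizeBlock(). Below is
a minimal sketch of that idiom; VolumeReference and Receiver are hypothetical
stand-ins, not the actual Hadoop classes (ReplicaHandler, BlockReceiver).

    import java.io.Closeable;
    import java.io.IOException;

    class VolumeReference implements Closeable {
      @Override
      public void close() {
        // Dropping this reference allows a hot-swap to remove the volume.
      }
    }

    class Receiver implements Closeable {
      private VolumeReference volumeRef = new VolumeReference();

      // Transfer ownership to the caller: after this, close() below no
      // longer releases the reference; the caller's try-with-resources does.
      private VolumeReference claimVolumeRef() {
        VolumeReference ref = volumeRef;
        volumeRef = null;
        return ref;
      }

      @Override
      public void close() throws IOException {
        if (volumeRef != null) {  // release only if still owned
          volumeRef.close();
          volumeRef = null;
        }
        // ... close block/checksum files ...
      }

      void finish() throws IOException {
        // Hold the volume reference until the block is finalized, so the
        // volume cannot disappear between close() and finalizing.
        try (VolumeReference ref = claimVolumeRef()) {
          close();
          // ... finalize the block here ...
        }
      }
    }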


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5864e886/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ae56f1b..2d399a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1373,6 +1373,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-8039. Fix TestDebugAdmin#testRecoverLease and
     testVerifyBlockChecksumCommand on Windows. (Xiaoyu Yao via cnauroth)
 
+    HDFS-7996. After swapping a volume, BlockReceiver reports
+    ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5864e886/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 4e8ce94..58cb8b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -281,7 +281,7 @@ class BlockReceiver implements Closeable {
   }
 
   /**
-   * close files.
+   * close files and release the volume reference.
    */
   @Override
   public void close() throws IOException {
@@ -798,17 +798,20 @@ class BlockReceiver implements Closeable {
       // then finalize block or convert temporary to RBW.
       // For client-writes, the block is finalized in the PacketResponder.
       if (isDatanode || isTransfer) {
-        // close the block/crc files
-        close();
-        block.setNumBytes(replicaInfo.getNumBytes());
-
-        if (stage == BlockConstructionStage.TRANSFER_RBW) {
-          // for TRANSFER_RBW, convert temporary to RBW
-          datanode.data.convertTemporaryToRbw(block);
-        } else {
-          // for isDatnode or TRANSFER_FINALIZED
-          // Finalize the block.
-          datanode.data.finalizeBlock(block);
+        // Hold a volume reference while finalizing the block.
+        try (ReplicaHandler handler = claimReplicaHandler()) {
+          // close the block/crc files
+          close();
+          block.setNumBytes(replicaInfo.getNumBytes());
+
+          if (stage == BlockConstructionStage.TRANSFER_RBW) {
+            // for TRANSFER_RBW, convert temporary to RBW
+            datanode.data.convertTemporaryToRbw(block);
+          } else {
+            // for isDatanode or TRANSFER_FINALIZED
+            // Finalize the block.
+            datanode.data.finalizeBlock(block);
+          }
         }
         datanode.metrics.incrBlocksWritten();
       }
@@ -980,7 +983,14 @@ class BlockReceiver implements Closeable {
     }
     return partialCrc;
   }
-  
+
+  /** The caller claims ownership of the replica handler. */
+  private ReplicaHandler claimReplicaHandler() {
+    ReplicaHandler handler = replicaHandler;
+    replicaHandler = null;
+    return handler;
+  }
+
   private static enum PacketResponderType {
     NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
   }
@@ -1280,12 +1290,15 @@ class BlockReceiver implements Closeable {
      * @param startTime time when BlockReceiver started receiving the block
      */
     private void finalizeBlock(long startTime) throws IOException {
-      BlockReceiver.this.close();
-      final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
-          : 0;
-      block.setNumBytes(replicaInfo.getNumBytes());
-      datanode.data.finalizeBlock(block);
-      
+      long endTime = 0;
+      // Hold a volume reference while finalizing the block.
+      try (ReplicaHandler handler = BlockReceiver.this.claimReplicaHandler()) {
+        BlockReceiver.this.close();
+        endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+        block.setNumBytes(replicaInfo.getNumBytes());
+        datanode.data.finalizeBlock(block);
+      }
+
       if (pinning) {
         datanode.data.setPinning(block);
       }
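
One subtlety worth noting: Java's try-with-resources calls close() only on a
resource that is non-null, so even if claimReplicaHandler() were to return
null (a handler already claimed, or never set), the try statement above would
still be safe. A small demonstration of that language guarantee:

    public class NullResourceDemo {
      public static void main(String[] args) throws Exception {
        // The resource is closed only if it is non-null, so a claimed-away
        // (null) handler causes no NullPointerException when the try exits.
        try (java.io.Closeable c = null) {
          System.out.println("body runs; there is nothing to close");
        }
      }
    }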

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5864e886/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index f5772e3..668084b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -64,6 +65,8 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.hamcrest.CoreMatchers.anyOf;
@@ -77,6 +80,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.timeout;
 
 public class TestDataNodeHotSwapVolumes {
@@ -577,6 +581,7 @@ public class TestDataNodeHotSwapVolumes {
     final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
     final FileSystem fs = cluster.getFileSystem();
     final Path testFile = new Path("/test");
+    final long lastTimeDiskErrorCheck = dn.getLastDiskErrorCheck();
 
     FSDataOutputStream out = fs.create(testFile, REPLICATION);
 
@@ -586,6 +591,23 @@ public class TestDataNodeHotSwapVolumes {
     out.write(writeBuf);
     out.hflush();
 
+    // Make FsDatasetSpi#finalizeBlock a time-consuming operation, so that if
+    // BlockReceiver released its volume reference before finalizeBlock(), the
+    // volume's blocks would be removed and finalizeBlock() would throw an IOE.
+    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+    dn.data = Mockito.spy(data);
+    doAnswer(new Answer<Object>() {
+          public Object answer(InvocationOnMock invocation)
+              throws IOException, InterruptedException {
+            Thread.sleep(1000);
+            // Delegate to the real FsDatasetImpl#finalizeBlock to verify that
+            // the block has not been removed, since the volume reference
+            // should not have been released at this point.
+            data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]);
+            return null;
+          }
+        }).when(dn.data).finalizeBlock(any(ExtendedBlock.class));
+
     final CyclicBarrier barrier = new CyclicBarrier(2);
 
     List<String> oldDirs = getDataDirs(dn);
@@ -612,13 +634,19 @@ public class TestDataNodeHotSwapVolumes {
     out.hflush();
     out.close();
 
+    reconfigThread.join();
+
     // Verify the file has sufficient replications.
     DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
     // Read the content back
     byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
     assertEquals(BLOCK_SIZE, content.length);
 
-    reconfigThread.join();
+    // An IOException thrown from BlockReceiver#run triggers
+    // DataNode#checkDiskError(). Verifying that checkDiskError() was not
+    // called shows that BlockReceiver#run() saw no IOException.
+    assertEquals(lastTimeDiskErrorCheck, dn.getLastDiskErrorCheck());
+
     if (!exceptions.isEmpty()) {
       throw new IOException(exceptions.get(0).getCause());
     }
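
The test's timing trick generalizes: wrap the real object in a Mockito spy
and delay the critical method before delegating to the real implementation,
which widens the race window enough to make the bug reproducible. A
standalone sketch of the same technique, using a hypothetical Dataset
interface in place of FsDatasetSpi:

    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class SlowFinalizeExample {
      // Hypothetical stand-in for FsDatasetSpi.
      public interface Dataset {
        void finalizeBlock(String blockId) throws Exception;
      }

      public static Dataset delayFinalize(final Dataset real) {
        Dataset spied = spy(real);
        doAnswer(new Answer<Object>() {
          public Object answer(InvocationOnMock invocation) throws Exception {
            Thread.sleep(1000);  // widen the race window
            // Delegate to the original object, not the spy, to avoid
            // re-entering this stub.
            real.finalizeBlock((String) invocation.getArguments()[0]);
            return null;
          }
        }).when(spied).finalizeBlock(any(String.class));
        return spied;
      }
    }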