Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/22 22:39:02 UTC

git commit: HDFS-6877. Avoid calling checkDisk when an HDFS volume is removed during a write. (Lei Xu via Colin P. McCabe) (cherry picked from commit 7b0f9bb2583cd9b7274f1e31c173c1c6a7ce467b)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 c212347d3 -> caaf77876


HDFS-6877. Avoid calling checkDisk when an HDFS volume is removed during a write. (Lei Xu via Colin P. McCabe)
(cherry picked from commit 7b0f9bb2583cd9b7274f1e31c173c1c6a7ce467b)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/caaf7787
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/caaf7787
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/caaf7787

Branch: refs/heads/branch-2
Commit: caaf778768289f43e262e095d8a3152631f98a2a
Parents: c212347
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Oct 22 13:38:26 2014 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Wed Oct 22 13:38:56 2014 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hdfs/server/datanode/BlockReceiver.java     | 24 +++++-
 .../server/datanode/fsdataset/FsDatasetSpi.java |  3 +
 .../src/main/proto/datatransfer.proto           |  2 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    | 91 ++++++++++++++++----
 5 files changed, 105 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/caaf7787/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 73838ef..ad4f90a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -31,6 +31,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7254. Add documentation for hot swapping DataNode drives (Lei Xu via
     Colin P. McCabe)
 
+    HDFS-6877. Avoid calling checkDisk when an HDFS volume is removed during a
+    write. (Lei Xu via Colin P. McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/caaf7787/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 4d1cc6c..3d497f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
@@ -1229,7 +1230,28 @@ class BlockReceiver implements Closeable {
 
           if (lastPacketInBlock) {
             // Finalize the block and close the block file
-            finalizeBlock(startTime);
+            try {
+              finalizeBlock(startTime);
+            } catch (ReplicaNotFoundException e) {
+              // Verify that the exception is due to volume removal.
+              FsVolumeSpi volume;
+              synchronized (datanode.data) {
+                volume = datanode.data.getVolume(block);
+              }
+              if (volume == null) {
+                // The ReplicaInfo has been removed because the corresponding
+                // data volume was removed, so there is no need to check for
+                // a disk error.
+                LOG.info(myString
+                    + ": BlockReceiver is interrupted because the block pool "
+                    + block.getBlockPoolId() + " has been removed.", e);
+                sendAckUpstream(ack, expected, totalAckTimeNanos, 0,
+                    Status.OOB_INTERRUPTED);
+                running = false;
+                receiverThread.interrupt();
+                continue;
+              }
+              throw e;
+            }
           }
 
           sendAckUpstream(ack, expected, totalAckTimeNanos,

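For readers tracing the new catch block above: the key decision is whether the missing replica is explained by a volume that was hot-swapped out (benign, so checkDisk is skipped) or by some other failure (rethrown, which still triggers the disk check). Below is a minimal self-contained sketch of that decision, using hypothetical Volume/Dataset stand-ins rather than the real FsVolumeSpi/FsDatasetSpi interfaces:

  import java.io.IOException;

  class VolumeRemovalCheck {
    // Hypothetical stand-ins for FsVolumeSpi and FsDatasetSpi.
    interface Volume {}
    interface Dataset {
      Volume getVolume(long blockId);  // null once the block's volume is gone
      void finalizeBlock(long blockId) throws IOException;
    }

    static class ReplicaNotFoundException extends IOException {}

    // Returns true if the receiver should wind down quietly because the
    // block's volume was removed; false if finalization succeeded. Rethrows
    // when the replica is missing for any other reason.
    static boolean finalizeOrDetectRemoval(Dataset dataset, long blockId)
        throws IOException {
      try {
        dataset.finalizeBlock(blockId);
        return false;
      } catch (ReplicaNotFoundException e) {
        Volume volume;
        synchronized (dataset) {  // mirrors synchronized (datanode.data)
          volume = dataset.getVolume(blockId);
        }
        if (volume == null) {
          return true;  // volume hot-removed: skip the disk check entirely
        }
        throw e;        // replica lost for another reason: let checkDisk run
      }
    }
  }

On the true branch the receiver, as in the hunk above, acks upstream with Status.OOB_INTERRUPTED, stops its loop, and interrupts its own thread.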
http://git-wip-us.apache.org/repos/asf/hadoop/blob/caaf7787/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 3f1400d..52432ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -267,6 +267,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   * The block size is taken from the parameter b and must match the amount
   * of data written.
    * @throws IOException
+   * @throws ReplicaNotFoundException if the replica cannot be found when the
+   * block is being finalized, for instance because the block resides on an
+   * HDFS volume that has been removed.
    */
   public void finalizeBlock(ExtendedBlock b) throws IOException;
 

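To make the new @throws clause concrete, here is a hedged implementer-side sketch; VolumeMap, Replica, lookup and moveToFinalized are illustrative stand-ins, not FsDatasetImpl's actual internals:

  import java.io.IOException;

  class SketchDataset {
    static class ReplicaNotFoundException extends IOException {
      ReplicaNotFoundException(String msg) { super(msg); }
    }
    interface Replica { void moveToFinalized() throws IOException; }
    interface VolumeMap { Replica lookup(long blockId); }  // hypothetical

    private final VolumeMap volumeMap;
    SketchDataset(VolumeMap volumeMap) { this.volumeMap = volumeMap; }

    // Finalize a block, translating "replica missing" into the documented
    // ReplicaNotFoundException instead of a generic IOException.
    public void finalizeBlock(long blockId) throws IOException {
      Replica replica = volumeMap.lookup(blockId);
      if (replica == null) {
        // Removing a volume also drops its replicas from the map, so a miss
        // at finalize time is exactly the hot-swap case the javadoc names.
        throw new ReplicaNotFoundException("block " + blockId + " not found");
      }
      replica.moveToFinalized();  // e.g. move the file from rbw/ to finalized/
    }
  }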
http://git-wip-us.apache.org/repos/asf/hadoop/blob/caaf7787/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index fb774b7..50cc00d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -219,7 +219,7 @@ enum Status {
   CHECKSUM_OK = 6;
   ERROR_UNSUPPORTED = 7;
   OOB_RESTART = 8;            // Quick restart
-  OOB_RESERVED1 = 9;          // Reserved
+  OOB_INTERRUPTED = 9;        // Interrupted
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
   IN_PROGRESS = 12;

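A small aside grounded only in the numbering shown above: the OOB statuses occupy the contiguous range 8 (OOB_RESTART) through 11 (OOB_RESERVED3), so a peer can classify an ack's status with a range check instead of enumerating constants. The method name below is ours, not part of the DataTransferProtocol API:

  class OobStatus {
    private static final int OOB_FIRST = 8;   // OOB_RESTART
    private static final int OOB_LAST = 11;   // OOB_RESERVED3

    // True if the numeric Status value is out-of-band, which now includes
    // the newly named OOB_INTERRUPTED (9).
    static boolean isOutOfBand(int statusValue) {
      return statusValue >= OOB_FIRST && statusValue <= OOB_LAST;
    }
  }

Note that renaming OOB_RESERVED1 to OOB_INTERRUPTED keeps the wire value 9 unchanged, so older peers still parse the ack; only the meaning of that slot is newly defined.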
http://git-wip-us.apache.org/repos/asf/hadoop/blob/caaf7787/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index f6e984b..27cfc82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -156,9 +158,12 @@ public class TestDataNodeHotSwapVolumes {
       throws IOException, TimeoutException, InterruptedException {
     int attempts = 50;  // Wait 5 seconds.
     while (attempts > 0) {
-      if (getNumReplicas(fs, file, blockIdx) == numReplicas) {
+      int actualReplicas = getNumReplicas(fs, file, blockIdx);
+      if (actualReplicas == numReplicas) {
         return;
       }
+      System.out.printf("Block %d of file %s has %d replicas (desired %d).\n",
+          blockIdx, file.toString(), actualReplicas, numReplicas);
       Thread.sleep(100);
       attempts--;
     }
@@ -167,9 +172,16 @@ public class TestDataNodeHotSwapVolumes {
   }
 
   /** Parses data dirs from DataNode's configuration. */
-  private static Collection<String> getDataDirs(DataNode datanode) {
-    return datanode.getConf().getTrimmedStringCollection(
-        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+  private static List<String> getDataDirs(DataNode datanode) {
+    return new ArrayList<String>(datanode.getConf().getTrimmedStringCollection(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+  }
+
+  /** Force the DataNode to report missing blocks immediately. */
+  private static void triggerDeleteReport(DataNode datanode)
+      throws IOException {
+    datanode.scheduleAllBlockReport(0);
+    DataNodeTestUtils.triggerDeletionReport(datanode);
   }
 
   @Test
@@ -274,7 +286,7 @@ public class TestDataNodeHotSwapVolumes {
   /**
    * Test adding one volume on a running MiniDFSCluster with only one NameNode.
    */
-  @Test
+  @Test(timeout=60000)
   public void testAddOneNewVolume()
       throws IOException, ReconfigurationException,
       InterruptedException, TimeoutException {
@@ -304,7 +316,7 @@ public class TestDataNodeHotSwapVolumes {
     verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
   }
 
-  @Test(timeout = 60000)
+  @Test(timeout=60000)
   public void testAddVolumesDuringWrite()
       throws IOException, InterruptedException, TimeoutException,
       ReconfigurationException {
@@ -336,7 +348,7 @@ public class TestDataNodeHotSwapVolumes {
     assertEquals(expectedNumBlocks, actualNumBlocks);
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testAddVolumesToFederationNN()
       throws IOException, TimeoutException, InterruptedException,
       ReconfigurationException {
@@ -371,7 +383,7 @@ public class TestDataNodeHotSwapVolumes {
         Collections.frequency(actualNumBlocks.get(0), 0));
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testRemoveOneVolume()
       throws ReconfigurationException, InterruptedException, TimeoutException,
       IOException {
@@ -410,12 +422,13 @@ public class TestDataNodeHotSwapVolumes {
     assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testReplicatingAfterRemoveVolume()
       throws InterruptedException, TimeoutException, IOException,
       ReconfigurationException {
     startDFSCluster(1, 2);
-    final DistributedFileSystem fs = cluster.getFileSystem();
+
+    final FileSystem fs = cluster.getFileSystem();
     final short replFactor = 2;
     Path testFile = new Path("/test");
     createFile(testFile, 4, replFactor);
@@ -428,14 +441,9 @@ public class TestDataNodeHotSwapVolumes {
     assertFileLocksReleased(
       new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
 
-    // Force DataNode to report missing blocks.
-    dn.scheduleAllBlockReport(0);
-    DataNodeTestUtils.triggerDeletionReport(dn);
+    triggerDeleteReport(dn);
 
-    // The 2nd block only has 1 replica due to the removed data volume.
     waitReplication(fs, testFile, 1, 1);
-
-    // Wait NameNode to replica missing blocks.
     DFSTestUtil.waitReplication(fs, testFile, replFactor);
   }
 
@@ -478,4 +486,55 @@ public class TestDataNodeHotSwapVolumes {
       }
     }
   }
+
+  @Test(timeout=180000)
+  public void testRemoveVolumeBeingWritten()
+      throws InterruptedException, TimeoutException, ReconfigurationException,
+      IOException {
+    // Test removing a volume from each DataNode in the pipeline, in turn.
+    for (int i = 0; i < 3; i++) {
+      testRemoveVolumeBeingWrittenForDatanode(i);
+    }
+  }
+
+  /**
+   * Test removing a data volume from a particular DataNode while the volume
+   * is actively being written to.
+   * @param dataNodeIdx the index of the DataNode from which to remove a volume.
+   */
+  private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
+      throws IOException, ReconfigurationException, TimeoutException,
+      InterruptedException {
+    // Start a DFS cluster with 3 DataNodes to form a write pipeline.
+    startDFSCluster(1, 3);
+
+    final short REPLICATION = 3;
+    final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
+    final FileSystem fs = cluster.getFileSystem();
+    final Path testFile = new Path("/test");
+
+    FSDataOutputStream out = fs.create(testFile, REPLICATION);
+
+    Random rb = new Random(0);
+    byte[] writeBuf = new byte[BLOCK_SIZE / 2];  // half of the block.
+    rb.nextBytes(writeBuf);
+    out.write(writeBuf);
+    out.hflush();
+
+    List<String> oldDirs = getDataDirs(dn);
+    String newDirs = oldDirs.get(1);  // Keep only the second volume; the first is removed.
+    dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+
+    rb.nextBytes(writeBuf);
+    out.write(writeBuf);
+    out.hflush();
+    out.close();
+
+    // Verify that the file has enough replicas.
+    DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
+    // Read the content back
+    byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
+    assertEquals(BLOCK_SIZE, content.length);
+  }
 }
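
For completeness: the reconfiguration the new test exercises takes a comma-separated directory list as the property value, and with two configured volumes the test can keep just oldDirs.get(1). A hedged sketch of the general "remove one volume" value construction; keepVolumes is our own helper name, and Joiner is the Guava class Hadoop already bundles:

  import com.google.common.base.Joiner;
  import java.util.ArrayList;
  import java.util.List;

  class DataDirHelper {
    // Build the new dfs.datanode.data.dir value with one volume removed.
    static String keepVolumes(List<String> dataDirs, int removeIdx) {
      List<String> keep = new ArrayList<String>(dataDirs);
      keep.remove(removeIdx);             // drop the hot-swapped volume
      return Joiner.on(",").join(keep);   // comma-separated, as in the config
    }
  }

With two volumes, keepVolumes(oldDirs, 0) reduces to oldDirs.get(1), which is exactly the value the test passes to reconfigurePropertyImpl.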