Posted to common-commits@hadoop.apache.org by le...@apache.org on 2017/09/29 18:03:37 UTC

hadoop git commit: HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. (Lei (Eddy) Xu)

Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 7a9479597 -> 337506190


HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. (Lei (Eddy) Xu)
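
The test now starts the cluster with four DataNodes so that pipeline recovery no longer needs a replacement node started mid-test, locates the DataNode under test from the block's locations (by transfer port) rather than by list index, and removes the specific volume holding the block. The Mockito spy on FsDatasetSpi#finalizeBlock is replaced by a DataNodeFaultInjector hook, with CyclicBarriers coordinating the pipeline ack threads and the reconfiguration thread so that the volume removal reliably overlaps the in-flight write. Failures from the reconfiguration thread are surfaced via MultipleIOException. (A minimal sketch of the injector/barrier pattern appears after the diff.)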


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/33750619
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/33750619
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/33750619

Branch: refs/heads/branch-3.0
Commit: 337506190b559bc712b18a606b68a18677e90f7f
Parents: 7a94795
Author: Lei Xu <le...@apache.org>
Authored: Fri Sep 29 10:46:17 2017 -0700
Committer: Lei Xu <le...@apache.org>
Committed: Fri Sep 29 10:47:37 2017 -0700

----------------------------------------------------------------------
 .../datanode/TestDataNodeHotSwapVolumes.java    | 129 ++++++++++++-------
 1 file changed, 83 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/33750619/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 5d4ac1e..df5e297 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -68,6 +72,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -83,6 +88,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -775,12 +781,11 @@ public class TestDataNodeHotSwapVolumes {
   private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
       throws IOException, ReconfigurationException, TimeoutException,
       InterruptedException, BrokenBarrierException {
-    // Starts DFS cluster with 3 DataNodes to form a pipeline.
-    startDFSCluster(1, 3);
+    startDFSCluster(1, 4);
 
     final short REPLICATION = 3;
-    final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
-    final FileSystem fs = cluster.getFileSystem();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    final DFSClient client = fs.getClient();
     final Path testFile = new Path("/test");
     FSDataOutputStream out = fs.create(testFile, REPLICATION);
 
@@ -790,54 +795,93 @@ public class TestDataNodeHotSwapVolumes {
     out.write(writeBuf);
     out.hflush();
 
-    // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
-    // BlockReceiver releases volume reference before finalizeBlock(), the blocks
-    // on the volume will be removed, and finalizeBlock() throws IOE.
-    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
-    dn.data = Mockito.spy(data);
-    doAnswer(new Answer<Object>() {
-          public Object answer(InvocationOnMock invocation)
-              throws IOException, InterruptedException {
+    BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
+    String[] dataNodeNames = blocks[0].getNames();
+    String dataNodeName = dataNodeNames[dataNodeIdx];
+    int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
+    DataNode dn = null;
+    for (DataNode dataNode : cluster.getDataNodes()) {
+      if (dataNode.getXferPort() == xferPort) {
+        dn = dataNode;
+        break;
+      }
+    }
+    assertNotNull(dn);
+
+    final CyclicBarrier barrier = new CyclicBarrier(4);
+    final AtomicBoolean done = new AtomicBoolean(false);
+    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
+      public void logDelaySendingAckToUpstream(
+          final String upstreamAddr, final long delayMs) throws IOException {
+        try {
+          // Make all streams which hold the volume references wait for the
+          // reconfiguration thread to start.
+          // IO should only be blocked while the reconfiguration task is
+          // running.
+          if (!done.get()) {
+            barrier.await();
+            // Add a delay so that the reconfiguration thread starts before
+            // the IO finishes.
             Thread.sleep(1000);
-            // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
-            // the block is not removed, since the volume reference should not
-            // be released at this point.
-            data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
-              (boolean) invocation.getArguments()[1]);
-            return null;
           }
-        }).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
-            Mockito.anyBoolean());
-
-    final CyclicBarrier barrier = new CyclicBarrier(2);
+        } catch (InterruptedException | BrokenBarrierException e) {
+          throw new IOException(e);
+        }
+      }
+    };
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
 
-    List<String> oldDirs = getDataDirs(dn);
-    final String newDirs = oldDirs.get(1);  // Remove the first volume.
-    final List<Exception> exceptions = new ArrayList<>();
-    Thread reconfigThread = new Thread() {
-      public void run() {
+    try {
+      DataNodeFaultInjector.set(newInjector);
+
+      List<String> oldDirs = getDataDirs(dn);
+      LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
+      LocatedBlock block = lbs.get(0);
+      FsVolumeImpl volume =
+          (FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
+      final String newDirs = oldDirs.stream()
+          .filter((d) -> !d.contains(volume.getStorageLocation().toString()))
+          .collect(Collectors.joining(","));
+      final List<IOException> exceptions = new ArrayList<>();
+      final DataNode dataNode = dn;
+      final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
+
+      Thread reconfigThread = new Thread(() -> {
         try {
+          reconfigBarrier.await();
+
+          // Wake up the writer threads on the pipeline to finish the block.
           barrier.await();
+
           assertThat(
               "DN did not update its own config",
-              dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
-              is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+              dataNode.reconfigurePropertyImpl(
+                  DFS_DATANODE_DATA_DIR_KEY, newDirs),
+              is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+          done.set(true);
         } catch (ReconfigurationException |
             InterruptedException |
             BrokenBarrierException e) {
-          exceptions.add(e);
+          exceptions.add(new IOException(e));
         }
-      }
-    };
-    reconfigThread.start();
+      });
+      reconfigThread.start();
 
-    barrier.await();
-    rb.nextBytes(writeBuf);
-    out.write(writeBuf);
-    out.hflush();
-    out.close();
+      // Write more data to make sure the stream threads wait on the barrier.
+      rb.nextBytes(writeBuf);
+      out.write(writeBuf);
+      reconfigBarrier.await();
+      out.hflush();
+      out.close();
+
+      reconfigThread.join();
 
-    reconfigThread.join();
+      if (!exceptions.isEmpty()) {
+        throw MultipleIOException.createIOException(exceptions);
+      }
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
 
     // Verify if the data directory reconfigure was successful
     FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset();
@@ -851,19 +895,12 @@ public class TestDataNodeHotSwapVolumes {
           1, fsVolumeReferences.size());
     }
 
-    // Add a new DataNode to help with the pipeline recover.
-    cluster.startDataNodes(conf, 1, true, null, null, null);
-
     // Verify the file has sufficient replications.
     DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
     // Read the content back
     byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
     assertEquals(BLOCK_SIZE, content.length);
 
-    if (!exceptions.isEmpty()) {
-      throw new IOException(exceptions.get(0).getCause());
-    }
-
     // Write more files to make sure that the DataNode that has removed volume
     // is still alive to receive data.
     for (int i = 0; i < 10; i++) {

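For readers following the injector changes above: Hadoop's DataNodeFaultInjector is a statically swappable hook (get()/set()) that DataNode code invokes at interesting points, and which tests override to inject delays. Below is a minimal, self-contained sketch of the same rendezvous pattern; the FaultInjector class, the beforeAck() hook, the thread names, and the party counts are illustrative stand-ins, not Hadoop APIs.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

public class FaultInjectorBarrierDemo {
  // Stand-in for Hadoop's DataNodeFaultInjector: a statically swappable
  // hook that the code under test calls on its hot path.
  static class FaultInjector {
    private static volatile FaultInjector instance = new FaultInjector();
    static FaultInjector get() { return instance; }
    static void set(FaultInjector injector) { instance = injector; }
    // No-op by default; a test overrides this to inject a delay.
    void beforeAck() throws InterruptedException, BrokenBarrierException {}
  }

  public static void main(String[] args) throws Exception {
    // Two "writer" threads plus the "reconfiguration" thread rendezvous here,
    // mirroring the patch's CyclicBarrier(4) for a three-DataNode pipeline.
    final CyclicBarrier barrier = new CyclicBarrier(3);
    final AtomicBoolean done = new AtomicBoolean(false);

    FaultInjector oldInjector = FaultInjector.get();
    FaultInjector.set(new FaultInjector() {
      @Override
      void beforeAck() throws InterruptedException, BrokenBarrierException {
        // Block writers until the reconfiguration thread has started, but
        // only while the reconfiguration is still in flight.
        if (!done.get()) {
          barrier.await();
          Thread.sleep(100); // give the reconfiguration thread a head start
        }
      }
    });

    try {
      Runnable writer = () -> {
        try {
          FaultInjector.get().beforeAck(); // the injected hook fires here
          System.out.println(Thread.currentThread().getName() + " acked");
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      };
      Thread w1 = new Thread(writer, "writer-1");
      Thread w2 = new Thread(writer, "writer-2");
      w1.start();
      w2.start();

      barrier.await();          // releases both writers
      System.out.println("reconfiguration running");
      done.set(true);           // later hook calls no longer block
      w1.join();
      w2.join();
    } finally {
      FaultInjector.set(oldInjector); // always restore the original hook
    }
  }
}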

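Likewise, the stream-based construction of the new dfs.datanode.data.dir value (the Collectors.joining call in the patch) reduces to the following pattern; the paths here are made up for illustration.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoinRemainingDirsDemo {
  public static void main(String[] args) {
    // Hypothetical entries standing in for the DataNode's data directories.
    List<String> oldDirs = Arrays.asList("/data/1", "/data/2", "/data/3");
    String removedVolume = "/data/2"; // the volume holding the block

    // Keep every directory except the one being removed, comma-joined
    // back into a single configuration value.
    String newDirs = oldDirs.stream()
        .filter(d -> !d.contains(removedVolume))
        .collect(Collectors.joining(","));

    System.out.println(newDirs); // prints: /data/1,/data/3
  }
}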
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org