Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/06/12 21:19:54 UTC

[hadoop] branch branch-2.10 updated: HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu.

This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 1710975  HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu.
1710975 is described below

commit 17109758dd6f9a86b226c025ee20b8e2abc9d366
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Fri Jun 12 16:19:36 2020 -0500

    HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu.
---
 .../datanode/TestDataNodeHotSwapVolumes.java       | 149 ++++++++++++++-------
 1 file changed, 97 insertions(+), 52 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 93c1242..e98b90a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -83,6 +87,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -777,12 +782,11 @@ public class TestDataNodeHotSwapVolumes {
   private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
       throws IOException, ReconfigurationException, TimeoutException,
       InterruptedException, BrokenBarrierException {
-    // Starts DFS cluster with 3 DataNodes to form a pipeline.
-    startDFSCluster(1, 3);
+    startDFSCluster(1, 4);
 
     final short REPLICATION = 3;
-    final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
-    final FileSystem fs = cluster.getFileSystem();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    final DFSClient client = fs.getClient();
     final Path testFile = new Path("/test");
     FSDataOutputStream out = fs.create(testFile, REPLICATION);
 
@@ -792,54 +796,102 @@ public class TestDataNodeHotSwapVolumes {
     out.write(writeBuf);
     out.hflush();
 
-    // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
-    // BlockReceiver releases volume reference before finalizeBlock(), the blocks
-    // on the volume will be removed, and finalizeBlock() throws IOE.
-    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
-    dn.data = Mockito.spy(data);
-    doAnswer(new Answer<Object>() {
-          public Object answer(InvocationOnMock invocation)
-              throws IOException, InterruptedException {
-            Thread.sleep(1000);
-            // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
-            // the block is not removed, since the volume reference should not
-            // be released at this point.
-            data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
-              (boolean) invocation.getArguments()[1]);
-            return null;
-          }
-        }).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
-            Mockito.anyBoolean());
-
-    final CyclicBarrier barrier = new CyclicBarrier(2);
+    BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
+    String[] dataNodeNames = blocks[0].getNames();
+    String dataNodeName = dataNodeNames[dataNodeIdx];
+    int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
+    DataNode dn = null;
+    for (DataNode dataNode : cluster.getDataNodes()) {
+      if (dataNode.getXferPort() == xferPort) {
+        dn = dataNode;
+        break;
+      }
+    }
+    assertNotNull(dn);
 
-    List<String> oldDirs = getDataDirs(dn);
-    final String newDirs = oldDirs.get(1);  // Remove the first volume.
-    final List<Exception> exceptions = new ArrayList<>();
-    Thread reconfigThread = new Thread() {
-      public void run() {
+    final CyclicBarrier barrier = new CyclicBarrier(4);
+    final AtomicBoolean done = new AtomicBoolean(false);
+    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
+      public void logDelaySendingAckToUpstream(
+          final String upstreamAddr, final long delayMs) throws IOException {
         try {
-          barrier.await();
-          assertThat(
-              "DN did not update its own config",
-              dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
-              is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
-        } catch (ReconfigurationException |
-            InterruptedException |
-            BrokenBarrierException e) {
-          exceptions.add(e);
+          // Make all streams that hold volume references wait for the
+          // reconfiguration thread to start.
+          // IO should be blocked only while the reconfiguration task
+          // is running.
+          if (!done.get()) {
+            barrier.await();
+            // Delay so that the reconfiguration thread starts before
+            // the IO finishes.
+            Thread.sleep(1000);
+          }
+        } catch (InterruptedException | BrokenBarrierException e) {
+          throw new IOException(e);
         }
       }
     };
-    reconfigThread.start();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
 
-    barrier.await();
-    rb.nextBytes(writeBuf);
-    out.write(writeBuf);
-    out.hflush();
-    out.close();
+    try {
+      DataNodeFaultInjector.set(newInjector);
+
+      List<String> oldDirs = getDataDirs(dn);
+      LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
+      LocatedBlock block = lbs.get(0);
+      FsVolumeImpl volume =
+          (FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
+      StringBuffer newDirsBuf = new StringBuffer();
+      String delim = "";
+      for (String d : oldDirs) {
+        if (! d.contains(volume.getBasePath())) {
+          newDirsBuf.append(delim).append(d);
+          delim = ",";
+        }
+      }
+      final String newDirs = newDirsBuf.toString();
+      final List<IOException> exceptions = new ArrayList<>();
+      final DataNode dataNode = dn;
+      final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
+
+      Thread reconfigThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            reconfigBarrier.await();
+
+            // Wake up the writing threads in the pipeline so they can
+            // finish the block.
+            barrier.await();
+
+            assertThat(
+                "DN did not update its own config",
+                dataNode.reconfigurePropertyImpl(
+                    DFS_DATANODE_DATA_DIR_KEY, newDirs),
+                is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+            done.set(true);
+          } catch (ReconfigurationException |
+              InterruptedException |
+              BrokenBarrierException e) {
+            exceptions.add(new IOException(e));
+          }
+        }
+      });
+      reconfigThread.start();
 
-    reconfigThread.join();
+      // Write more data to make sure the stream threads wait on the barrier.
+      rb.nextBytes(writeBuf);
+      out.write(writeBuf);
+      reconfigBarrier.await();
+      out.hflush();
+      out.close();
+
+      reconfigThread.join();
+
+      if (!exceptions.isEmpty()) {
+        throw MultipleIOException.createIOException(exceptions);
+      }
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
 
     // Verify if the data directory reconfigure was successful
     FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset();
@@ -852,19 +904,12 @@ public class TestDataNodeHotSwapVolumes {
           1, fsVolumeReferences.size());
     }
 
-    // Add a new DataNode to help with the pipeline recover.
-    cluster.startDataNodes(conf, 1, true, null, null, null);
-
     // Verify the file has sufficient replications.
     DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
     // Read the content back
     byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
     assertEquals(BLOCK_SIZE, content.length);
 
-    if (!exceptions.isEmpty()) {
-      throw new IOException(exceptions.get(0).getCause());
-    }
-
     // Write more files to make sure that the DataNode that had a volume
     // removed is still alive and able to receive data.
     for (int i = 0; i < 10; i++) {


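For context on the change above: instead of spying on FsDatasetSpi#finalizeBlock with Mockito, the test now installs a DataNodeFaultInjector whose logDelaySendingAckToUpstream hook parks each pipeline stream on a CyclicBarrier shared with the reconfiguration thread, so the volume references are still held when reconfigurePropertyImpl runs. Stripped of the HDFS specifics, the coordination pattern looks like the following minimal sketch (plain Java, runnable standalone; the class name and thread counts are illustrative, not part of the commit):

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

public class BarrierCoordinationSketch {
  // 3 "pipeline stream" threads plus 1 "reconfiguration" thread,
  // matching the CyclicBarrier(4) in the test.
  private static final int STREAMS = 3;
  private static final CyclicBarrier barrier = new CyclicBarrier(STREAMS + 1);
  private static final AtomicBoolean done = new AtomicBoolean(false);

  public static void main(String[] args) throws Exception {
    // Stand-ins for the DataNode streams that hold volume references.
    for (int i = 0; i < STREAMS; i++) {
      final int id = i;
      new Thread(() -> {
        try {
          if (!done.get()) {
            barrier.await();     // rendezvous with the reconfig thread
            Thread.sleep(1000);  // let reconfiguration start before IO ends
          }
          System.out.println("stream " + id + " finished");
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }).start();
    }

    // Stand-in for the reconfiguration thread.
    Thread reconfig = new Thread(() -> {
      try {
        barrier.await();         // release the stream threads
        System.out.println("reconfiguration running");
        done.set(true);          // later hook calls must not block
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    reconfig.start();
    reconfig.join();
  }
}

The done flag mirrors the AtomicBoolean in the test: once the reconfiguration has completed, later invocations of the injector hook bypass the barrier; otherwise subsequent IO on the DataNode would block indefinitely.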