You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/06/12 21:19:54 UTC
[hadoop] branch branch-2.10 updated: HDFS-12453.
TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim
Brennan and Lei Xu.
This is an automated email from the ASF dual-hosted git repository.
kihwal pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 1710975 HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu.
1710975 is described below
commit 17109758dd6f9a86b226c025ee20b8e2abc9d366
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Fri Jun 12 16:19:36 2020 -0500
HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu.
---
.../datanode/TestDataNodeHotSwapVolumes.java | 149 ++++++++++++++-------
1 file changed, 97 insertions(+), 52 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 93c1242..e98b90a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -83,6 +87,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -777,12 +782,11 @@ public class TestDataNodeHotSwapVolumes {
private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
throws IOException, ReconfigurationException, TimeoutException,
InterruptedException, BrokenBarrierException {
- // Starts DFS cluster with 3 DataNodes to form a pipeline.
- startDFSCluster(1, 3);
+ startDFSCluster(1, 4);
final short REPLICATION = 3;
- final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
- final FileSystem fs = cluster.getFileSystem();
+ final DistributedFileSystem fs = cluster.getFileSystem();
+ final DFSClient client = fs.getClient();
final Path testFile = new Path("/test");
FSDataOutputStream out = fs.create(testFile, REPLICATION);
@@ -792,54 +796,102 @@ public class TestDataNodeHotSwapVolumes {
out.write(writeBuf);
out.hflush();
- // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
- // BlockReceiver releases volume reference before finalizeBlock(), the blocks
- // on the volume will be removed, and finalizeBlock() throws IOE.
- final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
- dn.data = Mockito.spy(data);
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation)
- throws IOException, InterruptedException {
- Thread.sleep(1000);
- // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
- // the block is not removed, since the volume reference should not
- // be released at this point.
- data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
- (boolean) invocation.getArguments()[1]);
- return null;
- }
- }).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
- Mockito.anyBoolean());
-
- final CyclicBarrier barrier = new CyclicBarrier(2);
+ BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
+ String[] dataNodeNames = blocks[0].getNames();
+ String dataNodeName = dataNodeNames[dataNodeIdx];
+ int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
+ DataNode dn = null;
+ for (DataNode dataNode : cluster.getDataNodes()) {
+ if (dataNode.getXferPort() == xferPort) {
+ dn = dataNode;
+ break;
+ }
+ }
+ assertNotNull(dn);
- List<String> oldDirs = getDataDirs(dn);
- final String newDirs = oldDirs.get(1); // Remove the first volume.
- final List<Exception> exceptions = new ArrayList<>();
- Thread reconfigThread = new Thread() {
- public void run() {
+ final CyclicBarrier barrier = new CyclicBarrier(4);
+ final AtomicBoolean done = new AtomicBoolean(false);
+ DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
+ public void logDelaySendingAckToUpstream(
+ final String upstreamAddr, final long delayMs) throws IOException {
try {
- barrier.await();
- assertThat(
- "DN did not update its own config",
- dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
- is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
- } catch (ReconfigurationException |
- InterruptedException |
- BrokenBarrierException e) {
- exceptions.add(e);
+ // Make all streams which hold the volume references wait for the
+ // reconfiguration thread to start.
+ // It should only block IO while the reconfiguration task is
+ // running.
+ if (!done.get()) {
+ barrier.await();
+ // Add a delay so that the reconfiguration thread can start before
+ // IO finishes.
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException | BrokenBarrierException e) {
+ throw new IOException(e);
}
}
};
- reconfigThread.start();
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
- barrier.await();
- rb.nextBytes(writeBuf);
- out.write(writeBuf);
- out.hflush();
- out.close();
+ try {
+ DataNodeFaultInjector.set(newInjector);
+
+ List<String> oldDirs = getDataDirs(dn);
+ LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
+ LocatedBlock block = lbs.get(0);
+ FsVolumeImpl volume =
+ (FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
+ StringBuffer newDirsBuf = new StringBuffer();
+ String delim = "";
+ for (String d : oldDirs) {
+ if (! d.contains(volume.getBasePath())) {
+ newDirsBuf.append(delim).append(d);
+ delim = ",";
+ }
+ }
+ final String newDirs = newDirsBuf.toString();
+ final List<IOException> exceptions = new ArrayList<>();
+ final DataNode dataNode = dn;
+ final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
+
+ Thread reconfigThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ reconfigBarrier.await();
+
+ // Wake up writing threads on the pipeline to finish the block.
+ barrier.await();
+
+ assertThat(
+ "DN did not update its own config",
+ dataNode.reconfigurePropertyImpl(
+ DFS_DATANODE_DATA_DIR_KEY, newDirs),
+ is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+ done.set(true);
+ } catch (ReconfigurationException |
+ InterruptedException |
+ BrokenBarrierException e) {
+ exceptions.add(new IOException(e));
+ }
+ }
+ });
+ reconfigThread.start();
- reconfigThread.join();
+ // Write more data to make sure the stream threads wait on the barrier.
+ rb.nextBytes(writeBuf);
+ out.write(writeBuf);
+ reconfigBarrier.await();
+ out.hflush();
+ out.close();
+
+ reconfigThread.join();
+
+ if (!exceptions.isEmpty()) {
+ throw MultipleIOException.createIOException(exceptions);
+ }
+ } finally {
+ DataNodeFaultInjector.set(oldInjector);
+ }
// Verify if the data directory reconfigure was successful
FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset();
@@ -852,19 +904,12 @@ public class TestDataNodeHotSwapVolumes {
1, fsVolumeReferences.size());
}
- // Add a new DataNode to help with the pipeline recover.
- cluster.startDataNodes(conf, 1, true, null, null, null);
-
// Verify the file has sufficient replications.
DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
// Read the content back
byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
assertEquals(BLOCK_SIZE, content.length);
- if (!exceptions.isEmpty()) {
- throw new IOException(exceptions.get(0).getCause());
- }
-
// Write more files to make sure that the DataNode that has removed volume
// is still alive to receive data.
for (int i = 0; i < 10; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org