You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by le...@apache.org on 2017/09/29 18:03:37 UTC
hadoop git commit: HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. (Lei (Eddy) Xu)
Repository: hadoop
Updated Branches:
refs/heads/branch-3.0 7a9479597 -> 337506190
HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. (Lei (Eddy) Xu)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/33750619
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/33750619
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/33750619
Branch: refs/heads/branch-3.0
Commit: 337506190b559bc712b18a606b68a18677e90f7f
Parents: 7a94795
Author: Lei Xu <le...@apache.org>
Authored: Fri Sep 29 10:46:17 2017 -0700
Committer: Lei Xu <le...@apache.org>
Committed: Fri Sep 29 10:47:37 2017 -0700
----------------------------------------------------------------------
.../datanode/TestDataNodeHotSwapVolumes.java | 129 ++++++++++++-------
1 file changed, 83 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/33750619/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 5d4ac1e..df5e297 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -68,6 +72,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -83,6 +88,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -775,12 +781,11 @@ public class TestDataNodeHotSwapVolumes {
private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
throws IOException, ReconfigurationException, TimeoutException,
InterruptedException, BrokenBarrierException {
- // Starts DFS cluster with 3 DataNodes to form a pipeline.
- startDFSCluster(1, 3);
+ startDFSCluster(1, 4);
final short REPLICATION = 3;
- final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
- final FileSystem fs = cluster.getFileSystem();
+ final DistributedFileSystem fs = cluster.getFileSystem();
+ final DFSClient client = fs.getClient();
final Path testFile = new Path("/test");
FSDataOutputStream out = fs.create(testFile, REPLICATION);
@@ -790,54 +795,93 @@ public class TestDataNodeHotSwapVolumes {
out.write(writeBuf);
out.hflush();
- // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
- // BlockReceiver releases volume reference before finalizeBlock(), the blocks
- // on the volume will be removed, and finalizeBlock() throws IOE.
- final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
- dn.data = Mockito.spy(data);
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation)
- throws IOException, InterruptedException {
+ BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
+ String[] dataNodeNames = blocks[0].getNames();
+ String dataNodeName = dataNodeNames[dataNodeIdx];
+ int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
+ DataNode dn = null;
+ for (DataNode dataNode : cluster.getDataNodes()) {
+ if (dataNode.getXferPort() == xferPort) {
+ dn = dataNode;
+ break;
+ }
+ }
+ assertNotNull(dn);
+
+ final CyclicBarrier barrier = new CyclicBarrier(4);
+ final AtomicBoolean done = new AtomicBoolean(false);
+ DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
+ public void logDelaySendingAckToUpstream(
+ final String upstreamAddr, final long delayMs) throws IOException {
+ try {
+ // Make all streams which hold the volume references to wait the
+ // reconfiguration thread to start.
+ // It should only block IO during the period of reconfiguration
+ // task running.
+ if (!done.get()) {
+ barrier.await();
+ // Add delays to allow the reconfiguration thread starts before
+ // IO finish.
Thread.sleep(1000);
- // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
- // the block is not removed, since the volume reference should not
- // be released at this point.
- data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
- (boolean) invocation.getArguments()[1]);
- return null;
}
- }).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
- Mockito.anyBoolean());
-
- final CyclicBarrier barrier = new CyclicBarrier(2);
+ } catch (InterruptedException | BrokenBarrierException e) {
+ throw new IOException(e);
+ }
+ }
+ };
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
- List<String> oldDirs = getDataDirs(dn);
- final String newDirs = oldDirs.get(1); // Remove the first volume.
- final List<Exception> exceptions = new ArrayList<>();
- Thread reconfigThread = new Thread() {
- public void run() {
+ try {
+ DataNodeFaultInjector.set(newInjector);
+
+ List<String> oldDirs = getDataDirs(dn);
+ LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
+ LocatedBlock block = lbs.get(0);
+ FsVolumeImpl volume =
+ (FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
+ final String newDirs = oldDirs.stream()
+ .filter((d) -> !d.contains(volume.getStorageLocation().toString()))
+ .collect(Collectors.joining(","));
+ final List<IOException> exceptions = new ArrayList<>();
+ final DataNode dataNode = dn;
+ final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
+
+ Thread reconfigThread = new Thread(() -> {
try {
+ reconfigBarrier.await();
+
+ // Wake up writing threads on the pipeline to finish the block.
barrier.await();
+
assertThat(
"DN did not update its own config",
- dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
- is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+ dataNode.reconfigurePropertyImpl(
+ DFS_DATANODE_DATA_DIR_KEY, newDirs),
+ is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+ done.set(true);
} catch (ReconfigurationException |
InterruptedException |
BrokenBarrierException e) {
- exceptions.add(e);
+ exceptions.add(new IOException(e));
}
- }
- };
- reconfigThread.start();
+ });
+ reconfigThread.start();
- barrier.await();
- rb.nextBytes(writeBuf);
- out.write(writeBuf);
- out.hflush();
- out.close();
+ // Write more data to make sure the stream threads wait on the barrier.
+ rb.nextBytes(writeBuf);
+ out.write(writeBuf);
+ reconfigBarrier.await();
+ out.hflush();
+ out.close();
+
+ reconfigThread.join();
- reconfigThread.join();
+ if (!exceptions.isEmpty()) {
+ throw MultipleIOException.createIOException(exceptions);
+ }
+ } finally {
+ DataNodeFaultInjector.set(oldInjector);
+ }
// Verify if the data directory reconfigure was successful
FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset();
@@ -851,19 +895,12 @@ public class TestDataNodeHotSwapVolumes {
1, fsVolumeReferences.size());
}
- // Add a new DataNode to help with the pipeline recover.
- cluster.startDataNodes(conf, 1, true, null, null, null);
-
// Verify the file has sufficient replications.
DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
// Read the content back
byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
assertEquals(BLOCK_SIZE, content.length);
- if (!exceptions.isEmpty()) {
- throw new IOException(exceptions.get(0).getCause());
- }
-
// Write more files to make sure that the DataNode that has removed volume
// is still alive to receive data.
for (int i = 0; i < 10; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org