You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/03/21 21:14:48 UTC
[35/50] [abbrv] hadoop git commit: HDFS-9874. Long living DataXceiver
threads cause volume shutdown to block. Contributed by Rushabh Shah.
HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63c966a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63c966a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63c966a3
Branch: refs/heads/YARN-3368
Commit: 63c966a3fbeb675959fc4101e65de9f57aecd17d
Parents: dc951e6
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Mar 18 10:24:59 2016 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Mar 18 10:24:59 2016 -0500
----------------------------------------------------------------------
.../hdfs/server/datanode/ReplicaInPipeline.java | 7 +++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 13 ++++
.../datanode/fsdataset/impl/FsVolumeImpl.java | 6 ++
.../fsdataset/impl/TestFsDatasetImpl.java | 66 ++++++++++++++++++++
4 files changed, 92 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index d9406f0..5caca15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -183,6 +183,13 @@ public class ReplicaInPipeline extends ReplicaInfo
this.writer = writer;
}
+  /**
+   * Interrupts the writer thread of this replica, if there is one.
+   * Does nothing when no writer is set, when the writer is the calling
+   * thread itself, or when the writer thread is no longer alive.
+   */
+  public void interruptThread() {
+    // Read the field exactly once: the null check, the identity check,
+    // isAlive() and interrupt() must all act on the same Thread even if
+    // the writer field is reassigned concurrently by another thread.
+    final Thread w = this.writer;
+    if (w != null && w != Thread.currentThread() && w.isAlive()) {
+      w.interrupt();
+    }
+  }
+
@Override // Object
public boolean equals(Object o) {
return super.equals(o);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 2e8226a..d6a0df6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3112,5 +3112,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public void setTimer(Timer newTimer) {
this.timer = newTimer;
}
+
+  /**
+   * Interrupts the writer thread of every in-pipeline replica that resides
+   * on the given volume, across all block pools in the replica map.
+   * Presumably these writers are DataXceiver threads (per the method name);
+   * the code itself only calls {@link ReplicaInPipeline#interruptThread()}.
+   *
+   * @param volume the volume whose active replica writers are interrupted
+   */
+  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    for (String bpid : volumeMap.getBlockPoolList()) {
+      for (ReplicaInfo replica : volumeMap.replicas(bpid)) {
+        if (!(replica instanceof ReplicaInPipeline)) {
+          continue; // finalized or other non-pipeline replicas have no writer
+        }
+        if (replica.getVolume().equals(volume)) {
+          ((ReplicaInPipeline) replica).interruptThread();
+        }
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 857d0ad..0d060f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -240,6 +240,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
Preconditions.checkState(reference.getReferenceCount() > 0);
}
+ @VisibleForTesting
+ int getReferenceCount() {
+ return this.reference.getReferenceCount();
+ }
+
/**
* Close this volume.
* @throws IOException if the volume is closed.
@@ -247,6 +252,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
void setClosed() throws IOException {
try {
this.reference.setClosed();
+ dataset.stopAllDataxceiverThreads(this);
} catch (ClosedChannelException e) {
throw new IOException("The volume has already closed.", e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index b2cfe89..70e9332 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,14 +21,19 @@ import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -51,6 +56,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@@ -622,4 +628,64 @@ public class TestFsDatasetImpl {
LOG.info("Volumes removed");
brReceivedLatch.await();
}
+
+  /**
+   * Tests stopping all the active DataXceiver threads on a volume failure
+   * event: once the disk check fails the volume, no thread may still hold a
+   * reference to it, and the client's still-open write stream must fail to
+   * close with an error naming the datanode.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    // Declared outside the try block so its permissions can be restored in
+    // finally, even if an assertion fails part-way through the test.
+    File finalizedDir = null;
+    try {
+      Configuration config = new HdfsConfiguration();
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread detects
+        // this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Sleep for 1 second so that datanode can interrupt and cluster clean up
+      Thread.sleep(1000);
+      Assert.assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue("Unexpected exception message: " + ioe.getMessage(),
+            ioe.getMessage() != null
+                && ioe.getMessage().contains(info.toString()));
+      }
+    } finally {
+      // Restore permissions before shutdown so the modified directory does
+      // not obstruct cleanup, regardless of whether the test passed.
+      if (finalizedDir != null) {
+        finalizedDir.setWritable(true);
+        finalizedDir.setExecutable(true);
+      }
+      // cluster is null if MiniDFSCluster failed to build; guarding avoids
+      // an NPE here that would mask the original failure.
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
}