Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2016/03/18 16:26:31 UTC

hadoop git commit: HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.

Repository: hadoop
Updated Branches:
  refs/heads/trunk dc951e606 -> 63c966a3f


HDFS-9874. Long living DataXceiver threads cause volume shutdown to block. Contributed by Rushabh Shah.
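
A note on the mechanism: FsVolumeImpl counts the threads holding a
reference to the volume, and volume shutdown blocks until that count
drops to zero. A long-living DataXceiver thread stuck in a write pins
its reference indefinitely, so shutdown never completes. The patch
below interrupts the writer thread of every ReplicaInPipeline on the
closing volume so those threads unwind and release their references.

A minimal, self-contained sketch of that pattern (the class and method
names here are illustrative, not the actual HDFS classes):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // A reference-counted resource that interrupts its writer threads
    // when it closes, so blocked writers release their references
    // instead of pinning the resource forever.
    class RefCountedVolume {
      private final AtomicInteger refCount = new AtomicInteger();
      private final Set<Thread> writers = ConcurrentHashMap.newKeySet();

      void reference(Thread writer) {
        refCount.incrementAndGet();
        writers.add(writer);
      }

      void unreference(Thread writer) {
        writers.remove(writer);
        refCount.decrementAndGet();
      }

      // Mirrors the shape of the fix: interrupt every live writer that
      // is not the current thread, so each unwinds and unreferences.
      void setClosed() {
        for (Thread w : writers) {
          if (w != Thread.currentThread() && w.isAlive()) {
            w.interrupt();
          }
        }
      }

      int getReferenceCount() {
        return refCount.get();
      }
    }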


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63c966a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63c966a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63c966a3

Branch: refs/heads/trunk
Commit: 63c966a3fbeb675959fc4101e65de9f57aecd17d
Parents: dc951e6
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Mar 18 10:24:59 2016 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Mar 18 10:24:59 2016 -0500

----------------------------------------------------------------------
 .../hdfs/server/datanode/ReplicaInPipeline.java |  7 +++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 13 ++++
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  6 ++
 .../fsdataset/impl/TestFsDatasetImpl.java       | 66 ++++++++++++++++++++
 4 files changed, 92 insertions(+)
----------------------------------------------------------------------
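
In outline: ReplicaInPipeline gains an interruptThread() method;
FsDatasetImpl gains stopAllDataxceiverThreads(FsVolumeImpl), which
interrupts the writer of every in-pipeline replica on the given volume;
FsVolumeImpl#setClosed() now invokes it; and TestFsDatasetImpl adds
testCleanShutdownOfVolume to verify that the reference count drains
after a volume failure.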


http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index d9406f0..5caca15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -183,6 +183,13 @@ public class ReplicaInPipeline extends ReplicaInfo
     this.writer = writer;
   }
   
+  public void interruptThread() {
+    if (writer != null && writer != Thread.currentThread() 
+        && writer.isAlive()) {
+      this.writer.interrupt();
+    }
+  }
+
   @Override  // Object
   public boolean equals(Object o) {
     return super.equals(o);
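
The three guards in interruptThread() each matter: writer can be null
if no thread is currently writing the replica, a thread must not
interrupt itself in case the shutdown path runs on the writer thread
itself, and a thread that has already exited needs no interrupt.
Interruption only unblocks a writer whose I/O is interrupt-aware; as a
hedged sketch of the receiving side (a hypothetical writer loop, not
the actual DataXceiver code):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ClosedByInterruptException;
    import java.nio.channels.FileChannel;

    class WriterLoop implements Runnable {
      private final FileChannel channel;
      private final ByteBuffer buffer;

      WriterLoop(FileChannel channel, ByteBuffer buffer) {
        this.channel = channel;
        this.buffer = buffer;
      }

      @Override
      public void run() {
        try {
          while (buffer.hasRemaining()) {
            channel.write(buffer);  // blocking, interruptible NIO call
          }
        } catch (ClosedByInterruptException e) {
          // The volume is shutting down; stop writing and fall through.
        } catch (IOException e) {
          // Any other I/O failure unwinds the same way.
        } finally {
          // Release the volume reference here, e.g.
          // volume.unreference(Thread.currentThread());
        }
      }
    }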

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 2e8226a..d6a0df6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3112,5 +3112,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public void setTimer(Timer newTimer) {
     this.timer = newTimer;
   }
+
+  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    for (String blockPoolId : volumeMap.getBlockPoolList()) {
+      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+      for (ReplicaInfo replicaInfo : replicas) {
+        if (replicaInfo instanceof ReplicaInPipeline
+            && replicaInfo.getVolume().equals(volume)) {
+          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
+          replicaInPipeline.interruptThread();
+        }
+      }
+    }
+  }
 }
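
Two details worth noting in stopAllDataxceiverThreads: it is
synchronized on the dataset, so the scan sees a stable view of the
replica maps, and it walks every block pool because a single volume
stores replicas for all of them. Only ReplicaInPipeline instances are
considered, since finalized replicas have no writer thread to
interrupt.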
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 857d0ad..0d060f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -240,6 +240,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
     Preconditions.checkState(reference.getReferenceCount() > 0);
   }
 
+  @VisibleForTesting
+  int getReferenceCount() {
+    return this.reference.getReferenceCount();
+  }
+
   /**
    * Close this volume.
    * @throws IOException if the volume is closed.
@@ -247,6 +252,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   void setClosed() throws IOException {
     try {
       this.reference.setClosed();
+      dataset.stopAllDataxceiverThreads(this);
     } catch (ClosedChannelException e) {
       throw new IOException("The volume has already closed.", e);
     }
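
The ordering here is deliberate: reference.setClosed() first marks the
volume closed so no new references are handed out (a second close lands
in the ClosedChannelException branch), and only then are the
DataXceiver threads interrupted. As each interrupted writer unwinds and
releases its reference, the count can reach zero and volume removal
completes instead of blocking.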

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63c966a3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index b2cfe89..70e9332 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -21,14 +21,19 @@ import com.google.common.collect.Lists;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -51,6 +56,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -622,4 +628,64 @@ public class TestFsDatasetImpl {
     LOG.info("Volumes removed");
     brReceivedLatch.await();
   }
+
+  /**
+   * Tests stopping all active DataXceiver threads on a volume failure event.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanShutdownOfVolume() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration config = new HdfsConfiguration();
+      config.setLong(
+          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
+      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+
+      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DataNode dataNode = cluster.getDataNodes().get(0);
+      Path filePath = new Path("test.dat");
+      // Create a file and keep the output stream unclosed.
+      FSDataOutputStream out = fs.create(filePath, (short) 1);
+      out.write(1);
+      out.hflush();
+
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
+      FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().getVolume(
+          block);
+      File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
+          .getBlockPoolId());
+
+      if (finalizedDir.exists()) {
+        // Remove write and execute access so that checkDiskErrorThread detects
+        // this volume is bad.
+        finalizedDir.setExecutable(false);
+        finalizedDir.setWritable(false);
+      }
+      Assert.assertTrue("Reference count for the volume should be greater "
+          + "than 0", volume.getReferenceCount() > 0);
+      // Invoke the synchronous checkDiskError method
+      dataNode.getFSDataset().checkDataDir();
+      // Sleep for 1 second so the datanode can interrupt the writer and clean up.
+      Thread.sleep(1000);
+      assertEquals("There are active threads still referencing volume: "
+          + volume.getBasePath(), 0, volume.getReferenceCount());
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
+      DatanodeInfo info = lb.getLocations()[0];
+
+      try {
+        out.close();
+        Assert.fail("This is not a valid code path. "
+            + "out.close should have thrown an exception.");
+      } catch (IOException ioe) {
+        Assert.assertTrue(ioe.getMessage().contains(info.toString()));
+      }
+      finalizedDir.setWritable(true);
+      finalizedDir.setExecutable(true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
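
Two caveats in the test above. First, the finally block would throw a
NullPointerException if MiniDFSCluster.Builder#build() itself failed,
since cluster would still be null; a null check before
cluster.shutdown() guards that. Second, the fixed Thread.sleep(1000)
assumes the interrupt propagates within a second, which may be flaky on
a slow machine. A hedged alternative sketch using
GenericTestUtils.waitFor (already imported in this file) with Guava's
com.google.common.base.Supplier; the local variable volume would need
to be declared final for the anonymous class:

    // Poll every 100 ms until the reference count drains, failing the
    // test after 10 seconds instead of asserting after a fixed sleep.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return volume.getReferenceCount() == 0;
      }
    }, 100, 10000);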