Posted to common-commits@hadoop.apache.org by le...@apache.org on 2016/02/01 21:54:48 UTC

hadoop git commit: HDFS-9701. DN may deadlock when hot-swapping under load. (Xiao Chen via lei)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 924e1583e -> b8360139a


HDFS-9701. DN may deadlock when hot-swapping under load. (Xiao Chen via lei)

(cherry picked from commit 12a4ebb4471f78b642e12c2ec0784c20f62cca3c)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8360139
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8360139
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8360139

Branch: refs/heads/branch-2.8
Commit: b8360139aff00a7714e28f59253ea4e2c323e777
Parents: 924e158
Author: Lei Xu <le...@apache.org>
Authored: Mon Feb 1 11:35:02 2016 -0800
Committer: Lei Xu <le...@apache.org>
Committed: Mon Feb 1 12:49:09 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  2 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  2 +
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 23 ++++----
 .../datanode/fsdataset/impl/FsVolumeList.java   | 47 ++++++++++++++++-
 .../fsdataset/impl/TestFsDatasetImpl.java       | 55 ++++++++++++++++++++
 .../fsdataset/impl/TestFsVolumeList.java        | 42 ++++++++++++---
 6 files changed, 152 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
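
[Editor's note] For context on the bug being fixed: before this patch, FsVolumeImpl#closeAndWait() busy-waited for the volume's reference count to reach zero while the caller held the FsDatasetImpl lock, but writer threads need that same lock before they can release their references. A minimal sketch of the pre-patch pattern follows; the class and member names are illustrative, not the actual Hadoop identifiers.

import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the pre-patch deadlock; names are illustrative.
class DeadlockSketch {
  private final AtomicInteger volumeRefCount = new AtomicInteger(1);

  // Thread A (hot-swap): holds this object's monitor and spins until the
  // reference count hits zero -- the old closeAndWait() behavior.
  synchronized void removeVolume() throws InterruptedException {
    while (volumeRefCount.get() > 0) {
      Thread.sleep(500); // sleep() keeps the monitor held while sleeping
    }
  }

  // Thread B (writer): must enter the same monitor before it can release
  // its volume reference, so the count never reaches zero. Deadlock.
  synchronized void finishWrite() {
    volumeRefCount.decrementAndGet();
  }
}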


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8360139/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 16c4f55..4252cc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1693,6 +1693,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-9210. Fix some misuse of %n in VolumeScanner#printStats.
     (Xiaoyu Yao)
 
+    HDFS-9701. DN may deadlock when hot-swapping under load. (Xiao Chen via lei)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8360139/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 950462a..2c692b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -496,6 +496,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Disable the volume from the service.
           asyncDiskService.removeVolume(sd.getCurrentDir());
           volumes.removeVolume(absRoot, clearFailure);
+          volumes.waitVolumeRemoved(5000, this);
 
           // Removed all replica information for the blocks on the volume.
           // Unlike updating the volumeMap in addVolume(), this operation does
@@ -1772,6 +1773,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * 
    * @throws IOException                       May be thrown from the methods called. 
    */
+  @Override // FsDatasetSpi
   public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
       throws ReplicaNotFoundException, UnexpectedReplicaStateException,
       FileNotFoundException, EOFException, IOException {
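
[Editor's note] Note the shape of the fix above: removeVolumes() runs synchronized on the FsDatasetImpl instance, so the patch passes `this` as the monitor, and waitVolumeRemoved() parks in Object#wait(long), which releases that monitor while waiting. A sketch of the difference, under the same illustrative names as before; the patch relies on the 5000 ms timeout and a recheck rather than an explicit notify.

import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the patched pattern (illustrative names). wait() drops the
// monitor it is called on, so writers can take the dataset lock, release
// their volume references, and let the removal proceed.
class FixedSketch {
  private final AtomicInteger volumeRefCount = new AtomicInteger(1);

  synchronized void removeVolume() throws InterruptedException {
    while (volumeRefCount.get() > 0) {
      wait(5000); // releases this monitor; re-acquires it before returning
    }
  }

  synchronized void finishWrite() {
    volumeRefCount.decrementAndGet(); // remover notices on its next recheck
  }
}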

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8360139/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 126f667..d262ef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -234,29 +234,30 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   /**
-   * Close this volume and wait all other threads to release the reference count
-   * on this volume.
-   * @throws IOException if the volume is closed or the waiting is interrupted.
+   * Close this volume.
+   * @throws IOException if the volume is closed.
    */
-  void closeAndWait() throws IOException {
+  void setClosed() throws IOException {
     try {
       this.reference.setClosed();
     } catch (ClosedChannelException e) {
       throw new IOException("The volume has already closed.", e);
     }
-    final int SLEEP_MILLIS = 500;
-    while (this.reference.getReferenceCount() > 0) {
+  }
+
+  /**
+   * Check whether this volume has successfully been closed.
+   */
+  boolean checkClosed() {
+    if (this.reference.getReferenceCount() > 0) {
       if (FsDatasetImpl.LOG.isDebugEnabled()) {
         FsDatasetImpl.LOG.debug(String.format(
             "The reference count for %s is %d, wait to be 0.",
             this, reference.getReferenceCount()));
       }
-      try {
-        Thread.sleep(SLEEP_MILLIS);
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
+      return false;
     }
+    return true;
   }
 
   File getCurrentDir() {
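
[Editor's note] The effect of this refactor is to split the old blocking closeAndWait() into a non-blocking state flip (setClosed) plus a side-effect-free poll (checkClosed), moving the waiting policy to the caller. Roughly how the two halves are meant to compose at a call site (a sketch only; the real caller is FsVolumeList#waitVolumeRemoved below):

// Sketch only: composing setClosed()/checkClosed() at a call site.
void closeVolume(FsVolumeImpl volume, Object monitor)
    throws IOException, InterruptedException {
  volume.setClosed();               // flip state; throws if already closed
  synchronized (monitor) {
    while (!volume.checkClosed()) { // poll the reference count
      monitor.wait(5000);           // release the monitor between polls
    }
  }
}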

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8360139/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 608ee29..ea4d597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -23,6 +23,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -49,6 +50,8 @@ class FsVolumeList {
   // Tracks volume failures, sorted by volume path.
   private final Map<String, VolumeFailureInfo> volumeFailureInfos =
       Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
+  private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved =
+      new ConcurrentLinkedQueue<>();
   private Object checkDirsMutex = new Object();
 
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
@@ -257,10 +260,33 @@ class FsVolumeList {
             + " failure volumes.");
       }
 
+      waitVolumeRemoved(5000, checkDirsMutex);
       return failedVols;
     }
   }
 
+  /**
+   * Wait for the reference of the volume removed from a previous
+   * {@link #removeVolume(FsVolumeImpl)} call to be released.
+   *
+   * @param sleepMillis interval to recheck.
+   */
+  void waitVolumeRemoved(int sleepMillis, Object monitor) {
+    while (!checkVolumesRemoved()) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
+      }
+      try {
+        monitor.wait(sleepMillis);
+      } catch (InterruptedException e) {
+        FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
+            + "volume reference to be released.");
+        Thread.currentThread().interrupt();
+      }
+    }
+    FsDatasetImpl.LOG.info("Volume reference is released.");
+  }
+
   @Override
   public String toString() {
     return volumes.toString();
@@ -298,12 +324,13 @@ class FsVolumeList {
         blockScanner.removeVolumeScanner(target);
       }
       try {
-        target.closeAndWait();
+        target.setClosed();
       } catch (IOException e) {
         FsDatasetImpl.LOG.warn(
             "Error occurs when waiting volume to close: " + target, e);
       }
       target.shutdown();
+      volumesBeingRemoved.add(target);
       FsDatasetImpl.LOG.info("Removed volume: " + target);
     } else {
       if (FsDatasetImpl.LOG.isDebugEnabled()) {
@@ -336,6 +363,24 @@ class FsVolumeList {
     return infos.toArray(new VolumeFailureInfo[infos.size()]);
   }
 
+  /**
+   * Check whether the reference of the volume from a previous
+   * {@link #removeVolume(FsVolumeImpl)} call is released.
+   *
+   * @return Whether the reference is released.
+   */
+  boolean checkVolumesRemoved() {
+    Iterator<FsVolumeImpl> it = volumesBeingRemoved.iterator();
+    while (it.hasNext()) {
+      FsVolumeImpl volume = it.next();
+      if (!volume.checkClosed()) {
+        return false;
+      }
+      it.remove();
+    }
+    return true;
+  }
+
   void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
     volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
         volumeFailureInfo);
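
[Editor's note] One detail worth noting in checkVolumesRemoved(): volumesBeingRemoved is a ConcurrentLinkedQueue, whose weakly consistent iterator supports remove() without extra locking, so fully closed volumes are dropped as they are discovered. The same drain-on-success shape in isolation (a sketch; the generic names are mine, not the patch's):

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Sketch of the drain pattern: poll a concurrent queue of pending items,
// removing each one whose completion condition already holds.
class PendingSet<T> {
  private final ConcurrentLinkedQueue<T> pending = new ConcurrentLinkedQueue<>();
  private final Predicate<T> isDone;

  PendingSet(Predicate<T> isDone) { this.isDone = isDone; }

  void add(T item) { pending.add(item); }

  // True only when every queued item is done; done items are removed so
  // later polls get cheaper (mirrors checkVolumesRemoved()).
  boolean allDone() {
    Iterator<T> it = pending.iterator();
    while (it.hasNext()) {
      if (!isDone.test(it.next())) {
        return false; // keep the unfinished item queued and stop early
      }
      it.remove();
    }
    return true;
  }
}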

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8360139/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 5684f55..ab7051e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.collect.Lists;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +61,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -83,8 +85,11 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestFsDatasetImpl {
+  Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class);
   private static final String BASE_DIR =
       new FileSystemTestHelper().getTestRootDir();
   private static final int NUM_INIT_VOLUMES = 2;
@@ -113,6 +118,7 @@ public class TestFsDatasetImpl {
     List<Storage.StorageDirectory> dirs =
         new ArrayList<Storage.StorageDirectory>();
     List<String> dirStrings = new ArrayList<String>();
+    FileUtils.deleteDirectory(new File(BASE_DIR));
     for (int i = 0; i < numDirs; i++) {
       File loc = new File(BASE_DIR + "/data" + i);
       dirStrings.add(new Path(loc.toString()).toUri().toString());
@@ -290,6 +296,7 @@ public class TestFsDatasetImpl {
       FsVolumeImpl volume = mock(FsVolumeImpl.class);
       oldVolumes.add(volume);
       when(volume.getBasePath()).thenReturn("data" + i);
+      when(volume.checkClosed()).thenReturn(true);
       FsVolumeReference ref = mock(FsVolumeReference.class);
       when(ref.getVolume()).thenReturn(volume);
       volumeList.addVolume(ref);
@@ -444,4 +451,52 @@ public class TestFsDatasetImpl {
     assertSame(replica,
         BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
   }
+
+  @Test(timeout = 30000)
+  public void testRemoveVolumeBeingWritten() throws Exception {
+    // Will write and remove on dn0.
+    final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
+    final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
+    final CountDownLatch brReceivedLatch = new CountDownLatch(1);
+    class BlockReportThread extends Thread {
+      public void run() {
+        LOG.info("Getting block report");
+        dataset.getBlockReports(eb.getBlockPoolId());
+        LOG.info("Successfully received block report");
+        brReceivedLatch.countDown();
+      }
+    }
+
+    final BlockReportThread brt = new BlockReportThread();
+    class ResponderThread extends Thread {
+      public void run() {
+        try (ReplicaHandler replica = dataset
+            .createRbw(StorageType.DEFAULT, eb, false)) {
+          LOG.info("createRbw finished");
+          startFinalizeLatch.countDown();
+
+          // Slow down while we're holding the reference to the volume
+          Thread.sleep(1000);
+          dataset.finalizeBlock(eb);
+          LOG.info("finalizeBlock finished");
+        } catch (Exception e) {
+          LOG.warn("Exception caught. This should not affect the test", e);
+        }
+      }
+    }
+
+    ResponderThread res = new ResponderThread();
+    res.start();
+    startFinalizeLatch.await();
+
+    Set<File> volumesToRemove = new HashSet<>();
+    volumesToRemove.add(
+        StorageLocation.parse(dataset.getVolume(eb).getBasePath()).getFile());
+    LOG.info("Removing volume " + volumesToRemove);
+    // Verify block report can be received during this
+    brt.start();
+    dataset.removeVolumes(volumesToRemove, true);
+    LOG.info("Volumes removed");
+    brReceivedLatch.await();
+  }
 }
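
[Editor's note] The regression test above choreographs three actors with CountDownLatches: a responder thread acquires a volume reference via createRbw() and signals startFinalizeLatch; the main thread then removes that volume while a block-report thread concurrently calls getBlockReports(). Before this patch, the removal held the dataset lock while spinning, so both the block report and the finalize would wedge and the 30 s timeout would fire. The bare latch choreography, stripped of the HDFS specifics (a sketch; names are mine):

import java.util.concurrent.CountDownLatch;

// Sketch of the test's latch choreography with the HDFS calls elided.
public class LatchChoreography {
  public static void main(String[] args) throws InterruptedException {
    final CountDownLatch referenceHeld = new CountDownLatch(1);
    final CountDownLatch reportDone = new CountDownLatch(1);

    Thread holder = new Thread(new Runnable() {
      public void run() {
        // acquire the shared resource (createRbw in the real test) ...
        referenceHeld.countDown(); // tell main the reference is held
        // ... keep holding briefly, then release it (finalizeBlock)
      }
    });

    Thread reporter = new Thread(new Runnable() {
      public void run() {
        // read shared state (getBlockReports in the real test) ...
        reportDone.countDown();
      }
    });

    holder.start();
    referenceHeld.await();  // removal must start while the ref is held
    reporter.start();
    // trigger the removal here (removeVolumes in the real test) ...
    reportDone.await();     // a deadlock would hang this await
  }
}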

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8360139/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 9b9b692..e24c725 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.StorageType;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,9 +35,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 public class TestFsVolumeList {
@@ -57,11 +61,11 @@ public class TestFsVolumeList {
     blockScanner = new BlockScanner(null, blockScannerConf);
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testGetNextVolumeWithClosedVolume() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
         Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
-    List<FsVolumeImpl> volumes = new ArrayList<>();
+    final List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);
       curDir.mkdirs();
@@ -73,7 +77,19 @@ public class TestFsVolumeList {
     }
 
     // Close the second volume.
-    volumes.get(1).closeAndWait();
+    volumes.get(1).setClosed();
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return volumes.get(1).checkClosed();
+        }
+      }, 100, 3000);
+    } catch (TimeoutException e) {
+      fail("timed out while waiting for volume to be removed.");
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
     for (int i = 0; i < 10; i++) {
       try (FsVolumeReference ref =
           volumeList.getNextVolume(StorageType.DEFAULT, 128)) {
@@ -83,11 +99,11 @@ public class TestFsVolumeList {
     }
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testCheckDirsWithClosedVolume() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
         Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
-    List<FsVolumeImpl> volumes = new ArrayList<>();
+    final List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "volume-" + i);
       curDir.mkdirs();
@@ -98,12 +114,24 @@ public class TestFsVolumeList {
     }
 
     // Close the 2nd volume.
-    volumes.get(1).closeAndWait();
+    volumes.get(1).setClosed();
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return volumes.get(1).checkClosed();
+        }
+      }, 100, 3000);
+    } catch (TimeoutException e) {
+      fail("timed out while waiting for volume to be removed.");
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
     // checkDirs() should ignore the 2nd volume since it is closed.
     volumeList.checkDirs();
   }
 
-  @Test
+  @Test(timeout=30000)
   public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
         Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);