Posted to common-commits@hadoop.apache.org by le...@apache.org on 2016/12/29 07:35:36 UTC

hadoop git commit: HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 af266c8c8 -> f349c7692


HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei)

(cherry picked from commit e9f1396834174646a8d7aa8fc6c4a4f724ca5b28)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f349c769
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f349c769
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f349c769

Branch: refs/heads/branch-2
Commit: f349c7692d38642afd31ce7ffc6e1960f0781743
Parents: af266c8
Author: Lei Xu <le...@apache.org>
Authored: Thu Dec 29 15:10:36 2016 +0800
Committer: Lei Xu <le...@apache.org>
Committed: Thu Dec 29 15:35:17 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hdfs/server/common/Storage.java      |   6 +-
 .../server/datanode/BlockPoolSliceStorage.java  |   2 +-
 .../hdfs/server/datanode/DataStorage.java       |   2 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    | 154 +++++++++++++++++--
 4 files changed, 150 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
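The root cause: DataNode#refreshVolumes adds and removes volumes on one thread while other threads iterate Storage#storageDirs (for example via listStorageDirectories or the DirIterator handed out to callers). With storageDirs backed by a plain ArrayList, the fail-fast iterator detects the concurrent mutation and throws ConcurrentModificationException; a CopyOnWriteArrayList iterator instead walks an immutable snapshot of the backing array and never sees mutations made after the iterator was created. A minimal standalone sketch of that difference (illustrative names only, not Hadoop code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class CowListSketch {
      // Iterate "dirs" repeatedly while a writer thread keeps adding entries,
      // mimicking listStorageDirectories racing with refreshVolumes.
      static void hammer(List<String> dirs) throws InterruptedException {
        Thread writer = new Thread(() -> {
          for (int i = 0; i < 10_000; i++) {
            dirs.add("/data/vol" + i);
          }
        });
        writer.start();
        try {
          for (int i = 0; i < 1_000; i++) {
            long total = 0;
            for (String d : dirs) {        // fail-fast iteration on ArrayList
              total += d.length();
            }
          }
          System.out.println(dirs.getClass().getSimpleName() + ": no exception");
        } catch (RuntimeException e) {     // typically ConcurrentModificationException
          System.out.println(dirs.getClass().getSimpleName() + ": " + e);
        }
        writer.join();
      }

      public static void main(String[] args) throws InterruptedException {
        hammer(new ArrayList<String>());            // usually fails with CME
        hammer(new CopyOnWriteArrayList<String>()); // iterates snapshots, never fails
      }
    }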


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 6f1ee19..009d0f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -121,8 +122,9 @@ public abstract class Storage extends StorageInfo {
     public StorageDirType getStorageDirType();
     public boolean isOfType(StorageDirType type);
   }
-  
-  protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
+
+  protected List<StorageDirectory> storageDirs =
+      new CopyOnWriteArrayList<StorageDirectory>();
   
   private class DirIterator implements Iterator<StorageDirectory> {
     final StorageDirType dirType;
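
CopyOnWriteArrayList fits the access pattern of storageDirs: the list is read far more often (block reports, DirIterator, listStorageDirectories) than it is mutated, and mutations happen only during volume hot-swap or failure handling, so paying an array copy per add/remove in exchange for lock-free, snapshot-consistent iteration is cheap. A small sketch of the snapshot semantics (standard java.util.concurrent behaviour, paths are illustrative):

    import java.util.Iterator;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class CowSnapshotSketch {
      public static void main(String[] args) {
        CopyOnWriteArrayList<String> dirs = new CopyOnWriteArrayList<>();
        dirs.add("/data/vol0");
        Iterator<String> it = dirs.iterator(); // snapshots ["/data/vol0"]
        dirs.add("/data/vol1");                // copies the backing array
        while (it.hasNext()) {
          System.out.println(it.next());       // prints only /data/vol0
        }
        System.out.println(dirs);              // [/data/vol0, /data/vol1]
      }
    }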

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index fd90ae9..81fce6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -296,7 +296,7 @@ public class BlockPoolSliceStorage extends Storage {
          it.hasNext(); ) {
       StorageDirectory sd = it.next();
       if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
-        it.remove();
+        this.storageDirs.remove(sd);
         break;
       }
     }
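
The switch from it.remove() to this.storageDirs.remove(sd) here (and in the matching DataStorage hunk below) is forced by the new list type: iterators of CopyOnWriteArrayList do not support remove() and throw UnsupportedOperationException, so the element has to be removed through the list itself. Doing so while iterating is safe, because the iterator keeps walking the snapshot it was created from. A tiny sketch (illustrative paths, not Hadoop code):

    import java.util.Iterator;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class CowRemoveSketch {
      public static void main(String[] args) {
        CopyOnWriteArrayList<String> dirs = new CopyOnWriteArrayList<>();
        dirs.add("/data/vol0");
        dirs.add("/data/vol1");
        for (Iterator<String> it = dirs.iterator(); it.hasNext(); ) {
          String d = it.next();
          if (d.endsWith("vol0")) {
            // it.remove() would throw UnsupportedOperationException here;
            // remove through the list instead, the iterator is unaffected.
            dirs.remove(d);
          }
        }
        System.out.println(dirs); // [/data/vol1]
      }
    }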

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 0e6b339..0a8f1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -519,7 +519,7 @@ public class DataStorage extends Storage {
           bpsStorage.remove(bpRoot.getAbsoluteFile());
         }
 
-        it.remove();
+        this.storageDirs.remove(sd);
         try {
           sd.unlock();
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index e36b744c..5d4543d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -47,7 +47,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
@@ -62,8 +64,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -91,6 +95,7 @@ public class TestDataNodeHotSwapVolumes {
   private static final Log LOG = LogFactory.getLog(
     TestDataNodeHotSwapVolumes.class);
   private static final int BLOCK_SIZE = 512;
+  private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
   private MiniDFSCluster cluster;
 
   @After
@@ -100,6 +105,11 @@ public class TestDataNodeHotSwapVolumes {
 
   private void startDFSCluster(int numNameNodes, int numDataNodes)
       throws IOException {
+    startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE);
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes,
+      int storagePerDataNode) throws IOException {
     shutdown();
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -121,6 +131,7 @@ public class TestDataNodeHotSwapVolumes {
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(nnTopology)
         .numDataNodes(numDataNodes)
+        .storagesPerDatanode(storagePerDataNode)
         .build();
     cluster.waitActive();
   }
@@ -279,7 +290,12 @@ public class TestDataNodeHotSwapVolumes {
 
   /** Add volumes to the first DataNode. */
   private void addVolumes(int numNewVolumes)
-      throws ReconfigurationException, IOException {
+      throws InterruptedException, IOException, ReconfigurationException {
+    addVolumes(numNewVolumes, new CountDownLatch(0));
+  }
+
+  private void addVolumes(int numNewVolumes, CountDownLatch waitLatch)
+      throws ReconfigurationException, IOException, InterruptedException {
     File dataDir = new File(cluster.getDataDirectory());
     DataNode dn = cluster.getDataNodes().get(0);  // First DataNode.
     Configuration conf = dn.getConf();
@@ -311,6 +327,9 @@ public class TestDataNodeHotSwapVolumes {
         dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
         is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));
 
+    // Await on the latch for needed operations to complete
+    waitLatch.await();
+
     // Verify the configuration value is appropriately set.
     String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
     String[] expectDataDirs = newDataDir.split(",");
@@ -398,23 +417,34 @@ public class TestDataNodeHotSwapVolumes {
       throws IOException, InterruptedException, TimeoutException,
       ReconfigurationException {
     startDFSCluster(1, 1);
+    int numVolumes = cluster.getStoragesPerDatanode();
     String bpid = cluster.getNamesystem().getBlockPoolId();
     Path testFile = new Path("/test");
-    createFile(testFile, 4);  // Each volume has 2 blocks.
 
-    addVolumes(2);
+    // Each volume has 2 blocks
+    int initialBlockCount = numVolumes * 2;
+    createFile(testFile, initialBlockCount);
+
+    int newVolumeCount = 5;
+    addVolumes(newVolumeCount);
+    numVolumes += newVolumeCount;
+
+    int additionalBlockCount = 9;
+    int totalBlockCount = initialBlockCount + additionalBlockCount;
 
     // Continue to write the same file, thus the new volumes will have blocks.
-    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
-    verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
-    // After appending data, there should be [2, 2, 4, 4] blocks in each volume
-    // respectively.
-    List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
+    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+        BLOCK_SIZE * additionalBlockCount);
+    verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+
+    // After appending data, each new volume added should
+    // have 1 block each.
+    List<Integer> expectedNumBlocks = Arrays.asList(1, 1, 1, 1, 1, 4, 4);
 
     List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
         cluster.getAllBlockReports(bpid);
     assertEquals(1, blockReports.size());  // 1 DataNode
-    assertEquals(4, blockReports.get(0).size());  // 4 volumes
+    assertEquals(numVolumes, blockReports.get(0).size());  // 7 volumes
     Map<DatanodeStorage, BlockListAsLongs> dnReport =
         blockReports.get(0);
     List<Integer> actualNumBlocks = new ArrayList<Integer>();
@@ -425,6 +455,110 @@ public class TestDataNodeHotSwapVolumes {
     assertEquals(expectedNumBlocks, actualNumBlocks);
   }
 
+  @Test(timeout=180000)
+  public void testAddVolumesConcurrently()
+      throws IOException, InterruptedException, TimeoutException,
+      ReconfigurationException {
+    startDFSCluster(1, 1, 10);
+    int numVolumes = cluster.getStoragesPerDatanode();
+    String blockPoolId = cluster.getNamesystem().getBlockPoolId();
+    Path testFile = new Path("/test");
+
+    // Each volume has 2 blocks
+    int initialBlockCount = numVolumes * 2;
+    createFile(testFile, initialBlockCount);
+
+    final DataNode dn = cluster.getDataNodes().get(0);
+    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+    dn.data = Mockito.spy(data);
+
+    final int newVolumeCount = 40;
+    final List<Thread> addVolumeDelayedThreads = new ArrayList<>();
+    final AtomicBoolean addVolumeError = new AtomicBoolean(false);
+    final AtomicBoolean listStorageError = new AtomicBoolean(false);
+    final CountDownLatch addVolumeCompletionLatch =
+        new CountDownLatch(newVolumeCount);
+
+    // Thread to list all storage available at DataNode,
+    // when the volumes are being added in parallel.
+    final Thread listStorageThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (addVolumeCompletionLatch.getCount() != newVolumeCount) {
+          int i = 0;
+          while(i++ < 1000) {
+            try {
+              dn.getStorage().listStorageDirectories();
+            } catch (Exception e) {
+              listStorageError.set(true);
+              LOG.error("Error listing storage: " + e);
+            }
+          }
+        }
+      }
+    });
+    listStorageThread.start();
+
+    // FsDatasetImpl addVolume mocked to perform the operation asynchronously
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
+        final Random r = new Random();
+        Thread addVolThread =
+            new Thread(new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  r.setSeed(Time.now());
+                  // Let 50% of add volume operations
+                  // start after an initial delay.
+                  if (r.nextInt(10) > 4) {
+                    int s = r.nextInt(10) + 1;
+                    Thread.sleep(s * 100);
+                  }
+                  invocationOnMock.callRealMethod();
+                } catch (Throwable throwable) {
+                  addVolumeError.set(true);
+                  LOG.error("Error adding volume: " + throwable);
+                } finally {
+                  addVolumeCompletionLatch.countDown();
+                }
+              }
+            });
+        addVolumeDelayedThreads.add(addVolThread);
+        addVolThread.start();
+        return null;
+      }
+    }).when(dn.data).addVolume(any(StorageLocation.class), any(List.class));
+
+    addVolumes(newVolumeCount, addVolumeCompletionLatch);
+    numVolumes += newVolumeCount;
+
+    // Wait for all addVolume and listStorage Threads to complete
+    for (Thread t : addVolumeDelayedThreads) {
+      t.join();
+    }
+    listStorageThread.join();
+
+    // Verify errors while adding volumes and listing storage directories
+    Assert.assertEquals("Error adding volumes!", false, addVolumeError.get());
+    Assert.assertEquals("Error listing storage!",
+        false, listStorageError.get());
+
+    int additionalBlockCount = 9;
+    int totalBlockCount = initialBlockCount + additionalBlockCount;
+
+    // Continue to write the same file, thus the new volumes will have blocks.
+    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+        BLOCK_SIZE * additionalBlockCount);
+    verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(blockPoolId);
+    assertEquals(1, blockReports.size());
+    assertEquals(numVolumes, blockReports.get(0).size());
+  }
+
   @Test(timeout=60000)
   public void testAddVolumesToFederationNN()
       throws IOException, TimeoutException, InterruptedException,
@@ -778,7 +912,7 @@ public class TestDataNodeHotSwapVolumes {
   }
 
   /**
-   * Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
+   * Verify that {@link DataNode#checkDiskError()} removes all metadata in
    * DataNode upon a volume failure. Thus we can run reconfig on the same
    * configuration to reload the new volume on the same directory as the failed one.
    */
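
The new testAddVolumesConcurrently case exercises this race directly: FsDatasetImpl#addVolume is wrapped with a Mockito spy so that each real addVolume call runs on its own, randomly delayed thread while the reconfiguration is in flight, alongside a companion thread that lists the storage directories. The test asserts that neither the delayed addVolume calls nor the listStorageDirectories loop reported an error, and that the final block report covers all 50 volumes.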

