You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this rendering.
Posted to common-commits@hadoop.apache.org by le...@apache.org on 2016/12/29 07:35:36 UTC
hadoop git commit: HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 af266c8c8 -> f349c7692
HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei)
(cherry picked from commit e9f1396834174646a8d7aa8fc6c4a4f724ca5b28)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f349c769
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f349c769
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f349c769
Branch: refs/heads/branch-2
Commit: f349c7692d38642afd31ce7ffc6e1960f0781743
Parents: af266c8
Author: Lei Xu <le...@apache.org>
Authored: Thu Dec 29 15:10:36 2016 +0800
Committer: Lei Xu <le...@apache.org>
Committed: Thu Dec 29 15:35:17 2016 +0800
----------------------------------------------------------------------
.../hadoop/hdfs/server/common/Storage.java | 6 +-
.../server/datanode/BlockPoolSliceStorage.java | 2 +-
.../hdfs/server/datanode/DataStorage.java | 2 +-
.../datanode/TestDataNodeHotSwapVolumes.java | 154 +++++++++++++++++--
4 files changed, 150 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 6f1ee19..009d0f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
@@ -121,8 +122,9 @@ public abstract class Storage extends StorageInfo {
public StorageDirType getStorageDirType();
public boolean isOfType(StorageDirType type);
}
-
- protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
+
+ protected List<StorageDirectory> storageDirs =
+ new CopyOnWriteArrayList<StorageDirectory>();
private class DirIterator implements Iterator<StorageDirectory> {
final StorageDirType dirType;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index fd90ae9..81fce6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -296,7 +296,7 @@ public class BlockPoolSliceStorage extends Storage {
it.hasNext(); ) {
StorageDirectory sd = it.next();
if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
- it.remove();
+ this.storageDirs.remove(sd);
break;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 0e6b339..0a8f1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -519,7 +519,7 @@ public class DataStorage extends Storage {
bpsStorage.remove(bpRoot.getAbsoluteFile());
}
- it.remove();
+ this.storageDirs.remove(sd);
try {
sd.unlock();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f349c769/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index e36b744c..5d4543d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -47,7 +47,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
import java.io.File;
@@ -62,8 +64,10 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -91,6 +95,7 @@ public class TestDataNodeHotSwapVolumes {
private static final Log LOG = LogFactory.getLog(
TestDataNodeHotSwapVolumes.class);
private static final int BLOCK_SIZE = 512;
+ private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
private MiniDFSCluster cluster;
@After
@@ -100,6 +105,11 @@ public class TestDataNodeHotSwapVolumes {
private void startDFSCluster(int numNameNodes, int numDataNodes)
throws IOException {
+ startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE);
+ }
+
+ private void startDFSCluster(int numNameNodes, int numDataNodes,
+ int storagePerDataNode) throws IOException {
shutdown();
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -121,6 +131,7 @@ public class TestDataNodeHotSwapVolumes {
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(nnTopology)
.numDataNodes(numDataNodes)
+ .storagesPerDatanode(storagePerDataNode)
.build();
cluster.waitActive();
}
@@ -279,7 +290,12 @@ public class TestDataNodeHotSwapVolumes {
/** Add volumes to the first DataNode. */
private void addVolumes(int numNewVolumes)
- throws ReconfigurationException, IOException {
+ throws InterruptedException, IOException, ReconfigurationException {
+ addVolumes(numNewVolumes, new CountDownLatch(0));
+ }
+
+ private void addVolumes(int numNewVolumes, CountDownLatch waitLatch)
+ throws ReconfigurationException, IOException, InterruptedException {
File dataDir = new File(cluster.getDataDirectory());
DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
Configuration conf = dn.getConf();
@@ -311,6 +327,9 @@ public class TestDataNodeHotSwapVolumes {
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));
+ // Await on the latch for needed operations to complete
+ waitLatch.await();
+
// Verify the configuration value is appropriately set.
String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
String[] expectDataDirs = newDataDir.split(",");
@@ -398,23 +417,34 @@ public class TestDataNodeHotSwapVolumes {
throws IOException, InterruptedException, TimeoutException,
ReconfigurationException {
startDFSCluster(1, 1);
+ int numVolumes = cluster.getStoragesPerDatanode();
String bpid = cluster.getNamesystem().getBlockPoolId();
Path testFile = new Path("/test");
- createFile(testFile, 4); // Each volume has 2 blocks.
- addVolumes(2);
+ // Each volume has 2 blocks
+ int initialBlockCount = numVolumes * 2;
+ createFile(testFile, initialBlockCount);
+
+ int newVolumeCount = 5;
+ addVolumes(newVolumeCount);
+ numVolumes += newVolumeCount;
+
+ int additionalBlockCount = 9;
+ int totalBlockCount = initialBlockCount + additionalBlockCount;
// Continue to write the same file, thus the new volumes will have blocks.
- DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
- verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
- // After appending data, there should be [2, 2, 4, 4] blocks in each volume
- // respectively.
- List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
+ DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+ BLOCK_SIZE * additionalBlockCount);
+ verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+
+ // After appending data, each new volume added should
+ // have 1 block each.
+ List<Integer> expectedNumBlocks = Arrays.asList(1, 1, 1, 1, 1, 4, 4);
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
cluster.getAllBlockReports(bpid);
assertEquals(1, blockReports.size()); // 1 DataNode
- assertEquals(4, blockReports.get(0).size()); // 4 volumes
+ assertEquals(numVolumes, blockReports.get(0).size()); // 7 volumes
Map<DatanodeStorage, BlockListAsLongs> dnReport =
blockReports.get(0);
List<Integer> actualNumBlocks = new ArrayList<Integer>();
@@ -425,6 +455,110 @@ public class TestDataNodeHotSwapVolumes {
assertEquals(expectedNumBlocks, actualNumBlocks);
}
+ @Test(timeout=180000)
+ public void testAddVolumesConcurrently()
+ throws IOException, InterruptedException, TimeoutException,
+ ReconfigurationException {
+ startDFSCluster(1, 1, 10);
+ int numVolumes = cluster.getStoragesPerDatanode();
+ String blockPoolId = cluster.getNamesystem().getBlockPoolId();
+ Path testFile = new Path("/test");
+
+ // Each volume has 2 blocks
+ int initialBlockCount = numVolumes * 2;
+ createFile(testFile, initialBlockCount);
+
+ final DataNode dn = cluster.getDataNodes().get(0);
+ final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+ dn.data = Mockito.spy(data);
+
+ final int newVolumeCount = 40;
+ final List<Thread> addVolumeDelayedThreads = new ArrayList<>();
+ final AtomicBoolean addVolumeError = new AtomicBoolean(false);
+ final AtomicBoolean listStorageError = new AtomicBoolean(false);
+ final CountDownLatch addVolumeCompletionLatch =
+ new CountDownLatch(newVolumeCount);
+
+ // Thread to list all storage available at DataNode,
+ // when the volumes are being added in parallel.
+ final Thread listStorageThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (addVolumeCompletionLatch.getCount() != newVolumeCount) {
+ int i = 0;
+ while(i++ < 1000) {
+ try {
+ dn.getStorage().listStorageDirectories();
+ } catch (Exception e) {
+ listStorageError.set(true);
+ LOG.error("Error listing storage: " + e);
+ }
+ }
+ }
+ }
+ });
+ listStorageThread.start();
+
+ // FsDatasetImpl addVolume mocked to perform the operation asynchronously
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
+ final Random r = new Random();
+ Thread addVolThread =
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ r.setSeed(Time.now());
+ // Let 50% of add volume operations
+ // start after an initial delay.
+ if (r.nextInt(10) > 4) {
+ int s = r.nextInt(10) + 1;
+ Thread.sleep(s * 100);
+ }
+ invocationOnMock.callRealMethod();
+ } catch (Throwable throwable) {
+ addVolumeError.set(true);
+ LOG.error("Error adding volume: " + throwable);
+ } finally {
+ addVolumeCompletionLatch.countDown();
+ }
+ }
+ });
+ addVolumeDelayedThreads.add(addVolThread);
+ addVolThread.start();
+ return null;
+ }
+ }).when(dn.data).addVolume(any(StorageLocation.class), any(List.class));
+
+ addVolumes(newVolumeCount, addVolumeCompletionLatch);
+ numVolumes += newVolumeCount;
+
+ // Wait for all addVolume and listStorage Threads to complete
+ for (Thread t : addVolumeDelayedThreads) {
+ t.join();
+ }
+ listStorageThread.join();
+
+ // Verify errors while adding volumes and listing storage directories
+ Assert.assertEquals("Error adding volumes!", false, addVolumeError.get());
+ Assert.assertEquals("Error listing storage!",
+ false, listStorageError.get());
+
+ int additionalBlockCount = 9;
+ int totalBlockCount = initialBlockCount + additionalBlockCount;
+
+ // Continue to write the same file, thus the new volumes will have blocks.
+ DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+ BLOCK_SIZE * additionalBlockCount);
+ verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+
+ List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+ cluster.getAllBlockReports(blockPoolId);
+ assertEquals(1, blockReports.size());
+ assertEquals(numVolumes, blockReports.get(0).size());
+ }
+
@Test(timeout=60000)
public void testAddVolumesToFederationNN()
throws IOException, TimeoutException, InterruptedException,
@@ -778,7 +912,7 @@ public class TestDataNodeHotSwapVolumes {
}
/**
- * Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
+ * Verify that {@link DataNode#checkDiskError()} removes all metadata in
* DataNode upon a volume failure. Thus we can run reconfig on the same
* configuration to reload the new volume on the same directory as the failed one.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org