You are viewing a plain-text version of this content. Its canonical link was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/10/18 23:45:11 UTC
[25/50] [abbrv] hadoop git commit: HDFS-10301. Remove FBR tracking state to fix false zombie storage detection for interleaving block reports. Contributed by Vinitha Gankidi.
HDFS-10301. Remove FBR tracking state to fix false zombie storage detection for interleaving block reports. Contributed by Vinitha Gankidi.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/391ce535
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/391ce535
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/391ce535
Branch: refs/heads/HADOOP-13070
Commit: 391ce535a739dc92cb90017d759217265a4fd969
Parents: 30bb197
Author: Vinitha Reddy Gankidi <vi...@linkedin.com>
Authored: Fri Oct 14 10:37:44 2016 -0700
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Fri Oct 14 18:13:54 2016 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 75 ++++++--------------
.../blockmanagement/DatanodeDescriptor.java | 48 -------------
.../blockmanagement/DatanodeStorageInfo.java | 11 ---
.../hdfs/server/namenode/NameNodeRpcServer.java | 4 +-
.../blockmanagement/TestBlockManager.java | 19 +++--
.../TestNameNodePrunesMissingStorages.java | 70 +++++++++++++++---
.../server/datanode/BlockReportTestBase.java | 50 +++++++++++++
.../TestAddOverReplicatedStripedBlocks.java | 4 ++
8 files changed, 147 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7949439..7b13add 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1347,6 +1347,8 @@ public class BlockManager implements BlockStatsMXBean {
}
}
checkSafeMode();
+ LOG.info("Removed blocks associated with storage {} from DataNode {}",
+ storageInfo, node);
}
/**
@@ -2191,7 +2193,7 @@ public class BlockManager implements BlockStatsMXBean {
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport,
- BlockReportContext context, boolean lastStorageInRpc) throws IOException {
+ BlockReportContext context) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@@ -2245,32 +2247,6 @@ public class BlockManager implements BlockStatsMXBean {
}
storageInfo.receivedBlockReport();
- if (context != null) {
- storageInfo.setLastBlockReportId(context.getReportId());
- if (lastStorageInRpc) {
- int rpcsSeen = node.updateBlockReportContext(context);
- if (rpcsSeen >= context.getTotalRpcs()) {
- long leaseId = blockReportLeaseManager.removeLease(node);
- BlockManagerFaultInjector.getInstance().
- removeBlockReportLease(node, leaseId);
- List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
- if (zombies.isEmpty()) {
- LOG.debug("processReport 0x{}: no zombie storages found.",
- Long.toHexString(context.getReportId()));
- } else {
- for (DatanodeStorageInfo zombie : zombies) {
- removeZombieReplicas(context, zombie);
- }
- }
- node.clearBlockReportContext();
- } else {
- LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
- "report.", Long.toHexString(context.getReportId()),
- (context.getTotalRpcs() - rpcsSeen)
- );
- }
- }
- }
} finally {
endTime = Time.monotonicNow();
namesystem.writeUnlock();
@@ -2295,36 +2271,25 @@ public class BlockManager implements BlockStatsMXBean {
return !node.hasStaleStorages();
}
- private void removeZombieReplicas(BlockReportContext context,
- DatanodeStorageInfo zombie) {
- LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
- "longer exists on the DataNode.",
- Long.toHexString(context.getReportId()), zombie.getStorageID());
- assert(namesystem.hasWriteLock());
- Iterator<BlockInfo> iter = zombie.getBlockIterator();
- int prevBlocks = zombie.numBlocks();
- while (iter.hasNext()) {
- BlockInfo block = iter.next();
- // We assume that a block can be on only one storage in a DataNode.
- // That's why we pass in the DatanodeDescriptor rather than the
- // DatanodeStorageInfo.
- // TODO: remove this assumption in case we want to put a block on
- // more than one storage on a datanode (and because it's a difficult
- // assumption to really enforce)
- // DatanodeStorageInfo must be removed using the iterator to avoid
- // ConcurrentModificationException in the underlying storage
- iter.remove();
- removeStoredBlock(block, zombie.getDatanodeDescriptor());
- Block b = getBlockOnStorage(block, zombie);
- if (b != null) {
- invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
+ public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
+ final BlockReportContext context) throws IOException {
+ namesystem.writeLock();
+ DatanodeDescriptor node;
+ try {
+ node = datanodeManager.getDatanode(nodeID);
+ if (context != null) {
+ if (context.getTotalRpcs() == context.getCurRpc() + 1) {
+ long leaseId = this.getBlockReportLeaseManager().removeLease(node);
+ BlockManagerFaultInjector.getInstance().
+ removeBlockReportLease(node, leaseId);
+ }
+ LOG.debug("Processing RPC with index {} out of total {} RPCs in "
+ + "processReport 0x{}", context.getCurRpc(),
+ context.getTotalRpcs(), Long.toHexString(context.getReportId()));
}
+ } finally {
+ namesystem.writeUnlock();
}
- assert(zombie.numBlocks() == 0);
- LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
- "which no longer exists on the DataNode.",
- Long.toHexString(context.getReportId()), prevBlocks,
- zombie.getStorageID());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index c74d7c5..6d163ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
-import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -31,7 +30,6 @@ import java.util.Queue;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -68,8 +65,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
LoggerFactory.getLogger(DatanodeDescriptor.class);
public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
- private static final List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
- ImmutableList.of();
/** Block and targets pair */
@InterfaceAudience.Private
@@ -154,10 +149,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
public final DecommissioningStatus decommissioningStatus =
new DecommissioningStatus();
- private long curBlockReportId = 0;
-
- private BitSet curBlockReportRpcsSeen = null;
-
private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
@@ -257,20 +248,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
- public int updateBlockReportContext(BlockReportContext context) {
- if (curBlockReportId != context.getReportId()) {
- curBlockReportId = context.getReportId();
- curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
- }
- curBlockReportRpcsSeen.set(context.getCurRpc());
- return curBlockReportRpcsSeen.cardinality();
- }
-
- public void clearBlockReportContext() {
- curBlockReportId = 0;
- curBlockReportRpcsSeen = null;
- }
-
public CachedBlocksList getPendingCached() {
return pendingCached;
}
@@ -334,31 +311,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
- List<DatanodeStorageInfo> removeZombieStorages() {
- List<DatanodeStorageInfo> zombies = null;
- synchronized (storageMap) {
- Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
- storageMap.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
- DatanodeStorageInfo storageInfo = entry.getValue();
- if (storageInfo.getLastBlockReportId() != curBlockReportId) {
- LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
- storageInfo.getStorageID(),
- Long.toHexString(storageInfo.getLastBlockReportId()),
- Long.toHexString(curBlockReportId));
- iter.remove();
- if (zombies == null) {
- zombies = new LinkedList<>();
- }
- zombies.add(storageInfo);
- }
- storageInfo.setLastBlockReportId(0);
- }
- }
- return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
- }
-
public void resetBlocks() {
setCapacity(0);
setRemaining(0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index d98a2c1..b4c8aaa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -99,9 +99,6 @@ public class DatanodeStorageInfo {
private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
- // The ID of the last full block report which updated this storage.
- private long lastBlockReportId = 0;
-
/** The number of block reports received */
private int blockReportCount = 0;
@@ -166,14 +163,6 @@ public class DatanodeStorageInfo {
this.blockPoolUsed = blockPoolUsed;
}
- long getLastBlockReportId() {
- return lastBlockReportId;
- }
-
- void setLastBlockReportId(long lastBlockReportId) {
- this.lastBlockReportId = lastBlockReportId;
- }
-
State getState() {
return this.state;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a97a307..7894163 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1441,11 +1441,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
@Override
public Boolean call() throws IOException {
return bm.processReport(nodeReg, reports[index].getStorage(),
- blocks, context, (index == reports.length - 1));
+ blocks, context);
}
});
metrics.incrStorageBlockReportOps();
}
+ bm.removeBRLeaseIfNeeded(nodeReg, context);
+
BlockManagerFaultInjector.getInstance().
incomingBlockReportRpc(nodeReg, context);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 942569a..2c7c720 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -716,12 +716,12 @@ public class TestBlockManager {
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null, false);
+ BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null, false);
+ BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node
@@ -732,7 +732,7 @@ public class TestBlockManager {
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null, false);
+ BlockListAsLongs.EMPTY, null);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
@@ -761,7 +761,7 @@ public class TestBlockManager {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null, false);
+ BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount());
}
@@ -835,7 +835,7 @@ public class TestBlockManager {
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
builder.build(),
- new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
+ new BlockReportContext(1, 0, System.nanoTime(), 0, true));
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
@@ -874,8 +874,7 @@ public class TestBlockManager {
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
- new BlockReportContext(1, 0, System.nanoTime(), 0, false),
- false);
+ new BlockReportContext(1, 0, System.nanoTime(), 0, false));
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
@@ -885,8 +884,7 @@ public class TestBlockManager {
// Send unsorted report
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
- new BlockReportContext(1, 0, System.nanoTime(), 0, false),
- false);
+ new BlockReportContext(1, 0, System.nanoTime(), 0, false));
assertEquals(2, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
@@ -897,8 +895,7 @@ public class TestBlockManager {
Collections.sort(blocks);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
- new BlockReportContext(1, 0, System.nanoTime(), 0, true),
- false);
+ new BlockReportContext(1, 0, System.nanoTime(), 0, true));
assertEquals(3, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 6efc53a..274627f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -19,24 +19,23 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -47,7 +46,6 @@ import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
-import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -56,13 +54,11 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
-import java.util.Arrays;
-import java.util.EnumSet;
import java.util.Iterator;
-import java.util.List;
import java.util.UUID;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
@@ -160,6 +156,8 @@ public class TestNameNodePrunesMissingStorages {
public void testRemovingStorageDoesNotProduceZombies() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ 1000);
final int NUM_STORAGES_PER_DN = 2;
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf).numDataNodes(3)
@@ -262,7 +260,7 @@ public class TestNameNodePrunesMissingStorages {
assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
return true;
}
- }, 10, 30000);
+ }, 1000, 30000);
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -371,4 +369,60 @@ public class TestNameNodePrunesMissingStorages {
cluster.shutdown();
}
}
+
+ @Test(timeout=300000)
+ public void testNameNodePrunesUnreportedStorages() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ // Create a cluster with one datanode with two storages
+ MiniDFSCluster cluster = new MiniDFSCluster
+ .Builder(conf).numDataNodes(1)
+ .storagesPerDatanode(2)
+ .build();
+ // Create two files to ensure each storage has a block
+ DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file1"),
+ 102400, 102400, 102400, (short)1,
+ 0x1BAD5EE);
+ DFSTestUtil.createFile(cluster.getFileSystem(), new Path("file2"),
+ 102400, 102400, 102400, (short)1,
+ 0x1BAD5EED);
+ // Get the datanode storages and data directories
+ DataNode dn = cluster.getDataNodes().get(0);
+ BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+ DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
+ getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
+ DatanodeStorageInfo[] dnStoragesInfosBeforeRestart =
+ dnDescriptor.getStorageInfos();
+ Collection<String> oldDirs = new ArrayList<String>(dn.getConf().
+ getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+ // Keep the first data directory and remove the second.
+ String newDirs = oldDirs.iterator().next();
+ conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+ // Restart the datanode with the new conf
+ cluster.stopDataNode(0);
+ cluster.startDataNodes(conf, 1, false, null, null);
+ dn = cluster.getDataNodes().get(0);
+ cluster.waitActive();
+ // Assert that the dnDescriptor has both the storages after restart
+ assertArrayEquals(dnStoragesInfosBeforeRestart,
+ dnDescriptor.getStorageInfos());
+ // Assert that the removed storage is marked as FAILED
+ // when DN heartbeats to the NN
+ int numFailedStoragesWithBlocks = 0;
+ DatanodeStorageInfo failedStorageInfo = null;
+ for (DatanodeStorageInfo dnStorageInfo: dnDescriptor.getStorageInfos()) {
+ if (dnStorageInfo.areBlocksOnFailedStorage()) {
+ numFailedStoragesWithBlocks++;
+ failedStorageInfo = dnStorageInfo;
+ }
+ }
+ assertEquals(1, numFailedStoragesWithBlocks);
+ // Heartbeat manager removes the blocks associated with this failed storage
+ bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+ assertTrue(!failedStorageInfo.areBlocksOnFailedStorage());
+ // pruneStorageMap removes the unreported storage
+ cluster.triggerHeartbeats();
+ // Assert that the unreported storage is pruned
+ assertEquals(DataNode.getStorageLocations(dn.getConf()).size(),
+ dnDescriptor.getStorageInfos().length);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 53b9263..6810a0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -29,7 +29,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -50,7 +55,10 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -649,6 +657,48 @@ public abstract class BlockReportTestBase {
DFSTestUtil.readFile(fs, filePath);
}
+ // See HDFS-10301
+ @Test(timeout = 300000)
+ public void testInterleavedBlockReports()
+ throws IOException, ExecutionException, InterruptedException {
+ int numConcurrentBlockReports = 3;
+ DataNode dn = cluster.getDataNodes().get(DN_N0);
+ final String poolId = cluster.getNamesystem().getBlockPoolId();
+ LOG.info("Block pool id: " + poolId);
+ final DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+ final StorageBlockReport[] reports =
+ getBlockReports(dn, poolId, true, true);
+
+ // Get the list of storage ids associated with the datanode
+ // before the test
+ BlockManager bm = cluster.getNameNode().getNamesystem().getBlockManager();
+ final DatanodeDescriptor dnDescriptor =
+ bm.getDatanodeManager().getDatanode(dn.getDatanodeId());
+ DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
+
+ // Send the block report concurrently using
+ // numThreads=numConcurrentBlockReports
+ ExecutorService executorService =
+ Executors.newFixedThreadPool(numConcurrentBlockReports);
+ List<Future<Void>> futureList = new ArrayList<>(numConcurrentBlockReports);
+ for (int i = 0; i < numConcurrentBlockReports; i++) {
+ futureList.add(executorService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ sendBlockReports(dnR, poolId, reports);
+ return null;
+ }
+ }));
+ }
+ for (Future<Void> future : futureList) {
+ future.get();
+ }
+ executorService.shutdown();
+
+ // Verify that the storages match before and after the test
+ Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
+ }
+
private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
final boolean tooLongWait = false;
final int TIMEOUT = 40000;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/391ce535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
index 7b281a6..13dcccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -236,6 +237,9 @@ public class TestAddOverReplicatedStripedBlocks {
}
}
+ // This test is going to be rewritten in HDFS-10854. Ignoring this test
+ // temporarily as it fails with the fix for HDFS-10301.
+ @Ignore
@Test
public void testProcessOverReplicatedAndMissingStripedBlock()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org