You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/12/21 05:32:41 UTC
svn commit: r1221608 - in
/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/serv...
Author: todd
Date: Wed Dec 21 04:32:40 2011
New Revision: 1221608
URL: http://svn.apache.org/viewvc?rev=1221608&view=rev
Log:
HDFS-1972. Fencing mechanism for block invalidations and replications. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Wed Dec 21 04:32:40 2011
@@ -73,3 +73,5 @@ HDFS-2678. When a FailoverProxyProvider
HDFS-2682. When a FailoverProxyProvider is used, Client should not retry for 45 times if it is timing out to connect to server. (Uma Maheswara Rao G via todd)
HDFS-2693. Fix synchronization issues around state transition (todd)
+
+HDFS-1972. Fencing mechanism for block invalidations and replications (todd)
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Dec 21 04:32:40 2011
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -68,6 +69,8 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -91,6 +94,7 @@ public class BlockManager {
private volatile long underReplicatedBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
private volatile long excessBlocksCount = 0L;
+ private volatile long postponedMisreplicatedBlocksCount = 0L;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
@@ -116,6 +120,10 @@ public class BlockManager {
public long getExcessBlocksCount() {
return excessBlocksCount;
}
+ /** Used by metrics */
+ public long getPostponedMisreplicatedBlocksCount() {
+ return postponedMisreplicatedBlocksCount;
+ }
/**replicationRecheckInterval is how often namenode checks for new replication work*/
private final long replicationRecheckInterval;
@@ -134,6 +142,15 @@ public class BlockManager {
/** Blocks to be invalidated. */
private final InvalidateBlocks invalidateBlocks;
+
+ /**
+ * After a failover, over-replicated blocks may not be handled
+ * until all of the replicas have done a block report to the
+ * new active. This is to make sure that this NameNode has been
+ * notified of all block deletions that might have been pending
+ * when the failover happened.
+ */
+ private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
//
// Keeps a TreeSet for every named node. Each treeset contains
@@ -316,49 +333,15 @@ public class BlockManager {
out.println("Metasave: Blocks waiting for replication: " +
neededReplications.size());
for (Block block : neededReplications) {
- List<DatanodeDescriptor> containingNodes =
- new ArrayList<DatanodeDescriptor>();
- List<DatanodeDescriptor> containingLiveReplicasNodes =
- new ArrayList<DatanodeDescriptor>();
-
- NumberReplicas numReplicas = new NumberReplicas();
- // source node returned is not used
- chooseSourceDatanode(block, containingNodes,
- containingLiveReplicasNodes, numReplicas);
- assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
- int usableReplicas = numReplicas.liveReplicas() +
- numReplicas.decommissionedReplicas();
-
- if (block instanceof BlockInfo) {
- String fileName = ((BlockInfo)block).getINode().getFullPathName();
- out.print(fileName + ": ");
- }
- // l: == live:, d: == decommissioned c: == corrupt e: == excess
- out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
- " (replicas:" +
- " l: " + numReplicas.liveReplicas() +
- " d: " + numReplicas.decommissionedReplicas() +
- " c: " + numReplicas.corruptReplicas() +
- " e: " + numReplicas.excessReplicas() + ") ");
-
- Collection<DatanodeDescriptor> corruptNodes =
- corruptReplicas.getNodes(block);
-
- for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
- jt.hasNext();) {
- DatanodeDescriptor node = jt.next();
- String state = "";
- if (corruptNodes != null && corruptNodes.contains(node)) {
- state = "(corrupt)";
- } else if (node.isDecommissioned() ||
- node.isDecommissionInProgress()) {
- state = "(decommissioned)";
- }
- out.print(" " + node + state + " : ");
- }
- out.println("");
+ dumpBlockMeta(block, out);
}
}
+
+ // Dump any postponed over-replicated blocks
+ out.println("Mis-replicated blocks that have been postponed:");
+ for (Block block : postponedMisreplicatedBlocks) {
+ dumpBlockMeta(block, out);
+ }
// Dump blocks from pendingReplication
pendingReplications.metaSave(out);
@@ -369,6 +352,58 @@ public class BlockManager {
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
}
+
+ /**
+ * Dump the metadata for the given block in a human-readable
+ * form.
+ */
+ private void dumpBlockMeta(Block block, PrintWriter out) {
+ List<DatanodeDescriptor> containingNodes =
+ new ArrayList<DatanodeDescriptor>();
+ List<DatanodeDescriptor> containingLiveReplicasNodes =
+ new ArrayList<DatanodeDescriptor>();
+
+ NumberReplicas numReplicas = new NumberReplicas();
+ // source node returned is not used
+ chooseSourceDatanode(block, containingNodes,
+ containingLiveReplicasNodes, numReplicas);
+ assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
+ int usableReplicas = numReplicas.liveReplicas() +
+ numReplicas.decommissionedReplicas();
+
+ if (block instanceof BlockInfo) {
+ String fileName = ((BlockInfo)block).getINode().getFullPathName();
+ out.print(fileName + ": ");
+ }
+ // l: == live:, d: == decommissioned c: == corrupt e: == excess
+ out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
+ " (replicas:" +
+ " l: " + numReplicas.liveReplicas() +
+ " d: " + numReplicas.decommissionedReplicas() +
+ " c: " + numReplicas.corruptReplicas() +
+ " e: " + numReplicas.excessReplicas() + ") ");
+
+ Collection<DatanodeDescriptor> corruptNodes =
+ corruptReplicas.getNodes(block);
+
+ for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+ jt.hasNext();) {
+ DatanodeDescriptor node = jt.next();
+ String state = "";
+ if (corruptNodes != null && corruptNodes.contains(node)) {
+ state = "(corrupt)";
+ } else if (node.isDecommissioned() ||
+ node.isDecommissionInProgress()) {
+ state = "(decommissioned)";
+ }
+
+ if (node.areBlockContentsStale()) {
+ state += " (block deletions maybe out of date)";
+ }
+ out.print(" " + node + state + " : ");
+ }
+ out.println("");
+ }
/** @return maxReplicationStreams */
public int getMaxReplicationStreams() {
@@ -782,6 +817,14 @@ public class BlockManager {
node.resetBlocks();
invalidateBlocks.remove(node.getStorageID());
+
+ // If the DN hasn't block-reported since the most recent
+ // failover, then we may have been holding up on processing
+ // over-replicated blocks because of it. But we can now
+ // process those blocks.
+ if (node.areBlockContentsStale()) {
+ rescanPostponedMisreplicatedBlocks();
+ }
}
/**
@@ -879,10 +922,17 @@ public class BlockManager {
+ " because datanode " + dn.getName() + " does not exist.");
}
- // Check how many copies we have of the block. If we have at least one
- // copy on a live node, then we can delete it.
- int count = countNodes(blk).liveReplicas();
- if (count >= 1) {
+ // Check how many copies we have of the block
+ NumberReplicas nr = countNodes(blk);
+ if (nr.replicasOnStaleNodes() > 0) {
+ NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
+ "invalidation of block " + blk + " on " + dn + " because " +
+ nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
+ "with potentially out-of-date block reports.");
+ postponeBlock(blk);
+
+ } else if (nr.liveReplicas() >= 1) {
+ // If we have at least one copy on a live node, then we can delete it.
addToInvalidates(blk, dn);
removeStoredBlock(blk, node);
if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -895,6 +945,13 @@ public class BlockManager {
}
}
+ private void postponeBlock(Block blk) {
+ if (postponedMisreplicatedBlocks.add(blk)) {
+ postponedMisreplicatedBlocksCount++;
+ }
+ }
+
+
void updateState() {
pendingReplicationBlocksCount = pendingReplications.size();
underReplicatedBlocksCount = neededReplications.size();
@@ -933,7 +990,7 @@ public class BlockManager {
*
* @return number of blocks scheduled for replication during this iteration.
*/
- private int computeReplicationWork(int blocksToProcess) throws IOException {
+ int computeReplicationWork(int blocksToProcess) throws IOException {
List<List<Block>> blocksToReplicate = null;
namesystem.writeLock();
try {
@@ -984,8 +1041,10 @@ public class BlockManager {
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
block, containingNodes, liveReplicaNodes, numReplicas);
- if(srcNode == null) // block can not be replicated from any node
+ if(srcNode == null) { // block can not be replicated from any node
+ LOG.debug("Block " + block + " cannot be repl from any node");
continue;
+ }
assert liveReplicaNodes.size() == numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending
@@ -1235,7 +1294,7 @@ public class BlockManager {
srcNode = node;
}
if(numReplicas != null)
- numReplicas.initialize(live, decommissioned, corrupt, excess);
+ numReplicas.initialize(live, decommissioned, corrupt, excess, 0);
return srcNode;
}
@@ -1316,6 +1375,19 @@ public class BlockManager {
} else {
processReport(node, newReport);
}
+
+ // Now that we have an up-to-date block report, we know that any
+ // deletions from a previous NN iteration have been accounted for.
+ boolean staleBefore = node.areBlockContentsStale();
+ node.receivedBlockReport();
+ if (staleBefore && !node.areBlockContentsStale()) {
+ LOG.info("BLOCK* processReport: " +
+ "Received first block report from " + node +
+ " after becoming active. Its block contents are no longer" +
+ " considered stale.");
+ rescanPostponedMisreplicatedBlocks();
+ }
+
} finally {
endTime = Util.now();
namesystem.writeUnlock();
@@ -1328,6 +1400,37 @@ public class BlockManager {
+ ", processing time: " + (endTime - startTime) + " msecs");
}
+ /**
+ * Rescan the list of blocks which were previously postponed.
+ */
+ private void rescanPostponedMisreplicatedBlocks() {
+ for (Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
+ it.hasNext();) {
+ Block b = it.next();
+
+ BlockInfo bi = blocksMap.getStoredBlock(b);
+ if (bi == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+ "Postponed mis-replicated block " + b + " no longer found " +
+ "in block map.");
+ }
+ it.remove();
+ postponedMisreplicatedBlocksCount--;
+ continue;
+ }
+ MisReplicationResult res = processMisReplicatedBlock(bi);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
+ "Re-scanned block " + b + ", result is " + res);
+ }
+ if (res != MisReplicationResult.POSTPONE) {
+ it.remove();
+ postponedMisreplicatedBlocksCount--;
+ }
+ }
+ }
+
private void processReport(final DatanodeDescriptor node,
final BlockListAsLongs report) throws IOException {
// Normal case:
@@ -1505,8 +1608,9 @@ public class BlockManager {
// Ignore replicas already scheduled to be removed from the DN
if(invalidateBlocks.contains(dn.getStorageID(), block)) {
- assert storedBlock.findDatanode(dn) < 0 : "Block " + block
- + " in recentInvalidatesSet should not appear in DN " + dn;
+/* TODO: following assertion is incorrect, see HDFS-2668
+assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ + " in recentInvalidatesSet should not appear in DN " + dn; */
return storedBlock;
}
@@ -1773,41 +1877,81 @@ public class BlockManager {
public void processMisReplicatedBlocks() {
assert namesystem.hasWriteLock();
- long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
+ long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0;
neededReplications.clear();
for (BlockInfo block : blocksMap.getBlocks()) {
- INodeFile fileINode = block.getINode();
- if (fileINode == null) {
- // block does not belong to any file
- nrInvalid++;
- addToInvalidates(block);
- continue;
- }
- // calculate current replication
- short expectedReplication = fileINode.getReplication();
- NumberReplicas num = countNodes(block);
- int numCurrentReplica = num.liveReplicas();
- // add to under-replicated queue if need to be
- if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
- if (neededReplications.add(block, numCurrentReplica, num
- .decommissionedReplicas(), expectedReplication)) {
- nrUnderReplicated++;
- }
- }
-
- if (numCurrentReplica > expectedReplication) {
- // over-replicated block
+ MisReplicationResult res = processMisReplicatedBlock(block);
+ LOG.info("block " + block + ": " + res);
+ switch (res) {
+ case UNDER_REPLICATED:
+ nrUnderReplicated++;
+ break;
+ case OVER_REPLICATED:
nrOverReplicated++;
- processOverReplicatedBlock(block, expectedReplication, null, null);
+ break;
+ case INVALID:
+ nrInvalid++;
+ break;
+ case POSTPONE:
+ nrPostponed++;
+ postponeBlock(block);
+ break;
+ case OK:
+ break;
+ default:
+ throw new AssertionError("Invalid enum value: " + res);
}
}
-
+
LOG.info("Total number of blocks = " + blocksMap.size());
LOG.info("Number of invalid blocks = " + nrInvalid);
LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
- LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
+ LOG.info("Number of over-replicated blocks = " + nrOverReplicated +
+ ((nrPostponed > 0) ? ( " (" + nrPostponed + " postponed)") : ""));
}
+ /**
+ * Process a single possibly misreplicated block. This adds it to the
+ * appropriate queues if necessary, and returns a result code indicating
+ * what happened with it.
+ */
+ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
+ INodeFile fileINode = block.getINode();
+ if (fileINode == null) {
+ // block does not belong to any file
+ addToInvalidates(block);
+ return MisReplicationResult.INVALID;
+ }
+ // calculate current replication
+ short expectedReplication = fileINode.getReplication();
+ NumberReplicas num = countNodes(block);
+ int numCurrentReplica = num.liveReplicas();
+ // add to under-replicated queue if need to be
+ if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+ if (neededReplications.add(block, numCurrentReplica, num
+ .decommissionedReplicas(), expectedReplication)) {
+ return MisReplicationResult.UNDER_REPLICATED;
+ }
+ }
+
+ if (numCurrentReplica > expectedReplication) {
+ if (num.replicasOnStaleNodes() > 0) {
+ // If any of the replicas of this block are on nodes that are
+ // considered "stale", then these replicas may in fact have
+ // already been deleted. So, we cannot safely act on the
+ // over-replication until a later point in time, when
+ // the "stale" nodes have block reported.
+ return MisReplicationResult.POSTPONE;
+ }
+
+ // over-replicated block
+ processOverReplicatedBlock(block, expectedReplication, null, null);
+ return MisReplicationResult.OVER_REPLICATED;
+ }
+
+ return MisReplicationResult.OK;
+ }
+
/** Set replication for the blocks. */
public void setReplication(final short oldRepl, final short newRepl,
final String src, final Block... blocks) throws IOException {
@@ -1851,6 +1995,14 @@ public class BlockManager {
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
it.hasNext();) {
DatanodeDescriptor cur = it.next();
+ if (cur.areBlockContentsStale()) {
+ LOG.info("BLOCK* processOverReplicatedBlock: " +
+ "Postponing processing of over-replicated block " +
+ block + " since datanode " + cur + " does not yet have up-to-date " +
+ "block information.");
+ postponeBlock(block);
+ return;
+ }
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
.getStorageID());
if (excessBlocks == null || !excessBlocks.contains(block)) {
@@ -2153,13 +2305,15 @@ public class BlockManager {
}
/**
- * Return the number of nodes that are live and decommissioned.
+ * Return the number of nodes hosting a given block, grouped
+ * by the state of those replicas.
*/
public NumberReplicas countNodes(Block b) {
- int count = 0;
+ int decommissioned = 0;
int live = 0;
int corrupt = 0;
int excess = 0;
+ int stale = 0;
Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
while (nodeIter.hasNext()) {
@@ -2167,7 +2321,7 @@ public class BlockManager {
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- count++;
+ decommissioned++;
} else {
LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
.getStorageID());
@@ -2177,8 +2331,11 @@ public class BlockManager {
live++;
}
}
+ if (node.areBlockContentsStale()) {
+ stale++;
+ }
}
- return new NumberReplicas(live, count, corrupt, excess);
+ return new NumberReplicas(live, decommissioned, corrupt, excess, stale);
}
/**
@@ -2323,10 +2480,14 @@ public class BlockManager {
}
public void removeBlock(Block block) {
+ assert namesystem.hasWriteLock();
block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block);
corruptReplicas.removeFromCorruptReplicasMap(block);
blocksMap.removeBlock(block);
+ if (postponedMisreplicatedBlocks.remove(block)) {
+ postponedMisreplicatedBlocksCount--;
+ }
}
public BlockInfo getStoredBlock(Block block) {
@@ -2387,8 +2548,10 @@ public class BlockManager {
namesystem.writeLock();
try {
// blocks should not be replicated or removed if safe mode is on
- if (namesystem.isInSafeMode())
+ if (namesystem.isInSafeMode()) {
+ LOG.debug("In safemode, not computing replication work");
return 0;
+ }
// get blocks to invalidate for the nodeId
assert nodeId != null;
return invalidateBlocks.invalidateWork(nodeId);
@@ -2571,6 +2734,19 @@ public class BlockManager {
return workFound;
}
+ /**
+ * Clear all queues that hold decisions previously made by
+ * this NameNode.
+ */
+ public void clearQueues() {
+ neededReplications.clear();
+ pendingReplications.clear();
+ excessReplicateMap.clear();
+ invalidateBlocks.clear();
+ datanodeManager.clearPendingQueues();
+ };
+
+
private static class ReplicationWork {
private Block block;
@@ -2601,4 +2777,22 @@ public class BlockManager {
this.targets = null;
}
}
+
+ /**
+ * A simple result enum for the result of
+ * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
+ */
+ enum MisReplicationResult {
+ /** The block should be invalidated since it belongs to a deleted file. */
+ INVALID,
+ /** The block is currently under-replicated. */
+ UNDER_REPLICATED,
+ /** The block is currently over-replicated. */
+ OVER_REPLICATED,
+ /** A decision can't currently be made about this block. */
+ POSTPONE,
+ /** The block is properly replicated */
+ OK
+ }
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Dec 21 04:32:40 2011
@@ -60,7 +60,7 @@ public class BlockPlacementPolicyDefault
initialize(conf, stats, clusterMap);
}
- BlockPlacementPolicyDefault() {
+ protected BlockPlacementPolicyDefault() {
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Dec 21 04:32:40 2011
@@ -94,6 +94,10 @@ public class DatanodeDescriptor extends
boolean contains(E e) {
return blockq.contains(e);
}
+
+ synchronized void clear() {
+ blockq.clear();
+ }
}
private volatile BlockInfo blockList = null;
@@ -103,6 +107,24 @@ public class DatanodeDescriptor extends
public boolean isAlive = false;
public boolean needKeyUpdate = false;
+ /**
+ * Set to false on any NN failover, and reset to true
+ * whenever a heartbeat is received from the datanode.
+ */
+ private boolean heartbeatedSinceFailover = false;
+
+ /**
+ * At startup or at any failover, the DNs in the cluster may
+ * have pending block deletions from a previous incarnation
+ * of the NameNode. Thus, we consider their block contents
+ * stale until we have received a block report. When a DN
+ * is considered stale, any replicas on it are transitively
+ * considered stale. If any block has at least one stale replica,
+ * then no invalidations will be processed for this block.
+ * See HDFS-1972.
+ */
+ private boolean blockContentsStale = true;
+
// A system administrator can tune the balancer bandwidth parameter
// (dfs.balance.bandwidthPerSec) dynamically by calling
// "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -281,6 +303,14 @@ public class DatanodeDescriptor extends
this.invalidateBlocks.clear();
this.volumeFailures = 0;
}
+
+ public void clearBlockQueues() {
+ synchronized (invalidateBlocks) {
+ this.invalidateBlocks.clear();
+ this.recoverBlocks.clear();
+ this.replicateBlocks.clear();
+ }
+ }
public int numBlocks() {
return numBlocks;
@@ -298,6 +328,7 @@ public class DatanodeDescriptor extends
this.lastUpdate = System.currentTimeMillis();
this.xceiverCount = xceiverCount;
this.volumeFailures = volFailures;
+ this.heartbeatedSinceFailover = true;
rollBlocksScheduled(lastUpdate);
}
@@ -564,5 +595,36 @@ public class DatanodeDescriptor extends
this.bandwidth = bandwidth;
}
+ public boolean areBlockContentsStale() {
+ return blockContentsStale;
+ }
+
+ public void markStaleAfterFailover() {
+ heartbeatedSinceFailover = false;
+ blockContentsStale = true;
+ }
+
+ public void receivedBlockReport() {
+ if (heartbeatedSinceFailover) {
+ blockContentsStale = false;
+ }
+ }
+ @Override
+ public String dumpDatanode() {
+ StringBuilder sb = new StringBuilder(super.dumpDatanode());
+ int repl = replicateBlocks.size();
+ if (repl > 0) {
+ sb.append(" ").append(repl).append(" blocks to be replicated;");
+ }
+ int inval = invalidateBlocks.size();
+ if (inval > 0) {
+ sb.append(" ").append(inval).append(" blocks to be invalidated;");
+ }
+ int recover = recoverBlocks.size();
+ if (recover > 0) {
+ sb.append(" ").append(recover).append(" blocks to be recovered;");
+ }
+ return sb.toString();
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Dec 21 04:32:40 2011
@@ -945,4 +945,27 @@ public class DatanodeManager {
}
}
}
+
+ public void markAllDatanodesStale() {
+ LOG.info("Marking all datandoes as stale");
+ synchronized (datanodeMap) {
+ for (DatanodeDescriptor dn : datanodeMap.values()) {
+ dn.markStaleAfterFailover();
+ }
+ }
+ }
+
+ /**
+ * Clear any actions that are queued up to be sent to the DNs
+ * on their next heartbeats. This includes block invalidations,
+ * recoveries, and replication requests.
+ */
+ public void clearPendingQueues() {
+ synchronized (datanodeMap) {
+ for (DatanodeDescriptor dn : datanodeMap.values()) {
+ dn.clearBlockQueues();
+ }
+ }
+ }
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Wed Dec 21 04:32:40 2011
@@ -160,4 +160,9 @@ class InvalidateBlocks {
numBlocks -= toInvalidate.size();
return toInvalidate;
}
+
+ synchronized void clear() {
+ node2blocks.clear();
+ numBlocks = 0;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java Wed Dec 21 04:32:40 2011
@@ -26,20 +26,22 @@ public class NumberReplicas {
private int decommissionedReplicas;
private int corruptReplicas;
private int excessReplicas;
+ private int replicasOnStaleNodes;
NumberReplicas() {
- initialize(0, 0, 0, 0);
+ initialize(0, 0, 0, 0, 0);
}
- NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
- initialize(live, decommissioned, corrupt, excess);
+ NumberReplicas(int live, int decommissioned, int corrupt, int excess, int stale) {
+ initialize(live, decommissioned, corrupt, excess, stale);
}
- void initialize(int live, int decommissioned, int corrupt, int excess) {
+ void initialize(int live, int decommissioned, int corrupt, int excess, int stale) {
liveReplicas = live;
decommissionedReplicas = decommissioned;
corruptReplicas = corrupt;
excessReplicas = excess;
+ replicasOnStaleNodes = stale;
}
public int liveReplicas() {
@@ -54,4 +56,13 @@ public class NumberReplicas {
public int excessReplicas() {
return excessReplicas;
}
+
+ /**
+ * @return the number of replicas which are on stale nodes.
+ * This is not mutually exclusive with the other counts -- i.e., a
+ * replica may count as both "live" and "stale".
+ */
+ public int replicasOnStaleNodes() {
+ return replicasOnStaleNodes;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Wed Dec 21 04:32:40 2011
@@ -104,6 +104,14 @@ class PendingReplicationBlocks {
}
}
+
+ public void clear() {
+ synchronized (pendingReplications) {
+ pendingReplications.clear();
+ timedOutItems.clear();
+ }
+ }
+
/**
* The total number of blocks that are undergoing replication
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Dec 21 04:32:40 2011
@@ -499,6 +499,16 @@ class BPOfferService {
}
/**
+ * Run an immediate deletion report from all actors. Used by tests.
+ */
+ @VisibleForTesting
+ void triggerDeletionReportForTests() throws IOException {
+ for (BPServiceActor actor : bpServices) {
+ actor.triggerDeletionReportForTests();
+ }
+ }
+
+ /**
* Run an immediate heartbeat from all actors. Used by tests.
*/
@VisibleForTesting
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Dec 21 04:32:40 2011
@@ -281,8 +281,18 @@ class BPServiceActor implements Runnable
*/
@VisibleForTesting
void triggerBlockReportForTests() throws IOException {
+ synchronized (receivedAndDeletedBlockList) {
lastBlockReport = 0;
- blockReport();
+ lastHeartbeat = 0;
+ receivedAndDeletedBlockList.notifyAll();
+ while (lastBlockReport == 0) {
+ try {
+ receivedAndDeletedBlockList.wait(100);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
}
@VisibleForTesting
@@ -290,6 +300,29 @@ class BPServiceActor implements Runnable
synchronized (receivedAndDeletedBlockList) {
lastHeartbeat = 0;
receivedAndDeletedBlockList.notifyAll();
+ while (lastHeartbeat == 0) {
+ try {
+ receivedAndDeletedBlockList.wait(100);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void triggerDeletionReportForTests() throws IOException {
+ synchronized (receivedAndDeletedBlockList) {
+ lastDeletedReport = 0;
+ receivedAndDeletedBlockList.notifyAll();
+
+ while (lastDeletedReport == 0) {
+ try {
+ receivedAndDeletedBlockList.wait(100);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Wed Dec 21 04:32:40 2011
@@ -107,6 +107,14 @@ class FSDatasetAsyncDiskService {
}
+ synchronized long countPendingDeletions() {
+ long count = 0;
+ for (ThreadPoolExecutor exec : executors.values()) {
+ count += exec.getTaskCount() - exec.getCompletedTaskCount();
+ }
+ return count;
+ }
+
/**
* Execute the task sometime in the future, using ThreadPools.
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Dec 21 04:32:40 2011
@@ -71,6 +71,7 @@ import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
@@ -508,6 +509,17 @@ public class FSNamesystem implements Nam
"taking over writer role in edits logs.");
editLogTailer.catchupDuringFailover();
+ LOG.info("Reprocessing replication and invalidation queues...");
+ blockManager.getDatanodeManager().markAllDatanodesStale();
+ blockManager.clearQueues();
+ blockManager.processMisReplicatedBlocks();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NameNode metadata after re-processing " +
+ "replication and invalidation queues during failover:\n" +
+ metaSaveAsString());
+ }
+
long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
LOG.info("Will take over writing edit logs at txnid " +
nextTxId);
@@ -523,7 +535,7 @@ public class FSNamesystem implements Nam
writeUnlock();
}
}
-
+
/**
* Stop services required in active state
* @throws InterruptedException
@@ -781,14 +793,7 @@ public class FSNamesystem implements Nam
File file = new File(System.getProperty("hadoop.log.dir"), filename);
PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
true)));
-
- long totalInodes = this.dir.totalInodes();
- long totalBlocks = this.getBlocksTotal();
- out.println(totalInodes + " files and directories, " + totalBlocks
- + " blocks = " + (totalInodes + totalBlocks) + " total");
-
- blockManager.metaSave(out);
-
+ metaSave(out);
out.flush();
out.close();
} finally {
@@ -796,6 +801,25 @@ public class FSNamesystem implements Nam
}
}
+ private void metaSave(PrintWriter out) {
+ assert hasWriteLock();
+ long totalInodes = this.dir.totalInodes();
+ long totalBlocks = this.getBlocksTotal();
+ out.println(totalInodes + " files and directories, " + totalBlocks
+ + " blocks = " + (totalInodes + totalBlocks) + " total");
+
+ blockManager.metaSave(out);
+ }
+
+ private String metaSaveAsString() {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ metaSave(pw);
+ pw.flush();
+ return sw.toString();
+ }
+
+
long getDefaultBlockSize() {
return serverDefaults.getBlockSize();
}
@@ -3605,6 +3629,9 @@ public class FSNamesystem implements Nam
@Override
public boolean isPopulatingReplQueues() {
+ if (!haContext.getState().shouldPopulateReplQueues()) {
+ return false;
+ }
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
@@ -3939,6 +3966,11 @@ public class FSNamesystem implements Nam
}
@Metric
+ public long getPostponedMisreplicatedBlocks() {
+ return blockManager.getPostponedMisreplicatedBlocksCount();
+ }
+
+ @Metric
public int getBlockCapacity() {
return blockManager.getCapacity();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ActiveState.java Wed Dec 21 04:32:40 2011
@@ -42,6 +42,11 @@ public class ActiveState extends HAState
}
@Override
+ public boolean shouldPopulateReplQueues() {
+ return true;
+ }
+
+ @Override
public void setState(HAContext context, HAState s) throws ServiceFailedException {
if (s == NameNode.STANDBY_STATE) {
setStateInternal(context, s);
@@ -67,4 +72,5 @@ public class ActiveState extends HAState
throw new ServiceFailedException("Failed to stop active services", e);
}
}
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java Wed Dec 21 04:32:40 2011
@@ -106,9 +106,12 @@ abstract public class HAState {
public abstract void checkOperation(final HAContext context, final OperationCategory op)
throws StandbyException;
+ public abstract boolean shouldPopulateReplQueues();
+
/**
* @return String representation of the service state.
*/
+ @Override
public String toString() {
return state.toString();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyState.java Wed Dec 21 04:32:40 2011
@@ -80,5 +80,10 @@ public class StandbyState extends HAStat
+ context.getState();
throw new StandbyException(msg);
}
+
+ @Override
+ public boolean shouldPopulateReplQueues() {
+ return false;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Dec 21 04:32:40 2011
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocolR2
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
@@ -1574,6 +1575,30 @@ public class MiniDFSCluster {
ServiceFailedException {
getHaServiceClient(nnIndex).transitionToStandby();
}
+
+
+ public void triggerBlockReports()
+ throws IOException {
+ for (DataNode dn : getDataNodes()) {
+ DataNodeAdapter.triggerBlockReport(dn);
+ }
+ }
+
+
+ public void triggerDeletionReports()
+ throws IOException {
+ for (DataNode dn : getDataNodes()) {
+ DataNodeAdapter.triggerDeletionReport(dn);
+ }
+ }
+
+ public void triggerHeartbeats()
+ throws IOException {
+ for (DataNode dn : getDataNodes()) {
+ DataNodeAdapter.triggerHeartbeat(dn);
+ }
+ }
+
/** Wait until the given namenode gets registration from all the datanodes */
public void waitActive(int nnIndex) throws IOException {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Wed Dec 21 04:32:40 2011
@@ -122,4 +122,26 @@ public class BlockManagerTestUtil {
return blockManager.computeDatanodeWork();
}
+ public static int computeInvalidationWork(BlockManager bm) {
+ return bm.computeInvalidateWork(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Compute all the replication and invalidation work for the
+ * given BlockManager.
+ *
+ * This differs from the above functions in that it computes
+ * replication work for all DNs rather than a particular subset,
+ * regardless of invalidation/replication limit configurations.
+ *
+ * NB: you may want to set
+ * {@link DFSConfigKeys#DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
+ * a high value to ensure that all work is calculated.
+ */
+ public static int computeAllPendingWork(BlockManager bm)
+ throws IOException {
+ int work = computeInvalidationWork(bm);
+ work += bm.computeReplicationWork(Integer.MAX_VALUE);
+ return work;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java Wed Dec 21 04:32:40 2011
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.IOException;
+
/**
* WARNING!! This is TEST ONLY class: it never has to be used
* for ANY development purposes.
@@ -42,4 +44,27 @@ public class DataNodeAdapter {
boolean heartbeatsDisabledForTests) {
dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
}
+
+ public static void triggerDeletionReport(DataNode dn) throws IOException {
+ for (BPOfferService bpos : dn.getAllBpOs()) {
+ bpos.triggerDeletionReportForTests();
+ }
+ }
+
+ public static void triggerHeartbeat(DataNode dn) throws IOException {
+ for (BPOfferService bpos : dn.getAllBpOs()) {
+ bpos.triggerHeartbeatForTests();
+ }
+ }
+
+ public static void triggerBlockReport(DataNode dn) throws IOException {
+ for (BPOfferService bpos : dn.getAllBpOs()) {
+ bpos.triggerBlockReportForTests();
+ }
+ }
+
+ public static long getPendingAsyncDeletions(DataNode dn) {
+ FSDataset fsd = (FSDataset)dn.getFSDataset();
+ return fsd.asyncDiskService.countPendingDeletions();
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Dec 21 04:32:40 2011
@@ -81,6 +81,11 @@ public class NameNodeAdapter {
namenode.getNamesystem().leaveSafeMode(checkForUpgrades);
}
+ public static void abortEditLogs(NameNode nn) {
+ FSEditLog el = nn.getFSImage().getEditLog();
+ el.abortCurrentLogSegment();
+ }
+
/**
* Get the internal RPC server instance.
* @return rpc server
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java?rev=1221608&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java Wed Dec 21 04:32:40 2011
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+
+
+public class TestDNFencing {
+
+ protected static final Log LOG = LogFactory.getLog(
+ TestDNFencing.class);
+ private static final String TEST_FILE_DATA = "hello highly available world";
+ private static final String TEST_FILE = "/testStandbyIsHot";
+ private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
+ private static final int SMALL_BLOCK = 1024;
+
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private NameNode nn1, nn2;
+ private FileSystem fs;
+
+ static {
+ ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ }
+
+ @Before
+ public void setupCluster() throws Exception {
+ conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK);
+ // Bump up replication interval so that we only run replication
+ // checks explicitly.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 600);
+ // Increase max streams so that we re-replicate quickly.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+ // See RandomDeleterPolicy javadoc.
+ conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
+ BlockPlacementPolicy.class);
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(3)
+ .build();
+ nn1 = cluster.getNameNode(0);
+ nn2 = cluster.getNameNode(1);
+
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+ // Trigger block reports so that the first NN trusts all
+ // of the DNs, and will issue deletions
+ cluster.triggerBlockReports();
+ nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
+ nn2.getNamesystem().getEditLogTailer().interrupt();
+ fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
+ }
+
+ @After
+ public void shutdownCluster() throws Exception {
+ if (cluster != null) {
+ banner("Shutting down cluster. NN1 metadata:");
+ doMetasave(nn1);
+ banner("Shutting down cluster. NN2 metadata:");
+ doMetasave(nn2);
+ cluster.shutdown();
+ }
+ }
+
+
+ @Test
+ public void testDnFencing() throws Exception {
+ // Create a file with replication level 3.
+ DFSTestUtil.createFile(fs, TEST_FILE_PATH, 30*SMALL_BLOCK, (short)3, 1L);
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+
+ // Drop its replication count to 1, so it becomes over-replicated.
+ // Then compute the invalidation of the extra blocks and trigger
+ // heartbeats so the invalidations are flushed to the DNs.
+ nn1.getRpcServer().setReplication(TEST_FILE, (short) 1);
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn1.getNamesystem().getBlockManager());
+ cluster.triggerHeartbeats();
+
+ // Transition nn2 to active even though nn1 still thinks it's active.
+ banner("Failing to NN2 but let NN1 continue to think it's active");
+ NameNodeAdapter.abortEditLogs(nn1);
+ NameNodeAdapter.enterSafeMode(nn1, false);
+ cluster.transitionToActive(1);
+
+ // Check that the standby picked up the replication change.
+ assertEquals(1,
+ nn2.getRpcServer().getFileInfo(TEST_FILE).getReplication());
+
+ // Dump some info for debugging purposes.
+ banner("NN2 Metadata immediately after failover");
+ doMetasave(nn2);
+
+ // Even though NN2 considers the blocks over-replicated, it should
+ // postpone the block invalidation because the DNs are still "stale".
+ assertEquals(30, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+
+ banner("Triggering heartbeats and block reports so that fencing is completed");
+ cluster.triggerHeartbeats();
+ cluster.triggerBlockReports();
+
+ banner("Metadata after nodes have all block-reported");
+ doMetasave(nn2);
+
+ // The blocks should no longer be postponed.
+ assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+
+ // Wait for NN2 to enact its deletions (replication monitor has to run, etc)
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn2.getNamesystem().getBlockManager());
+ cluster.triggerHeartbeats();
+ waitForDNDeletions(cluster);
+ cluster.triggerDeletionReports();
+ assertEquals(0, nn2.getNamesystem().getUnderReplicatedBlocks());
+ assertEquals(0, nn2.getNamesystem().getPendingReplicationBlocks());
+
+ banner("Making sure the file is still readable");
+ FileSystem fs2 = cluster.getFileSystem(1);
+ DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
+
+ banner("Waiting for the actual block files to get deleted from DNs.");
+ waitForTrueReplication(cluster, block, 1);
+ }
+
+ /**
+ * Test case which restarts the standby node in such a way that,
+ * when it exits safemode, it will want to invalidate a bunch
+ * of over-replicated block replicas. Ensures that if we failover
+ * at this point it won't lose data.
+ */
+ @Test
+ public void testNNClearsCommandsOnFailoverAfterStartup()
+ throws Exception {
+ // Make lots of blocks to increase chances of triggering a bug.
+ DFSTestUtil.createFile(fs, TEST_FILE_PATH, 30*SMALL_BLOCK, (short)3, 1L);
+
+ banner("Shutting down NN2");
+ cluster.shutdownNameNode(1);
+
+ banner("Setting replication to 1, rolling edit log.");
+ nn1.getRpcServer().setReplication(TEST_FILE, (short) 1);
+ nn1.getRpcServer().rollEditLog();
+
+ // Start NN2 again. When it starts up, it will see all of the
+ // blocks as over-replicated, since it has the metadata for
+ // replication=1, but the DNs haven't yet processed the deletions.
+ banner("Starting NN2 again.");
+ cluster.restartNameNode(1);
+ nn2 = cluster.getNameNode(1);
+
+ banner("triggering BRs");
+ cluster.triggerBlockReports();
+
+ // We expect that both NN1 and NN2 will have some number of
+ // deletions queued up for the DNs.
+ banner("computing invalidation on nn1");
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn1.getNamesystem().getBlockManager());
+
+ banner("computing invalidation on nn2");
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn2.getNamesystem().getBlockManager());
+
+ // Dump some info for debugging purposes.
+ banner("Metadata immediately before failover");
+ doMetasave(nn2);
+
+
+ // Transition nn2 to active even though nn1 still thinks it's active
+ banner("Failing to NN2 but let NN1 continue to think it's active");
+ NameNodeAdapter.abortEditLogs(nn1);
+ NameNodeAdapter.enterSafeMode(nn1, false);
+
+ cluster.transitionToActive(1);
+
+ // Check that the standby picked up the replication change.
+ assertEquals(1,
+ nn2.getRpcServer().getFileInfo(TEST_FILE).getReplication());
+
+ // Dump some info for debugging purposes.
+ banner("Metadata immediately after failover");
+ doMetasave(nn2);
+
+ banner("Triggering heartbeats and block reports so that fencing is completed");
+ cluster.triggerHeartbeats();
+ cluster.triggerBlockReports();
+
+ banner("Metadata after nodes have all block-reported");
+ doMetasave(nn2);
+
+ // The blocks should no longer be postponed.
+ assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+
+ // Wait for NN2 to enact its deletions (replication monitor has to run, etc)
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn2.getNamesystem().getBlockManager());
+
+ waitForNNToIssueDeletions(nn2);
+ cluster.triggerHeartbeats();
+ waitForDNDeletions(cluster);
+ cluster.triggerDeletionReports();
+ assertEquals(0, nn2.getNamesystem().getUnderReplicatedBlocks());
+ assertEquals(0, nn2.getNamesystem().getPendingReplicationBlocks());
+
+ banner("Making sure the file is still readable");
+ FileSystem fs2 = cluster.getFileSystem(1);
+ DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
+ }
+
+ /**
+ * Test case that raises and then lowers the replication of a file with
+ * many blocks, computes invalidation work on both NNs, and then fails
+ * over to NN2 while NN1 still believes it is active. Ensures that fencing
+ * is correct: no block stays postponed and the file remains readable.
+ */
+ @Test
+ public void testNNClearsCommandsOnFailoverWithReplChanges()
+ throws Exception {
+ // Make lots of blocks to increase chances of triggering a bug.
+ DFSTestUtil.createFile(fs, TEST_FILE_PATH, 30*SMALL_BLOCK, (short)1, 1L);
+
+ banner("rolling NN1's edit log, forcing catch-up");
+ TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2);
+
+ // Raise replication to 2 on NN1 and keep computing datanode work until
+ // none is left, then wait for NN1's pending replications to drain.
+ nn1.getRpcServer().setReplication(TEST_FILE, (short) 2);
+ while (BlockManagerTestUtil.getComputedDatanodeWork(
+ nn1.getNamesystem().getBlockManager()) > 0) {
+ LOG.info("Getting more replication work computed");
+ }
+ BlockManager bm1 = nn1.getNamesystem().getBlockManager();
+ while (bm1.getPendingReplicationBlocksCount() > 0) {
+ BlockManagerTestUtil.updateState(bm1);
+ cluster.triggerHeartbeats();
+ Thread.sleep(1000);
+ }
+
+ banner("triggering BRs");
+ cluster.triggerBlockReports();
+
+ nn1.getRpcServer().setReplication(TEST_FILE, (short) 1);
+ // Replication is lowered back to 1; presumably the extra replicas are now excess.
+
+ banner("computing invalidation on nn1");
+
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn1.getNamesystem().getBlockManager());
+ doMetasave(nn1);
+
+ banner("computing invalidation on nn2");
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn2.getNamesystem().getBlockManager());
+ doMetasave(nn2);
+
+ // Dump some info for debugging purposes.
+ banner("Metadata immediately before failover");
+ doMetasave(nn2);
+
+ // Abort NN1's edit logs and put it into safe mode instead of transitioning
+ // it to standby, so NN2 becomes active while NN1 still thinks it's active.
+ banner("Failing to NN2 but let NN1 continue to think it's active");
+ NameNodeAdapter.abortEditLogs(nn1);
+ NameNodeAdapter.enterSafeMode(nn1, false);
+
+ // One more round of invalidation work on NN2 right before it becomes active.
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn2.getNamesystem().getBlockManager());
+ cluster.transitionToActive(1);
+
+ // Check that the standby picked up the replication change.
+ assertEquals(1,
+ nn2.getRpcServer().getFileInfo(TEST_FILE).getReplication());
+
+ // Dump some info for debugging purposes.
+ banner("Metadata immediately after failover");
+ doMetasave(nn2);
+
+ banner("Triggering heartbeats and block reports so that fencing is completed");
+ cluster.triggerHeartbeats();
+ cluster.triggerBlockReports();
+
+ banner("Metadata after nodes have all block-reported");
+ doMetasave(nn2);
+
+ // No block should be left in the postponed-misreplicated state.
+ assertEquals(0, nn2.getNamesystem().getPostponedMisreplicatedBlocks());
+
+ // Wait for NN2 to enact its deletions (replication monitor has to run, etc)
+ BlockManagerTestUtil.computeInvalidationWork(
+ nn2.getNamesystem().getBlockManager());
+
+ waitForNNToIssueDeletions(nn2);
+ cluster.triggerHeartbeats();
+ waitForDNDeletions(cluster);
+ cluster.triggerDeletionReports();
+ assertEquals(0, nn2.getNamesystem().getUnderReplicatedBlocks());
+ assertEquals(0, nn2.getNamesystem().getPendingReplicationBlocks());
+
+ banner("Making sure the file is still readable");
+ FileSystem fs2 = cluster.getFileSystem(1);
+ DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
+ }
+
+ /**
+ * Log the given message framed by blank lines and rows of '=' characters
+ * so that it stands out in the test log.
+ */
+ private void banner(String string) {
+ final String header = "\n\n\n\n================================================\n";
+ final String footer = "\n" + "==================================================\n\n";
+ LOG.info(header + string + footer);
+ }
+
+ /**
+ * Writes the BlockManager metaSave output of the given NameNode to stderr
+ * while holding that NameNode's namesystem write lock.
+ */
+ private void doMetasave(NameNode nn2) {
+ final PrintWriter stderrWriter = new PrintWriter(System.err);
+ nn2.getNamesystem().writeLock();
+ try {
+ nn2.getNamesystem().getBlockManager().metaSave(stderrWriter);
+ // Flush only; closing the writer would close System.err itself.
+ stderrWriter.flush();
+ } finally {
+ nn2.getNamesystem().writeUnlock();
+ }
+ }
+
+ /**
+ * Blocks until getTrueReplication(cluster, block) equals {@code waitFor}.
+ * The check is handed to GenericTestUtils.waitFor with 500 and 10000
+ * (presumably poll interval and timeout in ms). An IOException raised by
+ * the check is rethrown wrapped in a RuntimeException.
+ */
+ private void waitForTrueReplication(final MiniDFSCluster cluster,
+ final ExtendedBlock block, final int waitFor) throws Exception {
+ final Supplier<Boolean> replicaCountMatches = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ final int actual;
+ try {
+ actual = getTrueReplication(cluster, block);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return actual == waitFor;
+ }
+ };
+ GenericTestUtils.waitFor(replicaCountMatches, 500, 10000);
+ }
+
+ /**
+ * Counts the datanodes in the cluster whose dataset returns a non-null
+ * stored block for the given block's pool id and block id.
+ */
+ private int getTrueReplication(MiniDFSCluster cluster, ExtendedBlock block)
+ throws IOException {
+ int holders = 0;
+ for (DataNode dn : cluster.getDataNodes()) {
+ if (dn.getFSDataset().getStoredBlock(
+ block.getBlockPoolId(), block.getBlockId()) == null) {
+ // This datanode has no stored copy of the block.
+ continue;
+ }
+ holders++;
+ }
+ return holders;
+ }
+
+ /**
+ * Blocks until no datanode in the cluster reports pending async deletions.
+ * The check is handed to GenericTestUtils.waitFor with 1000 and 10000
+ * (presumably poll interval and timeout in ms).
+ */
+ private void waitForDNDeletions(final MiniDFSCluster cluster)
+ throws TimeoutException, InterruptedException {
+ final Supplier<Boolean> noPendingDeletions = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ for (DataNode dn : cluster.getDataNodes()) {
+ final boolean stillDeleting =
+ DataNodeAdapter.getPendingAsyncDeletions(dn) > 0;
+ if (stillDeleting) {
+ return false;
+ }
+ }
+ return true;
+ }
+ };
+ GenericTestUtils.waitFor(noPendingDeletions, 1000, 10000);
+ }
+
+ /**
+ * Blocks until the given NN's block manager reports zero blocks pending
+ * deletion, logging a progress line on every check. The check is handed to
+ * GenericTestUtils.waitFor with 250 and 10000 (presumably poll interval and
+ * timeout in ms).
+ */
+ private void waitForNNToIssueDeletions(final NameNode nn)
+ throws Exception {
+ final Supplier<Boolean> nothingLeftToIssue = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LOG.info("Waiting for NN to issue block deletions to DNs");
+ final boolean drained =
+ nn.getNamesystem().getBlockManager().getPendingDeletionBlocksCount() == 0;
+ return drained;
+ }
+ };
+ GenericTestUtils.waitFor(nothingLeftToIssue, 250, 10000);
+ }
+
+ /**
+ * A BlockPlacementPolicy that picks the excess replica to delete at
+ * random rather than by space available. In the test cases the two NNs
+ * will usually (but not quite always) make the same decision about which
+ * replica to delete, and the fencing issues are exacerbated when they
+ * decide differently. That can happen in "real life" when the NNs have
+ * slightly out-of-sync heartbeat information regarding disk usage, so
+ * this policy forces the divergence.
+ */
+ public static class RandomDeleterPolicy extends BlockPlacementPolicyDefault {
+
+ public RandomDeleterPolicy() {
+ super();
+ }
+
+ @Override
+ public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+ Block block, short replicationFactor,
+ Collection<DatanodeDescriptor> first,
+ Collection<DatanodeDescriptor> second) {
+ // Prefer the first collection; fall back to the second only when the
+ // first is empty.
+ final Collection<DatanodeDescriptor> candidates;
+ if (first.isEmpty()) {
+ candidates = second;
+ } else {
+ candidates = first;
+ }
+
+ // Copy into a list so that a random index can be looked up.
+ final List<DatanodeDescriptor> indexable = Lists.newArrayList(candidates);
+ final int pick = DFSUtil.getRandom().nextInt(indexable.size());
+ return indexable.get(pick);
+ }
+ }
+
+}
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java?rev=1221608&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java Wed Dec 21 04:32:40 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+
+/**
+ * Stress-test for potential bugs when replication is changing
+ * on blocks during a failover.
+ */
+public class TestDNFencingWithReplication {
+ static {
+ ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
+ ((Log4JLogger)Server.LOG).getLogger().setLevel(Level.FATAL);
+ ((Log4JLogger)LogFactory.getLog(
+ "org.apache.hadoop.io.retry.RetryInvocationHandler"))
+ .getLogger().setLevel(Level.FATAL);
+ }
+
+ private static final int NUM_THREADS = 20;
+ // How long should the test try to run for. In practice
+ // it runs for ~20-30s longer than this constant due to startup/
+ // shutdown time.
+ private static final long RUNTIME = 35000;
+ private static final int BLOCK_SIZE = 1024;
+
+ private static class ReplicationToggler extends RepeatingTestThread {
+ private final FileSystem fs;
+ private final Path path;
+
+ public ReplicationToggler(TestContext ctx, FileSystem fs, Path p) {
+ super(ctx);
+ this.fs = fs;
+ this.path = p;
+ }
+
+ @Override
+ public void doAnAction() throws Exception {
+ fs.setReplication(path, (short)1);
+ waitForReplicas(1);
+ fs.setReplication(path, (short)2);
+ waitForReplicas(2);
+ }
+
+ private void waitForReplicas(final int replicas) throws Exception {
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, 10);
+ Assert.assertEquals(1, blocks.length);
+ return blocks[0].getHosts().length == replicas;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 100, 60000);
+ } catch (TimeoutException te) {
+ throw new IOException("Timed out waiting for " + replicas + " replicas " +
+ "on path " + path);
+ }
+ }
+
+ public String toString() {
+ return "Toggler for " + path;
+ }
+ }
+
+ @Test
+ public void testFencingStress() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ // Increase max streams so that we re-replicate quickly.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(3)
+ .build();
+ try {
+ cluster.waitActive();
+ cluster.transitionToActive(0);
+
+ final NameNode nn1 = cluster.getNameNode(0);
+ final NameNode nn2 = cluster.getNameNode(1);
+ nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
+ nn2.getNamesystem().getEditLogTailer().interrupt();
+
+ FileSystem fs = TestDFSClientFailover.configureFailoverFs(
+ cluster, conf);
+ TestContext togglers = new TestContext();
+ for (int i = 0; i < NUM_THREADS; i++) {
+ Path p = new Path("/test-" + i);
+ DFSTestUtil.createFile(fs, p, BLOCK_SIZE*10, (short)3, (long)i);
+ togglers.addThread(new ReplicationToggler(togglers, fs, p));
+ }
+
+ // Start a separate thread which will make sure that replication
+ // happens quickly by triggering deletion reports and replication
+ // work calculation frequently.
+ TestContext triggerCtx = new TestContext();
+ triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
+
+ @Override
+ public void doAnAction() throws Exception {
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeAdapter.triggerDeletionReport(dn);
+ DataNodeAdapter.triggerHeartbeat(dn);
+ }
+ for (int i = 0; i < 2; i++) {
+ NameNode nn = cluster.getNameNode(i);
+ BlockManagerTestUtil.computeAllPendingWork(
+ nn.getNamesystem().getBlockManager());
+ }
+ Thread.sleep(500);
+ }
+ });
+
+ triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
+
+ @Override
+ public void doAnAction() throws Exception {
+ System.err.println("==============================\n" +
+ "Failing over from 0->1\n" +
+ "==================================");
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ Thread.sleep(5000);
+ System.err.println("==============================\n" +
+ "Failing over from 1->0\n" +
+ "==================================");
+
+ cluster.transitionToStandby(1);
+ cluster.transitionToActive(0);
+ Thread.sleep(5000);
+ }
+ });
+
+ triggerCtx.startThreads();
+ togglers.startThreads();
+
+ togglers.waitFor(RUNTIME);
+ togglers.stop();
+ triggerCtx.stop();
+
+ // CHeck that the files can be read without throwing
+ for (int i = 0; i < NUM_THREADS; i++) {
+ Path p = new Path("/test-" + i);
+ DFSTestUtil.readFile(fs, p);
+ }
+ } finally {
+ System.err.println("===========================\n\n\n\n");
+ cluster.shutdown();
+ }
+
+ }
+}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java Wed Dec 21 04:32:40 2011
@@ -108,8 +108,7 @@ public class TestEditLogTailer {
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
.getLastWrittenTxId();
- // TODO: we should really just ask for a log roll here
- doSaveNamespace(active);
+ active.getRpcServer().rollEditLog();
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < NN_LAG_TIMEOUT) {
@@ -124,12 +123,4 @@ public class TestEditLogTailer {
" (currently at " +
standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
}
-
- private static void doSaveNamespace(NameNode nn)
- throws IOException {
- NameNodeAdapter.enterSafeMode(nn, false);
- NameNodeAdapter.saveNamespace(nn);
- NameNodeAdapter.leaveSafeMode(nn, false);
- }
-
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java?rev=1221608&r1=1221607&r2=1221608&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java Wed Dec 21 04:32:40 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutExcep
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,10 +34,17 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
import org.junit.Test;
import com.google.common.base.Supplier;
@@ -52,6 +60,12 @@ public class TestStandbyIsHot {
private static final String TEST_FILE = "/testStandbyIsHot";
private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
+ static {
+ ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LogFactory.getLog(BlockManager.class)).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ }
+
@Test
public void testStandbyIsHot() throws Exception {
Configuration conf = new Configuration();
@@ -79,19 +93,40 @@ public class TestStandbyIsHot {
nn1.getRpcServer().rollEditLog();
System.err.println("==================================");
- waitForBlockLocations(nn2, TEST_FILE, 3);
-
- nn1.stop();
- cluster.transitionToActive(1);
+ // Block locations should show up on standby.
+ LOG.info("Waiting for block locations to appear on standby node");
+ waitForBlockLocations(cluster, nn2, TEST_FILE, 3);
+
+ // Trigger immediate heartbeats and block reports so
+ // that the active "trusts" all of the DNs
+ cluster.triggerHeartbeats();
+ cluster.triggerBlockReports();
+
+ // Change replication
+ LOG.info("Changing replication to 1");
+ fs.setReplication(TEST_FILE_PATH, (short)1);
+ waitForBlockLocations(cluster, nn1, TEST_FILE, 1);
- assertEquals(TEST_FILE_DATA, DFSTestUtil.readFile(fs, TEST_FILE_PATH));
+ nn1.getRpcServer().rollEditLog();
+
+ LOG.info("Waiting for lowered replication to show up on standby");
+ waitForBlockLocations(cluster, nn2, TEST_FILE, 1);
+
+ // Change back to 3
+ LOG.info("Changing replication to 3");
+ fs.setReplication(TEST_FILE_PATH, (short)3);
+ nn1.getRpcServer().rollEditLog();
+
+ LOG.info("Waiting for higher replication to show up on standby");
+ waitForBlockLocations(cluster, nn2, TEST_FILE, 3);
} finally {
cluster.shutdown();
}
}
- private void waitForBlockLocations(final NameNode nn,
+ static void waitForBlockLocations(final MiniDFSCluster cluster,
+ final NameNode nn,
final String path, final int expectedReplicas)
throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@@ -100,8 +135,19 @@ public class TestStandbyIsHot {
public Boolean get() {
try {
LocatedBlocks locs = NameNodeAdapter.getBlockLocations(nn, path, 0, 1000);
- LOG.info("Got locs: " + locs);
- return locs.getLastLocatedBlock().getLocations().length == expectedReplicas;
+ DatanodeInfo[] dnis = locs.getLastLocatedBlock().getLocations();
+ for (DatanodeInfo dni : dnis) {
+ Assert.assertNotNull(dni);
+ }
+ int numReplicas = dnis.length;
+
+ LOG.info("Got " + numReplicas + " locs: " + locs);
+ if (numReplicas > expectedReplicas) {
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeAdapter.triggerDeletionReport(dn);
+ }
+ }
+ return numReplicas == expectedReplicas;
} catch (IOException e) {
LOG.warn("No block locations yet: " + e.getMessage());
return false;