You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2012/02/01 06:16:49 UTC
svn commit: r1238940 - in
/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/namenode/
src/test/java/org/apache/hadoop/hdfs/serv...
Author: eli
Date: Wed Feb 1 05:16:49 2012
New Revision: 1238940
URL: http://svn.apache.org/viewvc?rev=1238940&view=rev
Log:
HDFS-2742. HA: observed dataloss in replication stress test. Contributed by Todd Lipcon
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
Removed:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Wed Feb 1 05:16:49 2012
@@ -145,3 +145,5 @@ HDFS-2824. Fix failover when prior NN di
HDFS-2853. HA: NN fails to start if the shared edits dir is marked required (atm via eli)
HDFS-2845. SBN should not allow browsing of the file system via web UI. (Bikas Saha via atm)
+
+HDFS-2742. HA: observed dataloss in replication stress test. (todd via eli)
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Wed Feb 1 05:16:49 2012
@@ -180,7 +180,7 @@ public class BlockInfo extends Block imp
/**
* Count the number of data-nodes the block belongs to.
*/
- int numNodes() {
+ public int numNodes() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
for(int idx = getCapacity()-1; idx >= 0; idx--) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Feb 1 05:16:49 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.bl
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -28,6 +29,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.U
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Util;
@@ -58,7 +61,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -69,7 +71,6 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
/**
@@ -83,11 +84,20 @@ public class BlockManager {
/** Default load factor of map */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
+ private static final String QUEUE_REASON_CORRUPT_STATE =
+ "it has the wrong state or generation stamp";
+
+ private static final String QUEUE_REASON_FUTURE_GENSTAMP =
+ "generation stamp is in the future";
+
private final Namesystem namesystem;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
+
+ private final PendingDataNodeMessages pendingDNMessages =
+ new PendingDataNodeMessages();
private volatile long pendingReplicationBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
@@ -124,6 +134,10 @@ public class BlockManager {
public long getPostponedMisreplicatedBlocksCount() {
return postponedMisreplicatedBlocksCount;
}
+ /** Used by metrics */
+ public int getPendingDataNodeMessageCount() {
+ return pendingDNMessages.count();
+ }
/**replicationRecheckInterval is how often namenode checks for new replication work*/
private final long replicationRecheckInterval;
@@ -479,12 +493,24 @@ public class BlockManager {
if(curBlock.isComplete())
return curBlock;
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
- if (!force && ucBlock.numNodes() < minReplication)
+ int numNodes = ucBlock.numNodes();
+ if (!force && numNodes < minReplication)
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
// replace penultimate block in file
fileINode.setBlock(blkIndex, completeBlock);
+
+ // Since safe-mode only counts complete blocks, and we now have
+ // one more complete block, we need to adjust the total up, and
+ // also count it as safe, if we have at least the minimum replica
+ // count. (We may not have the minimum replica count yet if this is
+ // a "forced" completion when a file is getting closed by an
+ // OP_CLOSE edit on the standby).
+ namesystem.adjustSafeModeBlockTotals(0, 1);
+ namesystem.incrementSafeBlockCount(
+ Math.min(numNodes, minReplication));
+
// replace block in the blocksMap
return blocksMap.replaceBlock(completeBlock);
}
@@ -547,6 +573,14 @@ public class BlockManager {
String datanodeId = dd.getStorageID();
invalidateBlocks.remove(datanodeId, oldBlock);
}
+
+ // Adjust safe-mode totals, since under-construction blocks don't
+ // count in safe-mode.
+ namesystem.adjustSafeModeBlockTotals(
+ // decrement safe if we had enough
+ targets.length >= minReplication ? -1 : 0,
+ // always decrement total blocks
+ -1);
final long fileLength = fileINode.computeContentSummary().getLength();
final long pos = fileLength - ucBlock.getNumBytes();
@@ -1483,9 +1517,19 @@ public class BlockManager {
assert (node.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator();
+ boolean isStandby = namesystem.isInStandbyState();
+
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState reportedState = itBR.getCurrentReplicaState();
+
+ if (isStandby &&
+ namesystem.isGenStampInFuture(iblk.getGenerationStamp())) {
+ queueReportedBlock(node, iblk, reportedState,
+ QUEUE_REASON_FUTURE_GENSTAMP);
+ continue;
+ }
+
BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
// If block does not belong to any file, we are done.
if (storedBlock == null) continue;
@@ -1493,7 +1537,14 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState();
if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
- markBlockAsCorrupt(storedBlock, node);
+ if (namesystem.isInStandbyState()) {
+ // In the Standby, we may receive a block report for a file that we
+ // just have an out-of-date gen-stamp or state for, for example.
+ queueReportedBlock(node, iblk, reportedState,
+ QUEUE_REASON_CORRUPT_STATE);
+ } else {
+ markBlockAsCorrupt(storedBlock, node);
+ }
continue;
}
@@ -1576,7 +1627,8 @@ public class BlockManager {
* @param toCorrupt replicas with unexpected length or generation stamp;
* add to corrupt replicas
* @param toUC replicas of blocks currently under construction
- * @return
+ * @return the up-to-date stored block, if it should be kept.
+ * Otherwise, null.
*/
private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
final Block block, final ReplicaState reportedState,
@@ -1591,6 +1643,13 @@ public class BlockManager {
+ " replicaState = " + reportedState);
}
+ if (namesystem.isInStandbyState() &&
+ namesystem.isGenStampInFuture(block.getGenerationStamp())) {
+ queueReportedBlock(dn, block, reportedState,
+ QUEUE_REASON_FUTURE_GENSTAMP);
+ return null;
+ }
+
// find block by blockId
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
if(storedBlock == null) {
@@ -1615,7 +1674,16 @@ assert storedBlock.findDatanode(dn) < 0
}
if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
- toCorrupt.add(storedBlock);
+ if (namesystem.isInStandbyState()) {
+ // If the block is an out-of-date generation stamp or state,
+ // but we're the standby, we shouldn't treat it as corrupt,
+ // but instead just queue it for later processing.
+ queueReportedBlock(dn, storedBlock, reportedState,
+ QUEUE_REASON_CORRUPT_STATE);
+
+ } else {
+ toCorrupt.add(storedBlock);
+ }
return storedBlock;
}
@@ -1633,6 +1701,68 @@ assert storedBlock.findDatanode(dn) < 0
return storedBlock;
}
+ /**
+ * Queue the given reported block for later processing in the
+ * standby node. {@link PendingDataNodeMessages}.
+ * @param reason a textual reason to report in the debug logs
+ */
+ private void queueReportedBlock(DatanodeDescriptor dn, Block block,
+ ReplicaState reportedState, String reason) {
+ assert namesystem.isInStandbyState();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queueing reported block " + block +
+ " in state " + reportedState +
+ " from datanode " + dn + " for later processing " +
+ "because " + reason + ".");
+ }
+ pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
+ }
+
+ /**
+ * Try to process any messages that were previously queued for the given
+ * block. This is called from FSEditLogLoader whenever a block's state
+ * in the namespace has changed or a new block has been created.
+ */
+ public void processQueuedMessagesForBlock(Block b) throws IOException {
+ Queue<ReportedBlockInfo> queue = pendingDNMessages.takeBlockQueue(b);
+ if (queue == null) {
+ // Nothing to re-process
+ return;
+ }
+ processQueuedMessages(queue);
+ }
+
+ private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis)
+ throws IOException {
+ for (ReportedBlockInfo rbi : rbis) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing previously queued message " + rbi);
+ }
+ processAndHandleReportedBlock(
+ rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
+ }
+ }
+
+ /**
+ * Process any remaining queued datanode messages after entering
+ * active state. At this point they will not be re-queued since
+ * we are the definitive master node and thus should be up-to-date
+ * with the namespace information.
+ */
+ public void processAllPendingDNMessages() throws IOException {
+ assert !namesystem.isInStandbyState() :
+ "processAllPendingDNMessages() should be called after exiting " +
+ "standby state!";
+ int count = pendingDNMessages.count();
+ if (count > 0) {
+ LOG.info("Processing " + count + " messages from DataNodes " +
+ "that were previously queued during standby state.");
+ }
+ processQueuedMessages(pendingDNMessages.takeAll());
+ assert pendingDNMessages.count() == 0;
+ }
+
/*
* The next two methods test the various cases under which we must conclude
* the replica is corrupt, or under construction. These are laid out
@@ -1742,13 +1872,15 @@ assert storedBlock.findDatanode(dn) < 0
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
- && numCurrentReplica >= minReplication)
+ && numCurrentReplica >= minReplication) {
storedBlock = completeBlock(storedBlock.getINode(), storedBlock, false);
-
- // check whether safe replication is reached for the block
- // only complete blocks are counted towards that
- if(storedBlock.isComplete())
+ } else if (storedBlock.isComplete()) {
+ // check whether safe replication is reached for the block
+ // only complete blocks are counted towards that.
+ // In the case that the block just became complete above, completeBlock()
+ // handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica);
+ }
}
/**
@@ -1807,15 +1939,17 @@ assert storedBlock.findDatanode(dn) < 0
+ pendingReplications.getNumReplicas(storedBlock);
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
- numLiveReplicas >= minReplication)
+ numLiveReplicas >= minReplication) {
storedBlock = completeBlock(fileINode, storedBlock, false);
-
- // check whether safe replication is reached for the block
- // only complete blocks are counted towards that
- // Is no-op if not in safe mode.
- if(storedBlock.isComplete())
+ } else if (storedBlock.isComplete()) {
+ // check whether safe replication is reached for the block
+ // only complete blocks are counted towards that
+ // Is no-op if not in safe mode.
+ // In the case that the block just became complete above, completeBlock()
+ // handles the safe block count maintenance.
namesystem.incrementSafeBlockCount(numCurrentReplica);
-
+ }
+
// if file is under construction, then done for now
if (fileINode.isUnderConstruction()) {
return storedBlock;
@@ -2514,7 +2648,7 @@ assert storedBlock.findDatanode(dn) < 0
}
public int getActiveBlockCount() {
- return blocksMap.size() - (int)invalidateBlocks.numBlocks();
+ return blocksMap.size();
}
public DatanodeDescriptor[] getNodes(BlockInfo block) {
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1238940&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Wed Feb 1 05:16:49 2012
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * In the Standby Node, we can receive messages about blocks
+ * before they are actually available in the namespace, or while
+ * they have an outdated state in the namespace. In those cases,
+ * we queue those block-related messages in this structure.
+ */
+class PendingDataNodeMessages {
+
+ Map<Block, Queue<ReportedBlockInfo>> queueByBlockId =
+ Maps.newHashMap();
+ private int count = 0;
+
+
+ static class ReportedBlockInfo {
+ private final Block block;
+ private final DatanodeDescriptor dn;
+ private final ReplicaState reportedState;
+
+ ReportedBlockInfo(DatanodeDescriptor dn, Block block,
+ ReplicaState reportedState) {
+ this.dn = dn;
+ this.block = block;
+ this.reportedState = reportedState;
+ }
+
+ Block getBlock() {
+ return block;
+ }
+
+ DatanodeDescriptor getNode() {
+ return dn;
+ }
+
+ ReplicaState getReportedState() {
+ return reportedState;
+ }
+
+ @Override
+ public String toString() {
+ return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+ + ", reportedState=" + reportedState + "]";
+ }
+ }
+
+ void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
+ ReplicaState reportedState) {
+ block = new Block(block);
+ getBlockQueue(block).add(
+ new ReportedBlockInfo(dn, block, reportedState));
+ count++;
+ }
+
+ /**
+ * @return any messages that were previously queued for the given block,
+ * or null if no messages were queued.
+ */
+ Queue<ReportedBlockInfo> takeBlockQueue(Block block) {
+ Queue<ReportedBlockInfo> queue = queueByBlockId.remove(block);
+ if (queue != null) {
+ count -= queue.size();
+ }
+ return queue;
+ }
+
+
+ private Queue<ReportedBlockInfo> getBlockQueue(Block block) {
+ Queue<ReportedBlockInfo> queue = queueByBlockId.get(block);
+ if (queue == null) {
+ queue = Lists.newLinkedList();
+ queueByBlockId.put(block, queue);
+ }
+ return queue;
+ }
+
+ public int count() {
+ return count ;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<Block, Queue<ReportedBlockInfo>> entry :
+ queueByBlockId.entrySet()) {
+ sb.append("Block " + entry.getKey() + ":\n");
+ for (ReportedBlockInfo rbi : entry.getValue()) {
+ sb.append(" ").append(rbi).append("\n");
+ }
+ }
+ return sb.toString();
+ }
+
+ public Iterable<ReportedBlockInfo> takeAll() {
+ List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity(
+ count);
+ for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) {
+ rbis.addAll(q);
+ }
+ queueByBlockId.clear();
+ count = 0;
+ return rbis;
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Feb 1 05:16:49 2012
@@ -66,7 +66,6 @@ import com.google.common.base.Joiner;
@InterfaceStability.Evolving
public class FSEditLogLoader {
private final FSNamesystem fsNamesys;
- private long maxGenStamp = 0;
public FSEditLogLoader(FSNamesystem fsNamesys) {
this.fsNamesys = fsNamesys;
@@ -91,15 +90,6 @@ public class FSEditLogLoader {
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
} finally {
- fsNamesys.setBlockTotal();
-
- // Delay the notification of genstamp updates until after
- // setBlockTotal() above. Otherwise, we will mark blocks
- // as "safe" before they've been incorporated in the expected
- // totalBlocks and threshold for SafeMode -- triggering an
- // assertion failure and/or exiting safemode too early!
- fsNamesys.notifyGenStampUpdate(maxGenStamp);
-
edits.close();
fsNamesys.writeUnlock();
}
@@ -183,6 +173,12 @@ public class FSEditLogLoader {
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+ " numblocks : " + addCloseOp.blocks.length +
+ " clientHolder " + addCloseOp.clientName +
+ " clientMachine " + addCloseOp.clientMachine);
+ }
// See if the file already exists (persistBlocks call)
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
@@ -197,13 +193,6 @@ public class FSEditLogLoader {
}
long blockSize = addCloseOp.blockSize;
- if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
- " numblocks : " + addCloseOp.blocks.length +
- " clientHolder " + addCloseOp.clientName +
- " clientMachine " + addCloseOp.clientMachine);
- }
-
// Older versions of HDFS does not store the block size in inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
@@ -227,12 +216,18 @@ public class FSEditLogLoader {
addCloseOp.atime, blockSize);
fsNamesys.prepareFileForWrite(addCloseOp.path, node,
- addCloseOp.clientName, addCloseOp.clientMachine, null);
+ addCloseOp.clientName, addCloseOp.clientMachine, null,
+ false);
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is a call to append() on an already-closed file.
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug("Reopening an already-closed file " +
+ "for append");
+ }
fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
- addCloseOp.clientName, addCloseOp.clientMachine, null);
+ addCloseOp.clientName, addCloseOp.clientMachine, null,
+ false);
oldFile = getINodeFile(fsDir, addCloseOp.path);
}
@@ -243,6 +238,13 @@ public class FSEditLogLoader {
case OP_CLOSE: {
AddCloseOp addCloseOp = (AddCloseOp)op;
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+ " numblocks : " + addCloseOp.blocks.length +
+ " clientHolder " + addCloseOp.clientName +
+ " clientMachine " + addCloseOp.clientMachine);
+ }
+
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) {
throw new IOException("Operation trying to close non-existent file " +
@@ -478,14 +480,23 @@ public class FSEditLogLoader {
}
oldBlock.setNumBytes(newBlock.getNumBytes());
+ boolean changeMade =
+ oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
if (oldBlock instanceof BlockInfoUnderConstruction &&
(!isLastBlock || addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE)) {
+ changeMade = true;
fsNamesys.getBlockManager().forceCompleteBlock(
(INodeFileUnderConstruction)file,
(BlockInfoUnderConstruction)oldBlock);
}
+ if (changeMade) {
+ // The state or gen-stamp of the block has changed. So, we may be
+ // able to process some messages from datanodes that we previously
+ // were unable to process.
+ fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
+ }
}
if (addCloseOp.blocks.length < oldBlocks.length) {
@@ -517,13 +528,9 @@ public class FSEditLogLoader {
}
fsNamesys.getBlockManager().addINode(newBI, file);
file.addBlock(newBI);
+ fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}
}
-
- // Record the max genstamp seen
- for (Block b : addCloseOp.blocks) {
- maxGenStamp = Math.max(maxGenStamp, b.getGenerationStamp());
- }
}
private static void dumpOpCounts(
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb 1 05:16:49 2012
@@ -154,10 +154,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReceivedDeleteMessage;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
-import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.DataNodeMessage;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@@ -321,8 +317,6 @@ public class FSNamesystem implements Nam
// lock to protect FSNamesystem.
private ReentrantReadWriteLock fsLock;
- private PendingDataNodeMessages pendingDatanodeMessages = new PendingDataNodeMessages();
-
/**
* Used when this NN is in standby state to read from the shared edit log.
*/
@@ -342,11 +336,7 @@ public class FSNamesystem implements Nam
private boolean haEnabled;
private final Configuration conf;
-
- PendingDataNodeMessages getPendingDataNodeMessages() {
- return pendingDatanodeMessages;
- }
-
+
/**
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
@@ -481,6 +471,8 @@ public class FSNamesystem implements Nam
try {
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
+ assert safeMode != null &&
+ !safeMode.initializedReplQueues;
setBlockTotal();
blockManager.activate(conf);
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
@@ -531,6 +523,7 @@ public class FSNamesystem implements Nam
LOG.info("Reprocessing replication and invalidation queues...");
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
+ blockManager.processAllPendingDNMessages();
blockManager.processMisReplicatedBlocks();
if (LOG.isDebugEnabled()) {
@@ -849,8 +842,9 @@ public class FSNamesystem implements Nam
public boolean isRunning() {
return fsRunning;
}
-
- private boolean isInStandbyState() {
+
+ @Override
+ public boolean isInStandbyState() {
if (haContext == null || haContext.getState() == null) {
// We're still starting up. In this case, if HA is
// on for the cluster, we always start in standby. Otherwise
@@ -1543,7 +1537,8 @@ public class FSNamesystem implements Nam
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
if (append && myFile != null) {
- return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode);
+ return prepareFileForWrite(
+ src, myFile, holder, clientMachine, clientNode, true);
} else {
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
@@ -1581,12 +1576,14 @@ public class FSNamesystem implements Nam
* @param leaseHolder identifier of the lease holder on this file
* @param clientMachine identifier of the client machine
* @param clientNode if the client is collocated with a DN, that DN's descriptor
+ * @param writeToEditLog whether to persist this change to the edit log
* @return the last block locations if the block is partial or null otherwise
* @throws UnresolvedLinkException
* @throws IOException
*/
public LocatedBlock prepareFileForWrite(String src, INode file,
- String leaseHolder, String clientMachine, DatanodeDescriptor clientNode)
+ String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
+ boolean writeToEditLog)
throws UnresolvedLinkException, IOException {
INodeFile node = (INodeFile) file;
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
@@ -1601,6 +1598,10 @@ public class FSNamesystem implements Nam
clientNode);
dir.replaceNode(src, node, cons);
leaseManager.addLease(cons.getClientName(), src);
+
+ if (writeToEditLog) {
+ getEditLog().logOpenFile(src, cons);
+ }
return blockManager.convertLastBlockToUnderConstruction(cons);
}
@@ -2346,9 +2347,45 @@ public class FSNamesystem implements Nam
if (blocks == null) {
return;
}
- for(Block b : blocks) {
+
+ // In the case that we are a Standby tailing edits from the
+ // active while in safe-mode, we need to track the total number
+ // of blocks and safe blocks in the system.
+ boolean trackBlockCounts = isSafeModeTrackingBlocks();
+ int numRemovedComplete = 0, numRemovedSafe = 0;
+
+ for (Block b : blocks) {
+ if (trackBlockCounts) {
+ BlockInfo bi = blockManager.getStoredBlock(b);
+ if (bi.isComplete()) {
+ numRemovedComplete++;
+ if (bi.numNodes() >= blockManager.minReplication) {
+ numRemovedSafe++;
+ }
+ }
+ }
blockManager.removeBlock(b);
}
+ if (trackBlockCounts) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" +
+ "decreasing safeBlocks by " + numRemovedSafe +
+ ", totalBlocks by " + numRemovedComplete);
+ }
+ adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
+ }
+ }
+
+ /**
+ * @see SafeModeInfo#shouldIncrementallyTrackBlocks
+ */
+ private boolean isSafeModeTrackingBlocks() {
+ if (!haEnabled) {
+ // Never track blocks incrementally in non-HA code.
+ return false;
+ }
+ SafeModeInfo sm = this.safeMode;
+ return sm != null && sm.shouldIncrementallyTrackBlocks();
}
/**
@@ -2712,15 +2749,8 @@ public class FSNamesystem implements Nam
checkOperation(OperationCategory.WRITE);
if (haContext.getState().equals(NameNode.STANDBY_STATE)) {
// TODO(HA) we'll never get here, since we check for WRITE operation above!
- if (isGenStampInFuture(newgenerationstamp)) {
- LOG.info("Required GS=" + newgenerationstamp
- + ", Queuing commitBlockSynchronization message");
- getPendingDataNodeMessages().queueMessage(
- new PendingDataNodeMessages.CommitBlockSynchronizationMessage(
- lastblock, newgenerationstamp, newlength, closeFile, deleteblock,
- newtargets, newgenerationstamp));
- return;
- }
+ // Need to implement tests, etc, for this - block recovery spanning
+ // failover.
}
if (isInSafeMode()) {
@@ -3264,6 +3294,8 @@ public class FSNamesystem implements Nam
boolean initializedReplQueues = false;
/** Was safemode entered automatically because available resources were low. */
private boolean resourcesLow = false;
+ /** Should safemode adjust its block totals as blocks come in */
+ private boolean shouldIncrementallyTrackBlocks = false;
/**
* Creates SafeModeInfo when the name node enters
@@ -3292,6 +3324,18 @@ public class FSNamesystem implements Nam
}
/**
+ * In the HA case, the StandbyNode can be in safemode while the namespace
+ * is modified by the edit log tailer. In this case, the number of total
+ * blocks changes as edits are processed (e.g. blocks are added and deleted).
+ * However, we don't want to do the incremental tracking during the
+ * startup-time loading process -- only once the initial total has been
+ * set after the image has been loaded.
+ */
+ private boolean shouldIncrementallyTrackBlocks() {
+ return shouldIncrementallyTrackBlocks;
+ }
+
+ /**
* Creates SafeModeInfo when safe mode is entered manually, or because
* available resources are low.
*
@@ -3476,6 +3520,13 @@ public class FSNamesystem implements Nam
this.blockThreshold = (int) (blockTotal * threshold);
this.blockReplQueueThreshold =
(int) (blockTotal * replQueueThreshold);
+ if (haEnabled) {
+ // After we initialize the block count, any further namespace
+ // modifications done while in safe mode need to keep track
+ // of the number of total blocks in the system.
+ this.shouldIncrementallyTrackBlocks = true;
+ }
+
checkMode();
}
@@ -3485,9 +3536,10 @@ public class FSNamesystem implements Nam
* @param replication current replication
*/
private synchronized void incrementSafeBlockCount(short replication) {
- if (replication == safeReplication)
+ if (replication == safeReplication) {
this.blockSafe++;
- checkMode();
+ checkMode();
+ }
}
/**
@@ -3496,9 +3548,11 @@ public class FSNamesystem implements Nam
* @param replication current replication
*/
private synchronized void decrementSafeBlockCount(short replication) {
- if (replication == safeReplication-1)
+ if (replication == safeReplication-1) {
this.blockSafe--;
- checkMode();
+ assert blockSafe >= 0 || isManual();
+ checkMode();
+ }
}
/**
@@ -3636,6 +3690,26 @@ public class FSNamesystem implements Nam
+ "BlockManager data: active=" + activeBlocks);
}
}
+
+ private void adjustBlockTotals(int deltaSafe, int deltaTotal) {
+ if (!shouldIncrementallyTrackBlocks) {
+ return;
+ }
+ assert haEnabled;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adjusting block totals from " +
+ blockSafe + "/" + blockTotal + " to " +
+ (blockSafe + deltaSafe) + "/" + (blockTotal + deltaTotal));
+ }
+ assert blockSafe + deltaSafe >= 0 : "Can't reduce blockSafe " +
+ blockSafe + " by " + deltaSafe + ": would be negative";
+ assert blockTotal + deltaTotal >= 0 : "Can't reduce blockTotal " +
+ blockTotal + " by " + deltaTotal + ": would be negative";
+
+ blockSafe += deltaSafe;
+ setBlockTotal(blockTotal + deltaTotal);
+ }
}
/**
@@ -3741,7 +3815,24 @@ public class FSNamesystem implements Nam
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) // mostly true
return;
- safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
+ BlockInfo storedBlock = blockManager.getStoredBlock(b);
+ if (storedBlock.isComplete()) {
+ safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
+ }
+ }
+
+ /**
+ * Adjust the total number of blocks safe and expected during safe mode.
+ * If safe mode is not currently on, this is a no-op.
+ * @param deltaSafe the change in number of safe blocks
+ * @param deltaTotal the change in number of total blocks expected
+ */
+ public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
+ // safeMode is volatile, and may be set to null at any time
+ SafeModeInfo safeMode = this.safeMode;
+ if (safeMode == null)
+ return;
+ safeMode.adjustBlockTotals(deltaSafe, deltaTotal);
}
/**
@@ -4066,6 +4157,11 @@ public class FSNamesystem implements Nam
}
@Metric
+ public int getPendingDataNodeMessageCount() {
+ return blockManager.getPendingDataNodeMessageCount();
+ }
+
+ @Metric
public int getBlockCapacity() {
return blockManager.getCapacity();
}
@@ -4912,54 +5008,6 @@ public class FSNamesystem implements Nam
public boolean isGenStampInFuture(long genStamp) {
return (genStamp > getGenerationStamp());
}
-
- public void notifyGenStampUpdate(long gs) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Generation stamp " + gs + " has been reached. " +
- "Processing pending messages from DataNodes...");
- }
- DataNodeMessage msg = pendingDatanodeMessages.take(gs);
- while (msg != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing previously pending message: " + msg);
- }
- try {
- switch (msg.getType()) {
- case BLOCK_RECEIVED_DELETE:
- BlockReceivedDeleteMessage m = (BlockReceivedDeleteMessage) msg;
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog
- .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
- + m.getNodeReg().getName() + " "
- + m.getReceivedAndDeletedBlocks().length + " blocks.");
- }
- this.getBlockManager().processIncrementalBlockReport(m.getNodeReg(),
- m.getPoolId(), m.getReceivedAndDeletedBlocks());
- break;
- case BLOCK_REPORT:
- BlockReportMessage mbr = (BlockReportMessage) msg;
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
- + "from " + mbr.getNodeReg().getName() + " "
- + mbr.getBlockList().getNumberOfBlocks() + " blocks");
- }
- this.getBlockManager().processReport(mbr.getNodeReg(),
- mbr.getPoolId(), mbr.getBlockList());
- break;
- case COMMIT_BLOCK_SYNCHRONIZATION:
- CommitBlockSynchronizationMessage mcbm = (CommitBlockSynchronizationMessage) msg;
- this.commitBlockSynchronization(mcbm.getBlock(),
- mcbm.getNewgenerationstamp(), mcbm.getNewlength(),
- mcbm.isCloseFile(), mcbm.isDeleteblock(), mcbm.getNewtargets());
- break;
- }
- } catch (IOException ex) {
- LOG.warn("Could not process the message " + msg.getType(), ex);
- }
- msg = pendingDatanodeMessages.take(gs);
- }
- }
-
@VisibleForTesting
public EditLogTailer getEditLogTailer() {
return editLogTailer;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Feb 1 05:16:49 2012
@@ -878,16 +878,6 @@ class NameNodeRpcServer implements Namen
String poolId, long[] blocks) throws IOException {
verifyRequest(nodeReg);
BlockListAsLongs blist = new BlockListAsLongs(blocks);
- if (nn.isStandbyState()) {
- long maxGs = blist.getMaxGsInBlockList();
- if (namesystem.isGenStampInFuture(maxGs)) {
- LOG.info("Required GS="+maxGs+", Queuing blockReport message");
- namesystem.getPendingDataNodeMessages().queueMessage(
- new PendingDataNodeMessages.BlockReportMessage(nodeReg, poolId,
- blist, maxGs));
- return null;
- }
- }
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+ "from " + nodeReg.getName() + " " + blist.getNumberOfBlocks()
@@ -904,25 +894,6 @@ class NameNodeRpcServer implements Namen
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
verifyRequest(nodeReg);
- if (nn.isStandbyState()) {
- if (receivedAndDeletedBlocks.length > 0) {
- long maxGs = receivedAndDeletedBlocks[0].getBlock()
- .getGenerationStamp();
- for (ReceivedDeletedBlockInfo binfo : receivedAndDeletedBlocks) {
- if (binfo.getBlock().getGenerationStamp() > maxGs) {
- maxGs = binfo.getBlock().getGenerationStamp();
- }
- }
- if (namesystem.isGenStampInFuture(maxGs)) {
- LOG.info("Required GS=" + maxGs
- + ", Queuing blockReceivedAndDeleted message");
- namesystem.getPendingDataNodeMessages().queueMessage(
- new PendingDataNodeMessages.BlockReceivedDeleteMessage(nodeReg,
- poolId, receivedAndDeletedBlocks, maxGs));
- return;
- }
- }
- }
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
+"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java Wed Feb 1 05:16:49 2012
@@ -32,4 +32,10 @@ public interface Namesystem extends RwLo
/** @return the block pool ID */
public String getBlockPoolId();
+
+ public boolean isInStandbyState();
+
+ public boolean isGenStampInFuture(long generationStamp);
+
+ public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
}
\ No newline at end of file
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1238940&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Wed Feb 1 05:16:49 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.*;
+
+import java.util.Queue;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+
+public class TestPendingDataNodeMessages {
+ PendingDataNodeMessages msgs = new PendingDataNodeMessages();
+
+ private final Block block1Gs1 = new Block(1, 0, 1);
+ private final Block block1Gs2 = new Block(1, 0, 2);
+ private final Block block1Gs2DifferentInstance =
+ new Block(1, 0, 2);
+ private final Block block2Gs1 = new Block(2, 0, 1);
+
+ private final DatanodeDescriptor fakeDN = new DatanodeDescriptor(
+ new DatanodeID("fake"));
+
+ @Test
+ public void testQueues() {
+ msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED);
+ msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED);
+
+ assertEquals(2, msgs.count());
+
+ // Nothing queued yet for block 2
+ assertNull(msgs.takeBlockQueue(block2Gs1));
+ assertEquals(2, msgs.count());
+
+ Queue<ReportedBlockInfo> q =
+ msgs.takeBlockQueue(block1Gs2DifferentInstance);
+ assertEquals(
+ "ReportedBlockInfo [block=blk_1_1, dn=fake, reportedState=FINALIZED]," +
+ "ReportedBlockInfo [block=blk_1_2, dn=fake, reportedState=FINALIZED]",
+ Joiner.on(",").join(q));
+ assertEquals(0, msgs.count());
+
+ // Should be null if we pull again
+ assertNull(msgs.takeBlockQueue(block1Gs1));
+ assertEquals(0, msgs.count());
+ }
+}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Feb 1 05:16:49 2012
@@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.ipc.Server;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java Wed Feb 1 05:16:49 2012
@@ -21,18 +21,18 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.net.URISyntaxException;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -40,23 +40,29 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
@@ -360,6 +366,164 @@ public class TestDNFencing {
FileSystem fs2 = cluster.getFileSystem(1);
DFSTestUtil.readFile(fs2, TEST_FILE_PATH);
}
+
+ /**
+ * Regression test for HDFS-2742. The issue in this bug was:
+ * - DN does a block report while file is open. This BR contains
+ * the block in RBW state.
+ * - Standby queues the RBW state in PendingDatanodeMessages
+ * - Standby processes edit logs during failover. Before fixing
+ * this bug, it was mistakenly applying the RBW reported state
+ * after the block had been completed, causing the block to get
+ * marked corrupt. Instead, we should now be applying the RBW
+ * message on OP_ADD, and then the FINALIZED message on OP_CLOSE.
+ */
+ @Test
+ public void testBlockReportsWhileFileBeingWritten() throws Exception {
+ FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+ try {
+ AppendTestUtil.write(out, 0, 10);
+ out.hflush();
+
+ // Block report will include the RBW replica, but will be
+ // queued on the StandbyNode.
+ cluster.triggerBlockReports();
+
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ // Verify that no replicas are marked corrupt, and that the
+ // file is readable from the failed-over standby.
+ BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
+ BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
+ assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
+ assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
+
+ DFSTestUtil.readFile(fs, TEST_FILE_PATH);
+ }
+
+ /**
+ * Test that, when a block is re-opened for append, the related
+ * datanode messages are correctly queued by the SBN because
+ * they have future states and genstamps.
+ */
+ @Test
+ public void testQueueingWithAppend() throws Exception {
+ int numQueued = 0;
+ int numDN = cluster.getDataNodes().size();
+
+ FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+ try {
+ AppendTestUtil.write(out, 0, 10);
+ out.hflush();
+
+ // Opening the file will report RBW replicas, but will be
+ // queued on the StandbyNode.
+ numQueued += numDN; // RBW messages
+ } finally {
+ IOUtils.closeStream(out);
+ numQueued += numDN; // blockReceived messages
+ }
+
+ cluster.triggerBlockReports();
+ numQueued += numDN;
+
+ try {
+ out = fs.append(TEST_FILE_PATH);
+ AppendTestUtil.write(out, 10, 10);
+ // RBW replicas once it's opened for append
+ numQueued += numDN;
+
+ } finally {
+ IOUtils.closeStream(out);
+ numQueued += numDN; // blockReceived
+ }
+
+ cluster.triggerBlockReports();
+ numQueued += numDN;
+
+ assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
+ getPendingDataNodeMessageCount());
+
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ // Verify that no replicas are marked corrupt, and that the
+ // file is readable from the failed-over standby.
+ BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
+ BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
+ assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
+ assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
+
+ AppendTestUtil.check(fs, TEST_FILE_PATH, 20);
+ }
+
+ /**
+ * Another regression test for HDFS-2742. This tests the following sequence:
+ * - DN does a block report while file is open. This BR contains
+ * the block in RBW state.
+ * - The block report is delayed in reaching the standby.
+ * - The file is closed.
+ * - The standby processes the OP_ADD and OP_CLOSE operations before
+ * the RBW block report arrives.
+ * - The standby should not mark the block as corrupt.
+ */
+ @Test
+ public void testRBWReportArrivesAfterEdits() throws Exception {
+ final CountDownLatch brFinished = new CountDownLatch(1);
+ DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
+ @Override
+ protected Object passThrough(InvocationOnMock invocation)
+ throws Throwable {
+ try {
+ return super.passThrough(invocation);
+ } finally {
+ // inform the test that our block report went through.
+ brFinished.countDown();
+ }
+ }
+ };
+
+ FSDataOutputStream out = fs.create(TEST_FILE_PATH);
+ try {
+ AppendTestUtil.write(out, 0, 10);
+ out.hflush();
+
+ DataNode dn = cluster.getDataNodes().get(0);
+ DatanodeProtocolClientSideTranslatorPB spy =
+ DataNodeAdapter.spyOnBposToNN(dn, nn2);
+
+ Mockito.doAnswer(delayer)
+ .when(spy).blockReport(
+ Mockito.<DatanodeRegistration>anyObject(),
+ Mockito.anyString(),
+ Mockito.<long[]>anyObject());
+ dn.scheduleAllBlockReport(0);
+ delayer.waitForCall();
+
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ delayer.proceed();
+ brFinished.await();
+
+ // Verify that no replicas are marked corrupt, and that the
+ // file is readable from the failed-over standby.
+ BlockManagerTestUtil.updateState(nn1.getNamesystem().getBlockManager());
+ BlockManagerTestUtil.updateState(nn2.getNamesystem().getBlockManager());
+ assertEquals(0, nn1.getNamesystem().getCorruptReplicaBlocks());
+ assertEquals(0, nn2.getNamesystem().getCorruptReplicaBlocks());
+
+ DFSTestUtil.readFile(fs, TEST_FILE_PATH);
+ }
/**
* Print a big banner in the test log to make debug easier.
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java?rev=1238940&r1=1238939&r2=1238940&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java Wed Feb 1 05:16:49 2012
@@ -25,10 +25,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -38,15 +41,19 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
/**
* Tests that exercise safemode in an HA cluster.
@@ -60,6 +67,12 @@ public class TestHASafeMode {
private MiniDFSCluster cluster;
private Runtime mockRuntime = mock(Runtime.class);
+ static {
+ ((Log4JLogger)LogFactory.getLog(FSImage.class)).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ }
+
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
@@ -112,7 +125,11 @@ public class TestHASafeMode {
@Test
public void testEnterSafeModeInANNShouldNotThrowNPE() throws Exception {
banner("Restarting active");
+ DFSTestUtil
+ .createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3, 1L);
restartActive();
+ nn0.getRpcServer().transitionToActive();
+
FSNamesystem namesystem = nn0.getNamesystem();
String status = namesystem.getSafemode();
assertTrue("Bad safemode status: '" + status + "'", status
@@ -187,24 +204,14 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
- // We expect it to be stuck in safemode (not the extension) because
- // the block reports are delayed (since they include blocks
- // from /test2 which are too-high genstamps.
- String status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 0 needs additional 3 blocks to reach"));
+ // We expect it not to be stuck in safemode, since those blocks
+ // that are already visible to the SBN should be processed
+ // in the initial block reports.
+ assertSafeMode(nn1, 3, 3);
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
-
- status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 8 has reached the threshold 0.9990 of " +
- "total blocks 8. Safe mode will be turned off automatically"));
+ assertSafeMode(nn1, 8, 8);
}
/**
@@ -224,12 +231,7 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
- String status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 3 has reached the threshold 0.9990 of " +
- "total blocks 3. Safe mode will be turned off automatically"));
+ assertSafeMode(nn1, 3, 3);
// Create a few blocks which will send blockReceived calls to the
// SBN.
@@ -240,12 +242,7 @@ public class TestHASafeMode {
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
- status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 8 has reached the threshold 0.9990 of " +
- "total blocks 8. Safe mode will be turned off automatically"));
+ assertSafeMode(nn1, 8, 8);
}
/**
@@ -285,20 +282,11 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
- String status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 0 needs additional 5 blocks to reach"));
+ assertSafeMode(nn1, 0, 5);
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
- status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 0 has reached the threshold 0.9990 of " +
- "total blocks 0. Safe mode will be turned off automatically"));
+ assertSafeMode(nn1, 0, 0);
}
/**
@@ -320,12 +308,7 @@ public class TestHASafeMode {
restartStandby();
// It will initially have all of the blocks necessary.
- String status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 10 has reached the threshold 0.9990 of " +
- "total blocks 10. Safe mode will be turned off automatically"));
+ assertSafeMode(nn1, 10, 10);
// Delete those blocks while the SBN is in safe mode - this
// should reduce it back below the threshold
@@ -339,23 +322,123 @@ public class TestHASafeMode {
HATestUtil.waitForDNDeletions(cluster);
cluster.triggerDeletionReports();
- status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 0 needs additional 10 blocks"));
+ assertSafeMode(nn1, 0, 10);
banner("Waiting for standby to catch up to active namespace");
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
- status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 0 has reached the threshold 0.9990 of " +
- "total blocks 0. Safe mode will be turned off automatically"));
+ assertSafeMode(nn1, 0, 0);
+ }
+
+ /**
+ * Tests that the standby node properly tracks the number of total
+ * and safe blocks while it is in safe mode. Since safe-mode only
+ * counts completed blocks, append needs to decrement the total
+ * number of blocks and then re-increment when the file is closed
+ * again.
+ */
+ @Test
+ public void testAppendWhileInSafeMode() throws Exception {
+ banner("Starting with NN0 active and NN1 standby, creating some blocks");
+ // Make 4.5 blocks so that append() will re-open an existing block
+ // instead of just adding a new one
+ DFSTestUtil.createFile(fs, new Path("/test"),
+ 4*BLOCK_SIZE + BLOCK_SIZE/2, (short) 3, 1L);
+
+ // Roll edit log so that, when the SBN restarts, it will load
+ // the namespace during startup.
+ nn0.getRpcServer().rollEditLog();
+
+ banner("Restarting standby");
+ restartStandby();
+
+ // It will initially have all of the blocks necessary.
+ assertSafeMode(nn1, 5, 5);
+
+ // Append to a block while SBN is in safe mode. This should
+ // not affect safemode initially, since the DN message
+ // will get queued.
+ FSDataOutputStream stm = fs.append(new Path("/test"));
+ try {
+ assertSafeMode(nn1, 5, 5);
+
+ // if we roll edits now, the SBN should see that it's under construction
+ // and change its total count and safe count down by one, since UC
+ // blocks are not counted by safe mode.
+ HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+ assertSafeMode(nn1, 4, 4);
+ } finally {
+ IOUtils.closeStream(stm);
+ }
+
+ // Delete those blocks while the SBN is in safe mode - this
+ // should reduce it back below the threshold
+ banner("Removing the blocks without rolling the edit log");
+ fs.delete(new Path("/test"), true);
+ BlockManagerTestUtil.computeAllPendingWork(
+ nn0.getNamesystem().getBlockManager());
+
+ banner("Triggering deletions on DNs and Deletion Reports");
+ cluster.triggerHeartbeats();
+ HATestUtil.waitForDNDeletions(cluster);
+ cluster.triggerDeletionReports();
+
+ assertSafeMode(nn1, 0, 4);
+
+ banner("Waiting for standby to catch up to active namespace");
+ HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+
+ assertSafeMode(nn1, 0, 0);
+ }
+
+ /**
+ * Regression test for a bug experienced while developing
+ * HDFS-2742. The scenario here is:
+ * - image contains some blocks
+ * - edits log contains at least one block addition, followed
+ * by deletion of more blocks than were added.
+ * - When node starts up, some incorrect accounting of block
+ * totals caused an assertion failure.
+ */
+ @Test
+ public void testBlocksDeletedInEditLog() throws Exception {
+ banner("Starting with NN0 active and NN1 standby, creating some blocks");
+ // Make 4 blocks persisted in the image.
+ DFSTestUtil.createFile(fs, new Path("/test"),
+ 4*BLOCK_SIZE, (short) 3, 1L);
+ NameNodeAdapter.enterSafeMode(nn0, false);
+ NameNodeAdapter.saveNamespace(nn0);
+ NameNodeAdapter.leaveSafeMode(nn0, false);
+
+ // OP_ADD for 2 blocks
+ DFSTestUtil.createFile(fs, new Path("/test2"),
+ 2*BLOCK_SIZE, (short) 3, 1L);
+
+ // OP_DELETE for 4 blocks
+ fs.delete(new Path("/test"), true);
+
+ restartActive();
}
+ private void assertSafeMode(NameNode nn, int safe, int total) {
+ String status = nn1.getNamesystem().getSafemode();
+ if (safe == total) {
+ assertTrue("Bad safemode status: '" + status + "'",
+ status.startsWith(
+ "Safe mode is ON." +
+ "The reported blocks " + safe + " has reached the threshold " +
+ "0.9990 of total blocks " + total + ". Safe mode will be " +
+ "turned off automatically"));
+ } else {
+ int additional = total - safe;
+ assertTrue("Bad safemode status: '" + status + "'",
+ status.startsWith(
+ "Safe mode is ON." +
+ "The reported blocks " + safe + " needs additional " +
+ additional + " blocks"));
+ }
+ }
+
/**
* Set up a namesystem with several edits, both deletions and
* additions, and failover to a new NN while that NN is in
@@ -378,26 +461,107 @@ public class TestHASafeMode {
banner("Restarting standby");
restartStandby();
- // We expect it to be stuck in safemode (not the extension) because
- // the block reports are delayed (since they include blocks
- // from /test2 which are too-high genstamps.
- String status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 0 needs additional 3 blocks to reach"));
-
+ // We expect it to be on its way out of safemode, since all of the blocks
+ // from the edit log have been reported.
+ assertSafeMode(nn1, 3, 3);
+
// Initiate a failover into it while it's in safemode
banner("Initiating a failover into NN1 in safemode");
NameNodeAdapter.abortEditLogs(nn0);
cluster.transitionToActive(1);
- status = nn1.getNamesystem().getSafemode();
+ assertSafeMode(nn1, 5, 5);
+ }
+
+ /**
+ * Similar to {@link #testBlocksRemovedWhileInSafeMode()} except that
+ * the OP_DELETE edits arrive at the SBN before the block deletion reports.
+ * The tracking of safe blocks needs to properly account for the removal
+ * of the blocks as well as the safe count. This is a regression test for
+ * HDFS-2742.
+ */
+ @Test
+ public void testBlocksRemovedWhileInSafeModeEditsArriveFirst() throws Exception {
+ banner("Starting with NN0 active and NN1 standby, creating some blocks");
+ DFSTestUtil.createFile(fs, new Path("/test"), 10*BLOCK_SIZE, (short) 3, 1L);
+
+ // Roll edit log so that, when the SBN restarts, it will load
+ // the namespace during startup.
+ nn0.getRpcServer().rollEditLog();
+
+ banner("Restarting standby");
+ restartStandby();
+
+ // It will initially have all of the blocks necessary.
+ String status = nn1.getNamesystem().getSafemode();
assertTrue("Bad safemode status: '" + status + "'",
status.startsWith(
"Safe mode is ON." +
- "The reported blocks 5 has reached the threshold 0.9990 of " +
- "total blocks 5. Safe mode will be turned off automatically"));
+ "The reported blocks 10 has reached the threshold 0.9990 of " +
+ "total blocks 10. Safe mode will be turned off automatically"));
+
+ // Delete those blocks while the SBN is in safe mode.
+ // Immediately roll the edit log before the actual deletions are sent
+ // to the DNs.
+ banner("Removing the blocks without rolling the edit log");
+ fs.delete(new Path("/test"), true);
+ HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+
+ // Should see removal of the blocks as well as their contribution to safe block count.
+ assertSafeMode(nn1, 0, 0);
+
+
+ banner("Triggering sending deletions to DNs and Deletion Reports");
+ BlockManagerTestUtil.computeAllPendingWork(
+ nn0.getNamesystem().getBlockManager());
+ cluster.triggerHeartbeats();
+ HATestUtil.waitForDNDeletions(cluster);
+ cluster.triggerDeletionReports();
+
+ // No change in assertion status here, but some of the consistency checks
+ // in safemode will fire here if we accidentally decrement safe block count
+ // below 0.
+ assertSafeMode(nn1, 0, 0);
+ }
+
+
+ /**
+ * Test that the number of safe blocks is accounted correctly even when
+ * blocks move between under-construction state and completed state.
+ * If a FINALIZED report arrives at the SBN before the block is marked
+ * COMPLETE, then when we get the OP_CLOSE we need to count it as "safe"
+ * at that point. This is a regression test for HDFS-2742.
+ */
+ @Test
+ public void testSafeBlockTracking() throws Exception {
+ banner("Starting with NN0 active and NN1 standby, creating some " +
+ "UC blocks plus some other blocks to force safemode");
+ DFSTestUtil.createFile(fs, new Path("/other-blocks"), 10*BLOCK_SIZE, (short) 3, 1L);
+
+ List<FSDataOutputStream> stms = Lists.newArrayList();
+ try {
+ for (int i = 0; i < 5; i++) {
+ FSDataOutputStream stm = fs.create(new Path("/test-uc-" + i));
+ stms.add(stm);
+ stm.write(1);
+ stm.hflush();
+ }
+ // Roll edit log so that, when the SBN restarts, it will load
+ // the namespace during startup and enter safemode.
+ nn0.getRpcServer().rollEditLog();
+ } finally {
+ for (FSDataOutputStream stm : stms) {
+ IOUtils.closeStream(stm);
+ }
+ }
+
+ banner("Restarting SBN");
+ restartStandby();
+ assertSafeMode(nn1, 10, 10);
+
+ banner("Allowing SBN to catch up");
+ HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+ assertSafeMode(nn1, 15, 15);
}
/**
@@ -425,12 +589,7 @@ public class TestHASafeMode {
nn0.getRpcServer().rollEditLog();
restartStandby();
- String status = nn1.getNamesystem().getSafemode();
- assertTrue("Bad safemode status: '" + status + "'",
- status.startsWith(
- "Safe mode is ON." +
- "The reported blocks 6 has reached the threshold 0.9990 of " +
- "total blocks 6. Safe mode will be turned off automatically"));
+ assertSafeMode(nn1, 6, 6);
}
/**