Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/10/17 00:15:34 UTC
svn commit: r1532924 [1/2] - in
/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/datanod...
Author: cmccabe
Date: Wed Oct 16 22:15:33 2013
New Revision: 1532924
URL: http://svn.apache.org/r1532924
Log:
HDFS-5096. Automatically cache new data added to a cached path (contributed by Colin Patrick McCabe)
Added:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java
Removed:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
Modified:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Wed Oct 16 22:15:33 2013
@@ -66,6 +66,9 @@ HDFS-4949 (Unreleased)
HDFS-5359. Allow LightWeightGSet#Iterator to remove elements.
(Contributed by Colin Patrick McCabe)
+ HDFS-5096. Automatically cache new data added to a cached path.
+ (Contributed by Colin Patrick McCabe)
+
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Oct 16 22:15:33 2013
@@ -205,6 +205,9 @@ public class DFSConfigKeys extends Commo
public static final String DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES =
"dfs.namenode.list.cache.descriptors.num.responses";
public static final int DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT = 100;
+ public static final String DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS =
+ "dfs.namenode.path.based.cache.refresh.interval.ms";
+ public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 300000L;
// Whether to enable datanode's stale state detection and usage for reads
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
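A minimal sketch of how the new rescan-interval key could be read, assuming only the constant and its 300000L (five minute) default added above; the wrapper class and output line are illustrative, and this is presumably the value the CacheReplicationMonitor's intervalMs gets wired to:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Illustrative sketch: read the new path-based cache rescan interval,
    // falling back to the five-minute default defined above.
    public class RefreshIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        long intervalMs = conf.getLong(
            DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
            DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
        System.out.println("Path-based cache rescan interval: " + intervalMs + " ms");
      }
    }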
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Wed Oct 16 22:15:33 2013
@@ -61,17 +61,6 @@ public interface BlockCollection {
public short getBlockReplication();
/**
- * Set cache replication factor for the collection
- */
- public void setCacheReplication(short cacheReplication);
-
- /**
- * Get cache replication factor for the collection
- * @return cache replication value
- */
- public short getCacheReplication();
-
- /**
* Get the name of the collection.
*/
public String getName();
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Wed Oct 16 22:15:33 2013
@@ -85,7 +85,7 @@ public class BlockInfo extends Block imp
this.bc = bc;
}
- DatanodeDescriptor getDatanode(int index) {
+ public DatanodeDescriptor getDatanode(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
return (DatanodeDescriptor)triplets[index*3];
@@ -153,7 +153,7 @@ public class BlockInfo extends Block imp
return info;
}
- int getCapacity() {
+ public int getCapacity() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
return triplets.length / 3;
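The two accessors made public here index into BlockInfo's internal "triplets" array: consistent with the assertions above, replica i keeps its DatanodeDescriptor at slot 3*i (the two neighboring slots hold list linkage), so capacity is triplets.length / 3. A toy, self-contained version of that layout, with Object standing in for the real element types:

    // Toy illustration of the triplets layout behind getDatanode() and
    // getCapacity(); the real class is
    // org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo.
    public class TripletsSketch {
      // For replica i: [3*i] = the datanode holding the replica,
      // [3*i+1] and [3*i+2] = linkage into that datanode's block list.
      private final Object[] triplets;

      public TripletsSketch(int replication) {
        this.triplets = new Object[3 * replication];
      }

      public Object getDatanode(int index) {
        assert index >= 0 && index * 3 < triplets.length : "Index is out of bound";
        return triplets[index * 3];
      }

      public int getCapacity() {
        assert triplets.length % 3 == 0 : "Malformed BlockInfo";
        return triplets.length / 3;
      }
    }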
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Oct 16 22:15:33 2013
@@ -77,13 +77,14 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
*/
@InterfaceAudience.Private
-public class BlockManager extends ReportProcessor {
+public class BlockManager {
static final Log LOG = LogFactory.getLog(BlockManager.class);
public static final Log blockLog = NameNode.blockStateChangeLog;
@@ -162,7 +163,7 @@ public class BlockManager extends Report
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
/** Blocks to be invalidated. */
- private final InvalidateStoredBlocks invalidateBlocks;
+ private final InvalidateBlocks invalidateBlocks;
/**
* After a failover, over-replicated blocks may not be handled
@@ -218,6 +219,7 @@ public class BlockManager extends Report
final boolean encryptDataTransfer;
// Max number of blocks to log info about during a block report.
+ private final long maxNumBlocksToLog;
/**
* When running inside a Standby node, the node may receive block reports
@@ -235,11 +237,10 @@ public class BlockManager extends Report
public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
final Configuration conf) throws IOException {
- super(conf);
this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
- invalidateBlocks = new InvalidateStoredBlocks(datanodeManager);
+ invalidateBlocks = new InvalidateBlocks(datanodeManager);
// Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
@@ -299,7 +300,11 @@ public class BlockManager extends Report
this.encryptDataTransfer =
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-
+
+ this.maxNumBlocksToLog =
+ conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+ DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+
LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication);
@@ -999,7 +1004,6 @@ public class BlockManager extends Report
* Adds block to list of blocks which will be invalidated on specified
* datanode and log the operation
*/
- @Override // ReportProcessor
void addToInvalidates(final Block block, final DatanodeInfo datanode) {
invalidateBlocks.add(block, datanode, true);
}
@@ -1045,8 +1049,7 @@ public class BlockManager extends Report
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
}
- @Override // ReportProcessor
- void markBlockAsCorrupt(BlockToMarkCorrupt b,
+ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeInfo dn) throws IOException {
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
@@ -1056,7 +1059,7 @@ public class BlockManager extends Report
BlockCollection bc = b.corrupted.getBlockCollection();
if (bc == null) {
- blockLogInfo("#markBlockAsCorrupt: " + b
+ blockLog.info("BLOCK markBlockAsCorrupt: " + b
+ " cannot be marked as corrupt as it does not belong to any file");
addToInvalidates(b.corrupted, node);
return;
@@ -1120,9 +1123,6 @@ public class BlockManager extends Report
this.shouldPostponeBlocksFromFuture = postpone;
}
- public boolean shouldPostponeBlocksFromFuture() {
- return this.shouldPostponeBlocksFromFuture;
- }
private void postponeBlock(Block blk) {
if (postponedMisreplicatedBlocks.add(blk)) {
@@ -1544,6 +1544,61 @@ public class BlockManager extends Report
*/
}
}
+
+ /**
+ * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+ * updates to the information about under-construction blocks.
+ * Besides the block in question, it provides the ReplicaState
+ * reported by the datanode in the block report.
+ */
+ private static class StatefulBlockInfo {
+ final BlockInfoUnderConstruction storedBlock;
+ final ReplicaState reportedState;
+
+ StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
+ ReplicaState reportedState) {
+ this.storedBlock = storedBlock;
+ this.reportedState = reportedState;
+ }
+ }
+
+ /**
+ * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+ * list of blocks that should be considered corrupt due to a block report.
+ */
+ private static class BlockToMarkCorrupt {
+ /** The corrupted block in a datanode. */
+ final BlockInfo corrupted;
+ /** The corresponding block stored in the BlockManager. */
+ final BlockInfo stored;
+ /** The reason to mark corrupt. */
+ final String reason;
+
+ BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+ Preconditions.checkNotNull(corrupted, "corrupted is null");
+ Preconditions.checkNotNull(stored, "stored is null");
+
+ this.corrupted = corrupted;
+ this.stored = stored;
+ this.reason = reason;
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, String reason) {
+ this(stored, stored, reason);
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
+ this(new BlockInfo(stored), stored, reason);
+ //the corrupted block in datanode has a different generation stamp
+ corrupted.setGenerationStamp(gs);
+ }
+
+ @Override
+ public String toString() {
+ return corrupted + "("
+ + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+ }
+ }
/**
* The given datanode is reporting all its blocks.
@@ -1635,6 +1690,46 @@ public class BlockManager extends Report
}
}
+ private void processReport(final DatanodeDescriptor node,
+ final BlockListAsLongs report) throws IOException {
+ // Normal case:
+ // Modify the (block-->datanode) map, according to the difference
+ // between the old and new block report.
+ //
+ Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
+ Collection<Block> toRemove = new LinkedList<Block>();
+ Collection<Block> toInvalidate = new LinkedList<Block>();
+ Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
+ Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+ // Process the blocks on each queue
+ for (StatefulBlockInfo b : toUC) {
+ addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+ }
+ for (Block b : toRemove) {
+ removeStoredBlock(b, node);
+ }
+ int numBlocksLogged = 0;
+ for (BlockInfo b : toAdd) {
+ addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+ numBlocksLogged++;
+ }
+ if (numBlocksLogged > maxNumBlocksToLog) {
+ blockLog.info("BLOCK* processReport: logged info for " + maxNumBlocksToLog
+ + " of " + numBlocksLogged + " reported.");
+ }
+ for (Block b : toInvalidate) {
+ blockLog.info("BLOCK* processReport: "
+ + b + " on " + node + " size " + b.getNumBytes()
+ + " does not belong to any file");
+ addToInvalidates(b, node);
+ }
+ for (BlockToMarkCorrupt b : toCorrupt) {
+ markBlockAsCorrupt(b, node);
+ }
+ }
+
/**
* processFirstBlockReport is intended only for processing "initial" block
* reports, the first block report received from a DN after it registers.
@@ -1697,6 +1792,44 @@ public class BlockManager extends Report
}
}
+ private void reportDiff(DatanodeDescriptor dn,
+ BlockListAsLongs newReport,
+ Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
+ Collection<Block> toRemove, // remove from DatanodeDescriptor
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
+ Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+ // place a delimiter in the list which separates blocks
+ // that have been reported from those that have not
+ BlockInfo delimiter = new BlockInfo(new Block(), 1);
+ boolean added = dn.addBlock(delimiter);
+ assert added : "Delimiting block cannot be present in the node";
+ int headIndex = 0; //currently the delimiter is in the head of the list
+ int curIndex;
+
+ if (newReport == null)
+ newReport = new BlockListAsLongs();
+ // scan the report and process newly reported blocks
+ BlockReportIterator itBR = newReport.getBlockReportIterator();
+ while(itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState iState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+ toAdd, toInvalidate, toCorrupt, toUC);
+ // move block to the head of the list
+ if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
+ headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
+ }
+ }
+ // collect blocks that have not been reported
+ // all of them are next to the delimiter
+ Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+ delimiter.getNext(0), dn);
+ while(it.hasNext())
+ toRemove.add(it.next());
+ dn.removeBlock(delimiter);
+ }
+
/**
* Process a block replica reported by the data-node.
* No side effects except adding to the passed-in Collections.
@@ -1728,8 +1861,7 @@ public class BlockManager extends Report
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
- @Override // ReportProcessor
- BlockInfo processReportedBlock(final DatanodeDescriptor dn,
+ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate,
@@ -1956,7 +2088,6 @@ assert storedBlock.findDatanode(dn) < 0
}
}
- @Override // ReportProcessor
void addStoredBlockUnderConstruction(
BlockInfoUnderConstruction block,
DatanodeDescriptor node,
@@ -2012,8 +2143,7 @@ assert storedBlock.findDatanode(dn) < 0
* needed replications if this takes care of the problem.
* @return the block that is stored in blockMap.
*/
- @Override // ReportProcessor
- Block addStoredBlock(final BlockInfo block,
+ private Block addStoredBlock(final BlockInfo block,
DatanodeDescriptor node,
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
@@ -2028,7 +2158,7 @@ assert storedBlock.findDatanode(dn) < 0
}
if (storedBlock == null || storedBlock.getBlockCollection() == null) {
// If this block does not belong to anyfile, then we are done.
- blockLogInfo("#addStoredBlock: " + block + " on "
+ blockLog.info("BLOCK* addStoredBlock: " + block + " on "
+ node + " size " + block.getNumBytes()
+ " but it does not belong to any file");
// we could add this block to invalidate set of this datanode.
@@ -2050,7 +2180,7 @@ assert storedBlock.findDatanode(dn) < 0
}
} else {
curReplicaDelta = 0;
- blockLogWarn("#addStoredBlock: "
+ blockLog.warn("BLOCK* addStoredBlock: "
+ "Redundant addStoredBlock request received for " + storedBlock
+ " on " + node + " size " + storedBlock.getNumBytes());
}
@@ -2108,6 +2238,20 @@ assert storedBlock.findDatanode(dn) < 0
return storedBlock;
}
+ private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
+ if (!blockLog.isInfoEnabled()) {
+ return;
+ }
+
+ StringBuilder sb = new StringBuilder(500);
+ sb.append("BLOCK* addStoredBlock: blockMap updated: ")
+ .append(node)
+ .append(" is added to ");
+ storedBlock.appendStringTo(sb);
+ sb.append(" size " )
+ .append(storedBlock.getNumBytes());
+ blockLog.info(sb);
+ }
/**
* Invalidate corrupt replicas.
* <p>
@@ -2989,6 +3133,13 @@ assert storedBlock.findDatanode(dn) < 0
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
+ /**
+ * Get the replicas which are corrupt for a given block.
+ */
+ public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
+ return corruptReplicas.getNodes(block);
+ }
+
/** @return the size of UnderReplicatedBlocks */
public int numOfUnderReplicatedBlocks() {
return neededReplications.size();
@@ -3129,21 +3280,4 @@ assert storedBlock.findDatanode(dn) < 0
public void shutdown() {
blocksMap.close();
}
-
- @Override // ReportProcessor
- int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
- int curIndex, int headIndex) {
- return dn.moveBlockToHead(storedBlock, curIndex, headIndex);
- }
-
- @Override // ReportProcessor
- boolean addBlock(DatanodeDescriptor dn, BlockInfo block) {
- return dn.addBlock(block);
- }
-
- @Override // ReportProcessor
- boolean removeBlock(DatanodeDescriptor dn, BlockInfo block) {
- return dn.removeBlock(block);
- }
-
}
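Among the block-report methods folded back in from ReportProcessor, reportDiff uses a delimiter trick worth spelling out: a dummy block is inserted at the head of the datanode's block list, each block named in the report is moved to the head as it is processed, and whatever still trails the delimiter afterwards was never reported, so it goes to toRemove. The same idea on a plain java.util.LinkedList, as a minimal sketch (a reported entry absent from the stored list would go to toAdd/toInvalidate/toCorrupt via processReportedBlock; here it is simply skipped):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Set;

    // Minimal sketch of reportDiff's delimiter technique: move every reported
    // element to the front; whatever remains behind the delimiter afterwards
    // was not in the report.
    public class ReportDiffSketch {
      public static List<String> unreported(LinkedList<String> stored,
          Set<String> report) {
        final String delimiter = "<delimiter>"; // sentinel, assumed unique
        stored.addFirst(delimiter);
        for (String blk : report) {
          if (stored.remove(blk)) {
            stored.addFirst(blk); // "move block to the head of the list"
          }
        }
        // Everything after the delimiter was never moved, i.e. not reported.
        List<String> toRemove = new ArrayList<String>();
        Iterator<String> it = stored.listIterator(stored.indexOf(delimiter) + 1);
        while (it.hasNext()) {
          toRemove.add(it.next());
        }
        stored.remove(delimiter);
        return toRemove;
      }
    }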
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Wed Oct 16 22:15:33 2013
@@ -19,284 +19,435 @@ package org.apache.hadoop.hdfs.server.bl
import static org.apache.hadoop.util.ExitUtil.terminate;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map.Entry;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.Time;
/**
- * Periodically computes new replication work. This consists of two tasks:
- *
- * 1) Assigning blocks in the neededCacheBlocks to datanodes where they will be
- * cached. This moves them to the pendingCacheBlocks list.
- *
- * 2) Placing caching tasks in pendingCacheBlocks that have timed out
- * back into neededCacheBlocks for reassignment.
+ * Scans the namesystem, scheduling blocks to be cached as appropriate.
+ *
+ * The CacheReplicationMonitor does a full scan when the NameNode first
+ * starts up, and at configurable intervals afterwards.
*/
@InterfaceAudience.LimitedPrivate({"HDFS"})
-class CacheReplicationMonitor implements Runnable {
+public class CacheReplicationMonitor extends Thread implements Closeable {
private static final Log LOG =
LogFactory.getLog(CacheReplicationMonitor.class);
- private static final Log blockLog = NameNode.blockStateChangeLog;
+ private final FSNamesystem namesystem;
- private final Namesystem namesystem;
private final BlockManager blockManager;
- private final DatanodeManager datanodeManager;
- private final CacheReplicationManager cacheReplManager;
- private final UncacheBlocks blocksToUncache;
- private final LightWeightHashSet<Block> neededCacheBlocks;
- private final PendingReplicationBlocks pendingCacheBlocks;
+ private final CacheManager cacheManager;
+
+ private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+
+ /**
+ * Pseudorandom number source
+ */
+ private final Random random = new Random();
+
+ /**
+ * The interval at which we scan the namesystem for caching changes.
+ */
+ private final long intervalMs;
+
+ /**
+ * True if we should rescan immediately, regardless of how much time
+ * elapsed since the previous scan.
+ */
+ private boolean rescanImmediately;
+
+ /**
+ * The monotonic time at which the current scan started.
+ */
+ private long scanTimeMs;
/**
- * Re-check period for computing cache replication work
+ * Mark status of the current scan.
*/
- private final long cacheReplicationRecheckInterval;
+ private boolean mark = false;
- public CacheReplicationMonitor(Namesystem namesystem,
- BlockManager blockManager, DatanodeManager datanodeManager,
- CacheReplicationManager cacheReplManager,
- UncacheBlocks blocksToUncache,
- LightWeightHashSet<Block> neededCacheBlocks,
- PendingReplicationBlocks pendingCacheBlocks,
- Configuration conf) {
+ /**
+ * True if this monitor should terminate.
+ */
+ private boolean shutdown;
+
+ /**
+ * Cache directives found in the previous scan.
+ */
+ private int scannedDirectives;
+
+ /**
+ * Blocks found in the previous scan.
+ */
+ private long scannedBlocks;
+
+ public CacheReplicationMonitor(FSNamesystem namesystem,
+ CacheManager cacheManager, long intervalMs) {
this.namesystem = namesystem;
- this.blockManager = blockManager;
- this.datanodeManager = datanodeManager;
- this.cacheReplManager = cacheReplManager;
-
- this.blocksToUncache = blocksToUncache;
- this.neededCacheBlocks = neededCacheBlocks;
- this.pendingCacheBlocks = pendingCacheBlocks;
-
- this.cacheReplicationRecheckInterval = conf.getInt(
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+ this.blockManager = namesystem.getBlockManager();
+ this.cacheManager = cacheManager;
+ this.cachedBlocks = cacheManager.getCachedBlocks();
+ this.intervalMs = intervalMs;
}
@Override
public void run() {
- LOG.info("CacheReplicationMonitor is starting");
- while (namesystem.isRunning()) {
- try {
- computeCachingWork();
- processPendingCachingWork();
- Thread.sleep(cacheReplicationRecheckInterval);
- } catch (Throwable t) {
- if (!namesystem.isRunning()) {
- LOG.info("Stopping CacheReplicationMonitor.");
- if (!(t instanceof InterruptedException)) {
- LOG.info("CacheReplicationMonitor received an exception"
- + " while shutting down.", t);
+ shutdown = false;
+ rescanImmediately = true;
+ scanTimeMs = 0;
+ LOG.info("Starting CacheReplicationMonitor with interval " +
+ intervalMs + " milliseconds");
+ try {
+ long curTimeMs = Time.monotonicNow();
+ while (true) {
+ synchronized(this) {
+ while (true) {
+ if (shutdown) {
+ LOG.info("Shutting down CacheReplicationMonitor");
+ return;
+ }
+ if (rescanImmediately) {
+ LOG.info("Rescanning on request");
+ rescanImmediately = false;
+ break;
+ }
+ long delta = (scanTimeMs + intervalMs) - curTimeMs;
+ if (delta <= 0) {
+ LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) +
+ " milliseconds");
+ break;
+ }
+ this.wait(delta);
+ curTimeMs = Time.monotonicNow();
}
- break;
}
- LOG.fatal("ReplicationMonitor thread received Runtime exception. ", t);
- terminate(1, t);
+ scanTimeMs = curTimeMs;
+ mark = !mark;
+ rescan();
+ curTimeMs = Time.monotonicNow();
+ LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
+ scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " +
+ "millisecond(s).");
}
+ } catch (Throwable t) {
+ LOG.fatal("Thread exiting", t);
+ terminate(1, t);
}
}
/**
- * Assigns under-cached blocks to new datanodes.
+ * Kick the monitor thread.
+ *
+ * If it is sleeping, it will wake up and start scanning.
+ * If it is currently scanning, it will finish the scan and immediately do
+ * another one.
*/
- private void computeCachingWork() {
- List<Block> blocksToCache = null;
- namesystem.writeLock();
+ public synchronized void kick() {
+ rescanImmediately = true;
+ this.notifyAll();
+ }
+
+ /**
+ * Shut down and join the monitor thread.
+ */
+ @Override
+ public void close() throws IOException {
+ synchronized(this) {
+ if (shutdown) return;
+ shutdown = true;
+ this.notifyAll();
+ }
try {
- synchronized (neededCacheBlocks) {
- blocksToCache = neededCacheBlocks.pollAll();
+ if (this.isAlive()) {
+ this.join(60000);
}
- } finally {
- namesystem.writeUnlock();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- computeCachingWorkForBlocks(blocksToCache);
- computeUncacheWork();
}
- private void computeCachingWorkForBlocks(List<Block> blocksToCache) {
- int requiredRepl, effectiveRepl, additionalRepl;
- List<DatanodeDescriptor> cachedNodes, storedNodes, targets;
-
- final HashMap<Block, List<DatanodeDescriptor>> work =
- new HashMap<Block, List<DatanodeDescriptor>>();
+ private void rescan() {
+ scannedDirectives = 0;
+ scannedBlocks = 0;
namesystem.writeLock();
try {
- synchronized (neededCacheBlocks) {
- for (Block block: blocksToCache) {
- // Required number of cached replicas
- requiredRepl = cacheReplManager.getCacheReplication(block);
- // Replicas that are safely cached
- cachedNodes = cacheReplManager.getSafeReplicas(
- cacheReplManager.cachedBlocksMap, block);
- // Replicas that are safely stored on disk
- storedNodes = cacheReplManager.getSafeReplicas(
- blockManager.blocksMap, block);
- // "effective" replication factor which includes pending
- // replication work
- effectiveRepl = cachedNodes.size()
- + pendingCacheBlocks.getNumReplicas(block);
- if (effectiveRepl >= requiredRepl) {
- neededCacheBlocks.remove(block);
- blockLog.info("BLOCK* Removing " + block
- + " from neededCacheBlocks as it has enough cached replicas");
- continue;
- }
- // Choose some replicas to cache if needed
- additionalRepl = requiredRepl - effectiveRepl;
- targets = new ArrayList<DatanodeDescriptor>(storedNodes.size());
- // Only target replicas that aren't already cached.
- for (DatanodeDescriptor dn: storedNodes) {
- if (!cachedNodes.contains(dn)) {
- targets.add(dn);
- }
- }
- if (targets.size() < additionalRepl) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Block " + block + " cannot be cached on additional"
- + " nodes because there are no more available datanodes"
- + " with the block on disk.");
- }
- }
- targets = CacheReplicationPolicy.chooseTargetsToCache(block, targets,
- additionalRepl);
- if (targets.size() < additionalRepl) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Block " + block + " cannot be cached on additional"
- + " nodes because there is not sufficient cache space on"
- + " available target datanodes.");
- }
- }
- // Continue if we couldn't get more cache targets
- if (targets.size() == 0) {
- continue;
- }
-
- // Update datanodes and blocks that were scheduled for caching
- work.put(block, targets);
- // Schedule caching on the targets
- for (DatanodeDescriptor target: targets) {
- target.addBlockToBeCached(block);
- }
- // Add block to the pending queue
- pendingCacheBlocks.increment(block,
- targets.toArray(new DatanodeDescriptor[] {}));
- if (blockLog.isDebugEnabled()) {
- blockLog.debug("BLOCK* block " + block
- + " is moved from neededCacheBlocks to pendingCacheBlocks");
- }
- // Remove from needed queue if it will be fully replicated
- if (effectiveRepl + targets.size() >= requiredRepl) {
- neededCacheBlocks.remove(block);
- }
- }
- }
+ rescanPathBasedCacheEntries();
+ } finally {
+ namesystem.writeUnlock();
+ }
+ namesystem.writeLock();
+ try {
+ rescanCachedBlockMap();
} finally {
namesystem.writeUnlock();
}
+ }
- if (blockLog.isInfoEnabled()) {
- // log which blocks have been scheduled for replication
- for (Entry<Block, List<DatanodeDescriptor>> item : work.entrySet()) {
- Block block = item.getKey();
- List<DatanodeDescriptor> nodes = item.getValue();
- StringBuilder targetList = new StringBuilder("datanode(s)");
- for (DatanodeDescriptor node: nodes) {
- targetList.append(' ');
- targetList.append(node);
+ /**
+ * Scan all PathBasedCacheEntries. Use the information to figure out
+ * what cache replication factor each block should have.
+ *
+ * @param mark Whether the current scan is setting or clearing the mark
+ */
+ private void rescanPathBasedCacheEntries() {
+ FSDirectory fsDir = namesystem.getFSDirectory();
+ for (PathBasedCacheEntry pce : cacheManager.getEntriesById().values()) {
+ scannedDirectives++;
+ String path = pce.getPath();
+ INode node;
+ try {
+ node = fsDir.getINode(path);
+ } catch (UnresolvedLinkException e) {
+ // We don't cache through symlinks
+ continue;
+ }
+ if (node == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No inode found at " + path);
+ }
+ } else if (node.isDirectory()) {
+ INodeDirectory dir = node.asDirectory();
+ ReadOnlyList<INode> children = dir.getChildrenList(null);
+ for (INode child : children) {
+ if (child.isFile()) {
+ rescanFile(pce, child.asFile());
+ }
+ }
+ } else if (node.isFile()) {
+ rescanFile(pce, node.asFile());
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring non-directory, non-file inode " + node +
+ " found at " + path);
}
- blockLog.info("BLOCK* ask " + targetList + " to cache " + block);
}
}
-
- if (blockLog.isDebugEnabled()) {
- blockLog.debug(
- "BLOCK* neededCacheBlocks = " + neededCacheBlocks.size()
- + " pendingCacheBlocks = " + pendingCacheBlocks.size());
+ }
+
+ /**
+ * Apply a PathBasedCacheEntry to a file.
+ *
+ * @param pce The PathBasedCacheEntry to apply.
+ * @param file The file.
+ */
+ private void rescanFile(PathBasedCacheEntry pce, INodeFile file) {
+ BlockInfo[] blockInfos = file.getBlocks();
+ for (BlockInfo blockInfo : blockInfos) {
+ if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
+ // We don't try to cache blocks that are under construction.
+ continue;
+ }
+ Block block = new Block(blockInfo.getBlockId());
+ CachedBlock ncblock = new CachedBlock(block.getBlockId(),
+ pce.getReplication(), mark);
+ CachedBlock ocblock = cachedBlocks.get(ncblock);
+ if (ocblock == null) {
+ cachedBlocks.put(ncblock);
+ } else {
+ if (mark != ocblock.getMark()) {
+ // Mark hasn't been set in this scan, so update replication and mark.
+ ocblock.setReplicationAndMark(pce.getReplication(), mark);
+ } else {
+ // Mark already set in this scan. Set replication to highest value in
+ // any PathBasedCacheEntry that covers this file.
+ ocblock.setReplicationAndMark((short)Math.max(
+ pce.getReplication(), ocblock.getReplication()), mark);
+ }
+ }
}
}
/**
- * Reassign pending caching work that has timed out
+ * Scan through the cached block map.
+ * Any blocks which are under-replicated should be assigned new Datanodes.
+ * Blocks that are over-replicated should be removed from Datanodes.
*/
- private void processPendingCachingWork() {
- Block[] timedOutItems = pendingCacheBlocks.getTimedOutBlocks();
- if (timedOutItems != null) {
- namesystem.writeLock();
- try {
- for (int i = 0; i < timedOutItems.length; i++) {
- Block block = timedOutItems[i];
- final short numCached = cacheReplManager.getNumCached(block);
- final short cacheReplication =
- cacheReplManager.getCacheReplication(block);
- // Needs to be cached if under-replicated
- if (numCached < cacheReplication) {
- synchronized (neededCacheBlocks) {
- neededCacheBlocks.add(block);
- }
- }
+ private void rescanCachedBlockMap() {
+ for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
+ cbIter.hasNext(); ) {
+ scannedBlocks++;
+ CachedBlock cblock = cbIter.next();
+ List<DatanodeDescriptor> pendingCached =
+ cblock.getDatanodes(Type.PENDING_CACHED);
+ List<DatanodeDescriptor> cached =
+ cblock.getDatanodes(Type.CACHED);
+ List<DatanodeDescriptor> pendingUncached =
+ cblock.getDatanodes(Type.PENDING_UNCACHED);
+ // Remove nodes from PENDING_UNCACHED if they were actually uncached.
+ for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
+ iter.hasNext(); ) {
+ DatanodeDescriptor datanode = iter.next();
+ if (!cblock.isInList(datanode.getCached())) {
+ datanode.getPendingUncached().remove(cblock);
+ iter.remove();
}
- } finally {
- namesystem.writeUnlock();
+ }
+ // If the block's mark doesn't match with the mark of this scan, that
+ // means that this block couldn't be reached during this scan. That means
+ // it doesn't need to be cached any more.
+ int neededCached = (cblock.getMark() != mark) ?
+ 0 : cblock.getReplication();
+ int numCached = cached.size();
+ if (numCached >= neededCached) {
+ // If we have enough replicas, drop all pending cached.
+ for (DatanodeDescriptor datanode : pendingCached) {
+ datanode.getPendingCached().remove(cblock);
+ }
+ pendingCached.clear();
+ }
+ if (numCached < neededCached) {
+ // If we don't have enough replicas, drop all pending uncached.
+ for (DatanodeDescriptor datanode : pendingUncached) {
+ datanode.getPendingUncached().remove(cblock);
+ }
+ pendingUncached.clear();
+ }
+ int neededUncached = numCached -
+ (pendingUncached.size() + neededCached);
+ if (neededUncached > 0) {
+ addNewPendingUncached(neededUncached, cblock, cached,
+ pendingUncached);
+ } else {
+ int additionalCachedNeeded = neededCached -
+ (numCached + pendingCached.size());
+ if (additionalCachedNeeded > 0) {
+ addNewPendingCached(additionalCachedNeeded, cblock, cached,
+ pendingCached);
+ }
+ }
+ if ((neededCached == 0) &&
+ pendingUncached.isEmpty() &&
+ pendingCached.isEmpty()) {
+ // we have nothing more to do with this block.
+ cbIter.remove();
}
}
}
/**
- * Schedule blocks for uncaching at datanodes
- * @return total number of block for deletion
+ * Add new entries to the PendingUncached list.
+ *
+ * @param neededUncached The number of replicas that need to be uncached.
+ * @param cachedBlock The block which needs to be uncached.
+ * @param cached A list of DataNodes currently caching the block.
+ * @param pendingUncached A list of DataNodes that will soon uncache the
+ * block.
*/
- int computeUncacheWork() {
- final List<String> nodes = blocksToUncache.getStorageIDs();
- int blockCnt = 0;
- for (String node: nodes) {
- blockCnt += uncachingWorkForOneNode(node);
+ private void addNewPendingUncached(int neededUncached,
+ CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
+ List<DatanodeDescriptor> pendingUncached) {
+ if (!cacheManager.isActive()) {
+ return;
+ }
+ // Figure out which replicas can be uncached.
+ LinkedList<DatanodeDescriptor> possibilities =
+ new LinkedList<DatanodeDescriptor>();
+ for (DatanodeDescriptor datanode : cached) {
+ if (!pendingUncached.contains(datanode)) {
+ possibilities.add(datanode);
+ }
+ }
+ while (neededUncached > 0) {
+ if (possibilities.isEmpty()) {
+ LOG.warn("Logic error: we're trying to uncache more replicas than " +
+ "actually exist for " + cachedBlock);
+ return;
+ }
+ DatanodeDescriptor datanode =
+ possibilities.remove(random.nextInt(possibilities.size()));
+ pendingUncached.add(datanode);
+ boolean added = datanode.getPendingUncached().add(cachedBlock);
+ assert added;
+ neededUncached--;
}
- return blockCnt;
}
-
+
/**
- * Gets the list of blocks scheduled for uncaching at a datanode and
- * schedules them for uncaching.
- *
- * @return number of blocks scheduled for removal
+ * Add new entries to the PendingCached list.
+ *
+ * @param neededCached The number of replicas that need to be cached.
+ * @param cachedBlock The block which needs to be cached.
+ * @param cached A list of DataNodes currently caching the block.
+ * @param pendingCached A list of DataNodes that will soon cache the
+ * block.
*/
- private int uncachingWorkForOneNode(String nodeId) {
- final List<Block> toInvalidate;
- final DatanodeDescriptor dn;
-
- namesystem.writeLock();
- try {
- // get blocks to invalidate for the nodeId
- assert nodeId != null;
- dn = datanodeManager.getDatanode(nodeId);
- if (dn == null) {
- blocksToUncache.remove(nodeId);
- return 0;
- }
- toInvalidate = blocksToUncache.invalidateWork(nodeId, dn);
- if (toInvalidate == null) {
- return 0;
+ private void addNewPendingCached(int neededCached,
+ CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
+ List<DatanodeDescriptor> pendingCached) {
+ if (!cacheManager.isActive()) {
+ return;
+ }
+ // To figure out which replicas can be cached, we consult the
+ // blocksMap. We don't want to try to cache a corrupt replica, though.
+ BlockInfo blockInfo = blockManager.
+ getStoredBlock(new Block(cachedBlock.getBlockId()));
+ if (blockInfo == null) {
+ LOG.debug("Not caching block " + cachedBlock + " because it " +
+ "was deleted from all DataNodes.");
+ return;
+ }
+ if (!blockInfo.isComplete()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not caching block " + cachedBlock + " because it " +
+ "is not yet complete.");
+ }
+ return;
+ }
+ List<DatanodeDescriptor> possibilities = new LinkedList<DatanodeDescriptor>();
+ int numReplicas = blockInfo.getCapacity();
+ Collection<DatanodeDescriptor> corrupt =
+ blockManager.getCorruptReplicas(blockInfo);
+ for (int i = 0; i < numReplicas; i++) {
+ DatanodeDescriptor datanode = blockInfo.getDatanode(i);
+ if ((datanode != null) &&
+ ((!pendingCached.contains(datanode)) &&
+ ((corrupt == null) || (!corrupt.contains(datanode))))) {
+ possibilities.add(datanode);
}
- } finally {
- namesystem.writeUnlock();
}
- if (blockLog.isInfoEnabled()) {
- blockLog.info("BLOCK* " + getClass().getSimpleName()
- + ": ask " + dn + " to uncache " + toInvalidate);
+ while (neededCached > 0) {
+ if (possibilities.isEmpty()) {
+ LOG.warn("We need " + neededCached + " more replica(s) than " +
+ "actually exist to provide a cache replication of " +
+ cachedBlock.getReplication() + " for " + cachedBlock);
+ return;
+ }
+ DatanodeDescriptor datanode =
+ possibilities.remove(random.nextInt(possibilities.size()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AddNewPendingCached: datanode " + datanode +
+ " will now cache block " + cachedBlock);
+ }
+ pendingCached.add(datanode);
+ boolean added = datanode.getPendingCached().add(cachedBlock);
+ assert added;
+ neededCached--;
}
- return toInvalidate.size();
}
}
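The rewritten monitor is essentially a mark-and-sweep pass: rescan() flips the mark bit, rescanPathBasedCacheEntries()/rescanFile() re-stamp every block still covered by some directive (keeping the highest requested replication where directives overlap), and rescanCachedBlockMap() treats any block whose mark was not refreshed as needing zero cached replicas. A minimal sketch of that bookkeeping, with a hypothetical map from block id to requested replication standing in for the directive scan:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Minimal mark-and-sweep sketch of rescan()/rescanCachedBlockMap().
    public class MarkSweepSketch {
      static class CachedBlock {
        short replication;
        boolean mark;
      }

      private final Map<Long, CachedBlock> cachedBlocks =
          new HashMap<Long, CachedBlock>();
      private boolean mark = false;

      /** One scan: flip the mark, re-stamp reachable blocks, sweep the rest. */
      void rescan(Map<Long, Short> blocksCoveredByDirectives) {
        mark = !mark;
        for (Map.Entry<Long, Short> e : blocksCoveredByDirectives.entrySet()) {
          CachedBlock cb = cachedBlocks.get(e.getKey());
          if (cb == null) {
            cb = new CachedBlock();
            cachedBlocks.put(e.getKey(), cb);
          }
          if (cb.mark != mark) {
            // First directive to reach this block in this scan.
            cb.replication = e.getValue();
          } else {
            // Already seen this scan: keep the highest requested replication.
            cb.replication = (short) Math.max(cb.replication, e.getValue());
          }
          cb.mark = mark;
        }
        // Sweep: blocks not re-stamped this scan need no cached replicas. (The
        // real monitor first schedules uncaching and only drops an entry once
        // its pending lists are empty.)
        for (Iterator<CachedBlock> it = cachedBlocks.values().iterator();
            it.hasNext(); ) {
          if (it.next().mark != mark) {
            it.remove();
          }
        }
      }
    }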
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Oct 16 22:15:33 2013
@@ -28,7 +28,9 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -98,22 +100,72 @@ public class DatanodeDescriptor extends
}
/**
- * Head of the list of blocks on the datanode
+ * A list of CachedBlock objects on this datanode.
*/
- private volatile BlockInfo blockList = null;
+ public static class CachedBlocksList extends IntrusiveCollection<CachedBlock> {
+ public enum Type {
+ PENDING_CACHED,
+ CACHED,
+ PENDING_UNCACHED
+ }
+
+ private final DatanodeDescriptor datanode;
+
+ private final Type type;
+
+ CachedBlocksList(DatanodeDescriptor datanode, Type type) {
+ this.datanode = datanode;
+ this.type = type;
+ }
+
+ public DatanodeDescriptor getDatanode() {
+ return datanode;
+ }
+
+ public Type getType() {
+ return type;
+ }
+ }
+
/**
- * Number of blocks on the datanode
+ * The blocks which we want to cache on this DataNode.
*/
- private int numBlocks = 0;
+ private final CachedBlocksList pendingCached =
+ new CachedBlocksList(this, CachedBlocksList.Type.PENDING_CACHED);
/**
- * Head of the list of cached blocks on the datanode
+ * The blocks which we know are cached on this datanode.
+ * This list is updated by periodic cache reports.
*/
- private volatile BlockInfo cachedBlockList = null;
+ private final CachedBlocksList cached =
+ new CachedBlocksList(this, CachedBlocksList.Type.CACHED);
+
/**
- * Number of cached blocks on the datanode
+ * The blocks which we want to uncache on this DataNode.
*/
- private int numCachedBlocks = 0;
+ private final CachedBlocksList pendingUncached =
+ new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
+
+ public CachedBlocksList getPendingCached() {
+ return pendingCached;
+ }
+
+ public CachedBlocksList getCached() {
+ return cached;
+ }
+
+ public CachedBlocksList getPendingUncached() {
+ return pendingUncached;
+ }
+
+ /**
+ * Head of the list of blocks on the datanode
+ */
+ private volatile BlockInfo blockList = null;
+ /**
+ * Number of blocks on the datanode
+ */
+ private int numBlocks = 0;
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
@@ -154,12 +206,6 @@ public class DatanodeDescriptor extends
/** A set of blocks to be invalidated by this datanode */
private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
- /** A queue of blocks to be cached by this datanode */
- private BlockQueue<Block> cacheBlocks = new BlockQueue<Block>();
- /** A set of blocks to be uncached by this datanode */
- private LightWeightHashSet<Block> blocksToUncache =
- new LightWeightHashSet<Block>();
-
/* Variables for maintaining number of blocks scheduled to be written to
* this datanode. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
@@ -287,43 +333,6 @@ public class DatanodeDescriptor extends
}
/**
- * Add block to the list of cached blocks on the data-node.
- * @return true if block was successfully added, false if already present
- */
- public boolean addCachedBlock(BlockInfo b) {
- if (!b.addNode(this))
- return false;
- // add to the head of the data-node list
- cachedBlockList = b.listInsert(cachedBlockList, this);
- numCachedBlocks++;
- return true;
- }
-
- /**
- * Remove block from the list of cached blocks on the data-node.
- * @return true if block was successfully removed, false if not present
- */
- public boolean removeCachedBlock(BlockInfo b) {
- cachedBlockList = b.listRemove(cachedBlockList, this);
- if (b.removeNode(this)) {
- numCachedBlocks--;
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Move block to the head of the list of cached blocks on the data-node.
- * @return the index of the head of the blockList
- */
- int moveCachedBlockToHead(BlockInfo b, int curIndex, int headIndex) {
- cachedBlockList = b.moveBlockToHead(cachedBlockList, this, curIndex,
- headIndex);
- return curIndex;
- }
-
- /**
* Used for testing only
* @return the head of the blockList
*/
@@ -332,11 +341,6 @@ public class DatanodeDescriptor extends
return blockList;
}
- @VisibleForTesting
- protected BlockInfo getCachedHead() {
- return cachedBlockList;
- }
-
/**
* Replace specified old block with a new one in the DataNodeDescriptor.
*
@@ -359,10 +363,13 @@ public class DatanodeDescriptor extends
setDfsUsed(0);
setXceiverCount(0);
this.blockList = null;
- this.cachedBlockList = null;
this.invalidateBlocks.clear();
- this.blocksToUncache.clear();
this.volumeFailures = 0;
+ // pendingCached, cached, and pendingUncached are protected by the
+ // FSN lock.
+ this.pendingCached.clear();
+ this.cached.clear();
+ this.pendingUncached.clear();
}
public void clearBlockQueues() {
@@ -371,20 +378,17 @@ public class DatanodeDescriptor extends
this.recoverBlocks.clear();
this.replicateBlocks.clear();
}
- synchronized(blocksToUncache) {
- this.blocksToUncache.clear();
- this.cacheBlocks.clear();
- }
+ // pendingCached, cached, and pendingUncached are protected by the
+ // FSN lock.
+ this.pendingCached.clear();
+ this.cached.clear();
+ this.pendingUncached.clear();
}
public int numBlocks() {
return numBlocks;
}
- public int numCachedBlocks() {
- return numCachedBlocks;
- }
-
/**
* Updates stats from datanode heartbeat.
*/
@@ -438,10 +442,6 @@ public class DatanodeDescriptor extends
return new BlockIterator(this.blockList, this);
}
- public Iterator<BlockInfo> getCachedBlockIterator() {
- return new BlockIterator(this.cachedBlockList, this);
- }
-
/**
* Store block replication work.
*/
@@ -451,14 +451,6 @@ public class DatanodeDescriptor extends
}
/**
- * Store block caching work.
- */
- void addBlockToBeCached(Block block) {
- assert(block != null);
- cacheBlocks.offer(block);
- }
-
- /**
* Store block recovery work.
*/
void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
@@ -483,18 +475,6 @@ public class DatanodeDescriptor extends
}
/**
- * Store block uncaching work.
- */
- void addBlocksToBeUncached(List<Block> blocklist) {
- assert(blocklist != null && blocklist.size() > 0);
- synchronized (blocksToUncache) {
- for (Block blk : blocklist) {
- blocksToUncache.add(blk);
- }
- }
- }
-
- /**
* The number of work items that are pending to be replicated
*/
int getNumberOfBlocksToBeReplicated() {
@@ -502,13 +482,6 @@ public class DatanodeDescriptor extends
}
/**
- * The number of pending cache work items
- */
- int getNumberOfBlocksToBeCached() {
- return cacheBlocks.size();
- }
-
- /**
* The number of block invalidation items that are pending to
* be sent to the datanode
*/
@@ -518,23 +491,10 @@ public class DatanodeDescriptor extends
}
}
- /**
- * The number of pending uncache work items
- */
- int getNumberOfBlocksToBeUncached() {
- synchronized (blocksToUncache) {
- return blocksToUncache.size();
- }
- }
-
public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}
- public List<Block> getCacheBlocks() {
- return cacheBlocks.poll(cacheBlocks.size());
- }
-
public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
if(blocks == null)
@@ -554,17 +514,6 @@ public class DatanodeDescriptor extends
}
/**
- * Remove up to the maximum number of blocks to be uncached
- */
- public Block[] getInvalidateCacheBlocks() {
- synchronized (blocksToUncache) {
- Block[] deleteList = blocksToUncache.pollToArray(
- new Block[blocksToUncache.size()]);
- return deleteList.length == 0 ? null : deleteList;
- }
- }
-
- /**
* @return Approximate number of blocks currently scheduled to be written
* to this datanode.
*/
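The three new per-datanode lists (pendingCached, cached, pendingUncached) are built on org.apache.hadoop.util.IntrusiveCollection: each CachedBlock carries its own linkage, so insertion, removal, and "is this block on that datanode's list?" (CachedBlock#isInList, as used by the monitor above) are all O(1) with no per-entry allocation. A toy single-owner version of the idea; the real IntrusiveCollection additionally lets one element sit on several lists at once:

    // Toy intrusive doubly-linked list: the element stores its own prev/next,
    // so remove() and membership checks need no scanning or hashing.
    public class IntrusiveListSketch {
      static class Node {
        Node prev, next;
        IntrusiveListSketch owner; // which list this node is on, if any
        final long blockId;
        Node(long blockId) { this.blockId = blockId; }
      }

      private final Node head = new Node(-1); // sentinel

      public IntrusiveListSketch() {
        head.prev = head;
        head.next = head;
      }

      void add(Node n) {
        assert n.owner == null : "already on a list";
        n.owner = this;
        n.prev = head.prev;
        n.next = head;
        head.prev.next = n;
        head.prev = n;
      }

      /** O(1): no traversal, the node knows its neighbors. */
      void remove(Node n) {
        assert n.owner == this;
        n.prev.next = n.next;
        n.next.prev = n.prev;
        n.owner = null;
        n.prev = n.next = null;
      }

      /** O(1) membership test, analogous to CachedBlock#isInList. */
      boolean contains(Node n) {
        return n.owner == this;
      }
    }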
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Oct 16 22:15:33 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
@@ -76,6 +78,7 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@@ -168,6 +171,12 @@ public class DatanodeManager {
private boolean hasClusterEverBeenMultiRack = false;
/**
+ * Whether we should tell datanodes what to cache in replies to
+ * heartbeat messages.
+ */
+ private boolean sendCachingCommands = false;
+
+ /**
* The number of datanodes for each software version. This list should change
* during rolling upgrades.
* Software version -> Number of datanodes with this version
@@ -1305,26 +1314,17 @@ public class DatanodeManager {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
-
- // Check pending caching
- List<Block> pendingCacheList = nodeinfo.getCacheBlocks();
- if (pendingCacheList != null) {
- long blockIds[] = new long[pendingCacheList.size()];
- for (int i = 0; i < pendingCacheList.size(); i++) {
- blockIds[i] = pendingCacheList.get(i).getBlockId();
- }
- cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
- blockIds));
- }
- // Check cached block invalidation
- blks = nodeinfo.getInvalidateCacheBlocks();
- if (blks != null) {
- long blockIds[] = new long[blks.length];
- for (int i = 0; i < blks.length; i++) {
- blockIds[i] = blks[i].getBlockId();
- }
- cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_UNCACHE,
- blockPoolId, blockIds));
+ DatanodeCommand pendingCacheCommand =
+ getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+ DatanodeProtocol.DNA_CACHE, blockPoolId);
+ if (pendingCacheCommand != null) {
+ cmds.add(pendingCacheCommand);
+ }
+ DatanodeCommand pendingUncacheCommand =
+ getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+ DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+ if (pendingUncacheCommand != null) {
+ cmds.add(pendingUncacheCommand);
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
@@ -1346,6 +1346,40 @@ public class DatanodeManager {
}
/**
+ * Convert a CachedBlockList into a DatanodeCommand with a list of blocks.
+ *
+ * @param list The {@link CachedBlocksList}. This function
+ * clears the list.
+ * @param datanode The datanode.
+ * @param action The action to perform in the command.
+ * @param poolId The block pool id.
+ * @return A DatanodeCommand to be sent back to the DN, or null if
+ * there is nothing to be done.
+ */
+ private DatanodeCommand getCacheCommand(CachedBlocksList list,
+ DatanodeDescriptor datanode, int action, String poolId) {
+ int length = list.size();
+ if (length == 0) {
+ return null;
+ }
+ // Read out the block IDs, clearing the pending entries as we go.
+ long[] blockIds = new long[length];
+ int i = 0;
+ for (Iterator<CachedBlock> iter = list.iterator();
+ iter.hasNext(); ) {
+ CachedBlock cachedBlock = iter.next();
+ blockIds[i++] = cachedBlock.getBlockId();
+ iter.remove();
+ }
+ if (!sendCachingCommands) {
+ // Do not send caching commands unless the FSNamesystem told us we
+ // should.
+ return null;
+ }
+ return new BlockIdCommand(action, poolId, blockIds);
+ }
+
+ /**
* Tell all datanodes to use a new, non-persistent bandwidth value for
* dfs.balance.bandwidthPerSec.
*
@@ -1393,4 +1427,8 @@ public class DatanodeManager {
public String toString() {
return getClass().getSimpleName() + ": " + host2DatanodeMap;
}
+
+ public void setSendCachingCommands(boolean sendCachingCommands) {
+ this.sendCachingCommands = sendCachingCommands;
+ }
}
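
The new getCacheCommand() helper replaces the two hand-rolled heartbeat
code paths above: it drains the datanode's pending-cached or
pending-uncached CachedBlocksList into a single BlockIdCommand. Below is
a minimal, self-contained sketch of that drain pattern, with a
LinkedList and a long[] standing in for CachedBlocksList and
BlockIdCommand; all names in the sketch are illustrative and not part of
this patch.

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Illustrative stand-in for the CachedBlock entries kept on the list.
class SketchCachedBlock {
  private final long blockId;
  SketchCachedBlock(long blockId) { this.blockId = blockId; }
  long getBlockId() { return blockId; }
}

public class CacheCommandSketch {
  /**
   * Drain the pending list into an array of block IDs, mirroring how
   * getCacheCommand() empties the list through the iterator's remove()
   * so the same work is not re-sent on the next heartbeat.
   */
  static long[] drainToBlockIds(List<SketchCachedBlock> pending) {
    long[] blockIds = new long[pending.size()];
    int i = 0;
    for (Iterator<SketchCachedBlock> iter = pending.iterator();
         iter.hasNext(); ) {
      blockIds[i++] = iter.next().getBlockId();
      iter.remove();  // clear as we go, as the patch does
    }
    return blockIds;
  }

  public static void main(String[] args) {
    List<SketchCachedBlock> pending = new LinkedList<SketchCachedBlock>();
    pending.add(new SketchCachedBlock(1001L));
    pending.add(new SketchCachedBlock(1002L));
    long[] ids = drainToBlockIds(pending);
    // Prints: command carries 2 block ids; 0 left pending
    System.out.println("command carries " + ids.length
        + " block ids; " + pending.size() + " left pending");
  }
}

Note that the patch drains the list before the sendCachingCommands
check, so pending entries are consumed even when no command is returned
to the datanode.
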
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Wed Oct 16 22:15:33 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -34,22 +35,24 @@ import org.apache.hadoop.hdfs.util.Light
* on the machine in question.
*/
@InterfaceAudience.Private
-abstract class InvalidateBlocks {
+class InvalidateBlocks {
/** Mapping: StorageID -> Collection of Blocks */
private final Map<String, LightWeightHashSet<Block>> node2blocks =
new TreeMap<String, LightWeightHashSet<Block>>();
/** The total number of blocks in the map. */
private long numBlocks = 0L;
+ private final DatanodeManager datanodeManager;
+
+ InvalidateBlocks(final DatanodeManager datanodeManager) {
+ this.datanodeManager = datanodeManager;
+ }
+
/** @return the number of blocks to be invalidated. */
synchronized long numBlocks() {
return numBlocks;
}
- synchronized int numStorages() {
- return node2blocks.size();
- }
-
/**
* @return true if the given storage has the given block listed for
* invalidation. Blocks are compared including their generation stamps:
@@ -108,22 +111,22 @@ abstract class InvalidateBlocks {
}
}
- /**
- * Polls up to <i>limit</i> blocks from the list of to-be-invalidated Blocks
- * for a storage.
- */
- synchronized List<Block> pollNumBlocks(final String storageId, final int limit) {
- final LightWeightHashSet<Block> set = node2blocks.get(storageId);
- if (set == null) {
- return null;
+ /** Print the contents to out. */
+ synchronized void dump(final PrintWriter out) {
+ final int size = node2blocks.values().size();
+ out.println("Metasave: Blocks " + numBlocks
+ + " waiting deletion from " + size + " datanodes.");
+ if (size == 0) {
+ return;
}
- List<Block> polledBlocks = set.pollN(limit);
- // Remove the storage if the set is now empty
- if (set.isEmpty()) {
- remove(storageId);
+
+ for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+ final LightWeightHashSet<Block> blocks = entry.getValue();
+ if (blocks.size() > 0) {
+ out.println(datanodeManager.getDatanode(entry.getKey()));
+ out.println(blocks);
+ }
}
- numBlocks -= polledBlocks.size();
- return polledBlocks;
}
/** @return a list of the storage IDs. */
@@ -131,22 +134,26 @@ abstract class InvalidateBlocks {
return new ArrayList<String>(node2blocks.keySet());
}
- /**
- * Return the set of to-be-invalidated blocks for a storage.
- */
- synchronized LightWeightHashSet<Block> getBlocks(String storageId) {
- return node2blocks.get(storageId);
- }
+ synchronized List<Block> invalidateWork(
+ final String storageId, final DatanodeDescriptor dn) {
+ final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+ if (set == null) {
+ return null;
+ }
- /**
- * Schedules invalidation work associated with a storage at the corresponding
- * datanode.
- * @param storageId Storage of blocks to be invalidated
- * @param dn Datanode where invalidation work will be scheduled
- * @return List of blocks scheduled for invalidation at the datanode
- */
- abstract List<Block> invalidateWork(final String storageId,
- final DatanodeDescriptor dn);
+ // # blocks that can be sent in one message is limited
+ final int limit = datanodeManager.blockInvalidateLimit;
+ final List<Block> toInvalidate = set.pollN(limit);
+
+ // If we send everything in this message, remove this node entry
+ if (set.isEmpty()) {
+ remove(storageId);
+ }
+
+ dn.addBlocksToBeInvalidated(toInvalidate);
+ numBlocks -= toInvalidate.size();
+ return toInvalidate;
+ }
synchronized void clear() {
node2blocks.clear();
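
With this change InvalidateBlocks is a concrete class, and the
invalidateWork() logic formerly supplied through the abstract method
(compare the removed pollNumBlocks() above) now lives here directly:
poll at most datanodeManager.blockInvalidateLimit blocks for one
storage, drop the storage entry once it empties, and queue the polled
blocks on the datanode. The following is a rough standalone sketch of
that poll-and-trim pattern, with HashSet standing in for
LightWeightHashSet and its pollN(); class and method names are
illustrative only.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class InvalidateWorkSketch {
  /** Mapping: storage ID -> block IDs awaiting invalidation. */
  private final Map<String, Set<Long>> node2blocks =
      new TreeMap<String, Set<Long>>();
  private long numBlocks = 0;

  synchronized void add(String storageId, long blockId) {
    Set<Long> set = node2blocks.get(storageId);
    if (set == null) {
      set = new HashSet<Long>();
      node2blocks.put(storageId, set);
    }
    if (set.add(blockId)) {
      numBlocks++;
    }
  }

  /**
   * Poll up to 'limit' blocks for one storage, removing the storage
   * entry once it empties -- the same shape as invalidateWork().
   */
  synchronized List<Long> invalidateWork(String storageId, int limit) {
    Set<Long> set = node2blocks.get(storageId);
    if (set == null) {
      return null;
    }
    List<Long> toInvalidate = new ArrayList<Long>(limit);
    for (Iterator<Long> iter = set.iterator();
         iter.hasNext() && toInvalidate.size() < limit; ) {
      toInvalidate.add(iter.next());
      iter.remove();
    }
    if (set.isEmpty()) {
      node2blocks.remove(storageId);
    }
    numBlocks -= toInvalidate.size();
    return toInvalidate;
  }

  public static void main(String[] args) {
    InvalidateWorkSketch sketch = new InvalidateWorkSketch();
    sketch.add("storage-1", 42L);
    sketch.add("storage-1", 43L);
    System.out.println(sketch.invalidateWork("storage-1", 10)); // both ids
    System.out.println(sketch.invalidateWork("storage-1", 10)); // null
  }
}

Capping each poll at blockInvalidateLimit bounds the size of a single
heartbeat reply, which is why the limit lives on DatanodeManager.
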
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Wed Oct 16 22:15:33 2013
@@ -29,27 +29,20 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.util.Daemon;
-/**
- * PendingReplicationBlocks is used in the BlockManager to track blocks that are
- * currently being replicated on disk and in the CacheReplicationManager to
- * track blocks that are currently being cached.
- *
- * <p>
- * PendingReplicationBlocks performs the following tasks:
- * </p>
- *
- * <ol>
- * <li>tracks in-flight replication or caching requests for a block at target
- * datanodes.</li>
- * <li>identifies requests that have timed out and need to be rescheduled at a
- * different datanode.</li>
- * </ol>
- */
-@InterfaceAudience.LimitedPrivate({"HDFS"})
+/***************************************************
+ * PendingReplicationBlocks does the bookkeeping of all
+ * blocks that are getting replicated.
+ *
+ * It does the following:
+ * 1) records blocks that are getting replicated at this instant.
+ * 2) keeps a coarse-grained timer to track the age of each
+ * replication request.
+ * 3) runs a thread that periodically identifies replication
+ * requests that never completed.
+ *
+ ***************************************************/
class PendingReplicationBlocks {
private static final Log LOG = BlockManager.LOG;
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Oct 16 22:15:33 2013
@@ -628,6 +628,8 @@ class BPOfferService {
case DatanodeProtocol.DNA_FINALIZE:
case DatanodeProtocol.DNA_RECOVERBLOCK:
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+ case DatanodeProtocol.DNA_CACHE:
+ case DatanodeProtocol.DNA_UNCACHE:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:
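
The two new cases simply widen the existing standby filter: DNA_CACHE
and DNA_UNCACHE, like the other state-changing actions, are logged and
dropped when they arrive from a standby NameNode. A simplified sketch
of that dispatch shape follows; the action codes and method below are
stand-ins, not the real DatanodeProtocol constants or BPOfferService
structure.

public class StandbyFilterSketch {
  // Stand-in action codes; the real ones live in DatanodeProtocol.
  static final int DNA_TRANSFER = 1;
  static final int DNA_CACHE = 9;
  static final int DNA_UNCACHE = 10;

  /**
   * Nothing state-changing is executed for a command from a standby
   * NN; the switch only decides how the ignored command is reported.
   */
  static boolean shouldExecute(int action, boolean fromActiveNn) {
    if (fromActiveNn) {
      return true;
    }
    switch (action) {
      case DNA_TRANSFER:
      case DNA_CACHE:
      case DNA_UNCACHE:
        System.out.println("Got a command from standby NN - ignoring: "
            + action);
        break;
      default:
        System.out.println("Unknown DatanodeCommand action: " + action);
        break;
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(shouldExecute(DNA_CACHE, false)); // false: dropped
    System.out.println(shouldExecute(DNA_CACHE, true));  // true: executed
  }
}
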
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Wed Oct 16 22:15:33 2013
@@ -21,24 +21,10 @@ package org.apache.hadoop.hdfs.server.da
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
@@ -50,10 +36,6 @@ import org.apache.hadoop.fs.ChecksumExce
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)