You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2008/03/19 01:17:19 UTC
svn commit: r638655 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Author: shv
Date: Tue Mar 18 17:17:17 2008
New Revision: 638655
URL: http://svn.apache.org/viewvc?rev=638655&view=rev
Log:
HADOOP-2606. ReplicationMonitor selects data-nodes to replicate directly from needed replication blocks instead of looking up for the blocks for each live data-node. Contributed by Konstantin Shvachko.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Mar 18 17:17:17 2008
@@ -123,6 +123,10 @@
HADOOP-2423. Code optimization in FSNamesystem.mkdirs.
(Tsz Wo (Nicholas), SZE via dhruba)
+ HADOOP-2606. ReplicationMonitor selects data-nodes to replicate directly
+ from needed replication blocks instead of looking up for the blocks for
+ each live data-node. (shv)
+
BUG FIXES
HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 18 17:17:17 2008
@@ -97,7 +97,7 @@
volatile boolean shouldRun = true;
private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
private LinkedList<String> delHints = new LinkedList<String>();
- final private static String EMPTY_DEL_HINT = "";
+ final static String EMPTY_DEL_HINT = "";
int xmitsInProgress = 0;
Daemon dataXceiveServer = null;
ThreadGroup threadGroup = null;
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue Mar 18 17:17:17 2008
@@ -214,30 +214,25 @@
return new BlockIterator(this.blockList, this);
}
- /*
+ /**
* Store block replication work.
*/
- void addBlocksToBeReplicated(Block[] blocklist,
- DatanodeDescriptor[][] targets) {
- assert(blocklist != null && targets != null);
- assert(blocklist.length > 0 && targets.length > 0);
+ void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
+ assert(block != null && targets != null && targets.length > 0);
synchronized (replicateBlocks) {
- assert(blocklist.length == targets.length);
- for (int i = 0; i < blocklist.length; i++) {
- replicateBlocks.add(blocklist[i]);
- replicateTargetSets.add(targets[i]);
- }
+ replicateBlocks.add(block);
+ replicateTargetSets.add(targets);
}
}
- /*
+ /**
* Store block invalidation work.
*/
- void addBlocksToBeInvalidated(Block[] blocklist) {
- assert(blocklist != null && blocklist.length > 0);
+ void addBlocksToBeInvalidated(List<Block> blocklist) {
+ assert(blocklist != null && blocklist.size() > 0);
synchronized (invalidateBlocks) {
- for (int i = 0; i < blocklist.length; i++) {
- invalidateBlocks.add(blocklist[i]);
+ for(Block blk : blocklist) {
+ invalidateBlocks.add(blk);
}
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Mar 18 17:17:17 2008
@@ -203,8 +203,11 @@
private long decommissionRecheckInterval;
// default block size of a file
private long defaultBlockSize = 0;
- private int replIndex = 0; // last datanode used for replication work
- static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
+
+ /**
+ * Last block index used for replication work.
+ */
+ private int replIndex = 0;
public static FSNamesystem fsNamesystemObject;
private String localMachine;
@@ -392,10 +395,10 @@
"heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
10 * heartbeatInterval;
- this.replicationRecheckInterval = 3 * 1000; // 3 second
- this.replicationRecheckInterval = conf.getInt("dfs.replication.interval", 3) * 1000;
- this.decommissionRecheckInterval = conf.getInt("dfs.namenode.decommission.interval",
- 5 * 60) * 1000;
+ this.replicationRecheckInterval =
+ conf.getInt("dfs.replication.interval", 3) * 1000L;
+ this.decommissionRecheckInterval =
+ conf.getInt("dfs.namenode.decommission.interval", 5 * 60) * 1000L;
this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit,
@@ -474,18 +477,14 @@
synchronized (neededReplications) {
out.println("Metasave: Blocks waiting for replication: " +
neededReplications.size());
- if (neededReplications.size() > 0) {
- for (Iterator<Block> it = neededReplications.iterator();
- it.hasNext();) {
- Block block = it.next();
- out.print(block);
- for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
- jt.hasNext();) {
- DatanodeDescriptor node = jt.next();
- out.print(" " + node + " : ");
- }
- out.println("");
+ for (Block block : neededReplications) {
+ out.print(block);
+ for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+ jt.hasNext();) {
+ DatanodeDescriptor node = jt.next();
+ out.print(" " + node + " : ");
}
+ out.println("");
}
}
@@ -2212,6 +2211,8 @@
* Periodically calls computeReplicationWork().
*/
class ReplicationMonitor implements Runnable {
+ static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+ static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
public void run() {
while (fsRunning) {
try {
@@ -2219,6 +2220,8 @@
processPendingReplications();
Thread.sleep(replicationRecheckInterval);
} catch (InterruptedException ie) {
+ LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
+ break;
} catch (IOException ie) {
LOG.warn("ReplicationMonitor thread received exception. " + ie);
} catch (Throwable t) {
@@ -2229,81 +2232,276 @@
}
}
+ /////////////////////////////////////////////////////////
+ //
+ // These methods are called by the Namenode system, to see
+ // if there is any work for registered datanodes.
+ //
+ /////////////////////////////////////////////////////////
/**
- * Look at a few datanodes and compute any replication work that
- * can be scheduled on them. The datanode will be infomed of this
- * work at the next heartbeat.
- */
- void computeDatanodeWork() throws IOException {
- int numiter = 0;
- int foundwork = 0;
- int hsize = 0;
- int lastReplIndex = -1;
+ * Compute block replication and block invalidation work
+ * that can be scheduled on data-nodes.
+ * The datanode will be informed of this work at the next heartbeat.
+ *
+ * @return number of blocks scheduled for replication or removal.
+ */
+ int computeDatanodeWork() throws IOException {
+ int workFound = 0;
+ int blocksToProcess = 0;
+ int nodesToProcess = 0;
+ synchronized(heartbeats) {
+ blocksToProcess = (int)(heartbeats.size()
+ * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
+ nodesToProcess = (int)Math.ceil((double)heartbeats.size()
+ * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
+ }
+
+ workFound = computeReplicationWork(blocksToProcess);
+ if(workFound == 0)
+ workFound = computeInvalidateWork(nodesToProcess);
+ return workFound;
+ }
+
+ private int computeInvalidateWork(int nodesToProcess) {
+ int blockCnt = 0;
+ for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
+ int work = invalidateWorkForOneNode();
+ if(work == 0)
+ break;
+ blockCnt += work;
+ }
+ return blockCnt;
+ }
- while (true) {
- DatanodeDescriptor node = null;
+ /**
+ * Scan blocks in {@link #neededReplications} and assign replication
+ * work to data-nodes they belong to.
+ *
+ * The number of process blocks equals either twice the number of live
+ * data-nodes or the number of under-replicated blocks whichever is less.
+ *
+ * @return number of blocks scheduled for replication during this iteration.
+ */
+ private synchronized int computeReplicationWork(
+ int blocksToProcess) throws IOException {
+ int scheduledReplicationCount = 0;
+ // blocks should not be replicated or removed if safe mode is on
+ if (isInSafeMode())
+ return scheduledReplicationCount;
- //
- // pick the datanode that was the last one in the
- // previous invocation of this method.
- //
- synchronized (heartbeats) {
- hsize = heartbeats.size();
- if (numiter++ >= hsize) {
- // no change in replIndex.
- if (lastReplIndex >= 0) {
- //next time, start after where the last replication was scheduled
- replIndex = lastReplIndex;
- }
- break;
- }
- if (replIndex >= hsize) {
+ synchronized(neededReplications) {
+ // # of blocks to process equals either twice the number of live
+ // data-nodes or the number of under-replicated blocks whichever is less
+ blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+ if(blocksToProcess == 0)
+ return scheduledReplicationCount;
+
+ // Go through all blocks that need replications.
+ // Select source and target nodes for replication.
+ Iterator<Block> neededReplicationsIterator = neededReplications.iterator();
+ // skip to the first unprocessed block, which is at replIndex
+ for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
+ neededReplicationsIterator.next();
+ }
+ // process blocks
+ for(int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
+ if( ! neededReplicationsIterator.hasNext()) {
+ // start from the beginning
replIndex = 0;
- }
- node = heartbeats.get(replIndex);
- replIndex++;
- }
+ blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+ if(blkCnt >= blocksToProcess)
+ break;
+ neededReplicationsIterator = neededReplications.iterator();
+ assert neededReplicationsIterator.hasNext() :
+ "neededReplications should not be empty.";
+ }
+
+ Block block = neededReplicationsIterator.next();
+
+ // block should belong to a file
+ INodeFile fileINode = blocksMap.getINode(block);
+ if(fileINode == null) { // abandoned block
+ neededReplicationsIterator.remove(); // remove from neededReplications
+ replIndex--;
+ continue;
+ }
+ int requiredReplication = fileINode.getReplication();
+
+ // get a source data-node
+ List<DatanodeDescriptor> containingNodes =
+ new ArrayList<DatanodeDescriptor>();
+ NumberReplicas numReplicas = new NumberReplicas();
+ DatanodeDescriptor srcNode =
+ chooseSourceDatanode(block, containingNodes, numReplicas);
+ if(srcNode == null) // block can not be replicated from any node
+ continue;
+
+ // do not schedule more if enough replicas is already pending
+ int numEffectiveReplicas = numReplicas.liveReplicas() +
+ pendingReplications.getNumReplicas(block);
+ if(numEffectiveReplicas >= requiredReplication) {
+ neededReplicationsIterator.remove(); // remove from neededReplications
+ replIndex--;
+ NameNode.stateChangeLog.info("BLOCK* "
+ + "Removing block " + block.getBlockName()
+ + " from neededReplications as it does not belong to any file.");
+ continue;
+ }
+
+ // choose replication targets
+ int maxTargets =
+ maxReplicationStreams - srcNode.getNumberOfBlocksToBeReplicated();
+ assert maxTargets > 0 : "Datanode " + srcNode.getName()
+ + " should have not been selected as a source for replication.";
+ DatanodeDescriptor targets[] = replicator.chooseTarget(
+ Math.min(requiredReplication - numEffectiveReplicas, maxTargets),
+ srcNode, containingNodes, null, block.getNumBytes());
+ if(targets.length == 0)
+ continue;
+ // Add block to the to be replicated list
+ srcNode.addBlockToBeReplicated(block, targets);
+ scheduledReplicationCount++;
- //
- // Is there replication work to be computed for this datanode?
- //
- int precomputed = node.getNumberOfBlocksToBeReplicated();
- int needed = this.maxReplicationStreams - precomputed;
- boolean doReplication = false;
- boolean doInvalidation = false;
- if (needed > 0) {
- //
- // Compute replication work and store work into the datanode
- //
- Object replsets[] = pendingTransfers(node, needed);
- if (replsets != null) {
- doReplication = true;
- addBlocksToBeReplicated(node, (Block[])replsets[0],
- (DatanodeDescriptor[][])replsets[1]);
- lastReplIndex = replIndex;
+ // Move the block-replication into a "pending" state.
+ // The reason we use 'pending' is so we can retry
+ // replications that fail after an appropriate amount of time.
+ if(numEffectiveReplicas + targets.length >= requiredReplication) {
+ neededReplicationsIterator.remove(); // remove from neededReplications
+ replIndex--;
+ pendingReplications.add(block, targets.length);
+ NameNode.stateChangeLog.debug(
+ "BLOCK* block " + block.getBlockName()
+ + " is moved from neededReplications to pendingReplications");
+ }
+ if (NameNode.stateChangeLog.isInfoEnabled()) {
+ StringBuffer targetList = new StringBuffer("datanode(s)");
+ for (int k = 0; k < targets.length; k++) {
+ targetList.append(' ');
+ targetList.append(targets[k].getName());
+ }
+ NameNode.stateChangeLog.info(
+ "BLOCK* ask "
+ + srcNode.getName() + " to replicate "
+ + block.getBlockName() + " to " + targetList);
+ NameNode.stateChangeLog.debug(
+ "BLOCK* neededReplications = " + neededReplications.size()
+ + " pendingReplications = " + pendingReplications.size());
}
}
- if (!doReplication) {
- //
- // Determine if block deletion is pending for this datanode
- //
- Block blocklist[] = blocksToInvalidate(node);
- if (blocklist != null) {
- doInvalidation = true;
- addBlocksToBeInvalidated(node, blocklist);
- }
+ }
+ return scheduledReplicationCount;
+ }
+
+ /**
+ * Parse the data-nodes the block belongs to and choose one,
+ * which will be the replication source.
+ *
+ * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
+ * since the former do not have write traffic and hence are less busy.
+ * We do not use already decommissioned nodes as a source.
+ * Otherwise we choose a random node among those that did not reach their
+ * replication limit.
+ *
+ * In addition form a list of all nodes containing the block
+ * and calculate its replication numbers.
+ */
+ private DatanodeDescriptor chooseSourceDatanode(
+ Block block,
+ List<DatanodeDescriptor> containingNodes,
+ NumberReplicas numReplicas) {
+ containingNodes.clear();
+ DatanodeDescriptor srcNode = null;
+ int live = 0;
+ int decommissioned = 0;
+ Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+ while(it.hasNext()) {
+ DatanodeDescriptor node = it.next();
+ if(!node.isDecommissionInProgress() && !node.isDecommissioned())
+ live++;
+ else
+ decommissioned++;
+ containingNodes.add(node);
+ if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+ continue; // already reached replication limit
+ // the block must not be scheduled for removal on srcNode
+ Collection<Block> excessBlocks =
+ excessReplicateMap.get(node.getStorageID());
+ if(excessBlocks != null && excessBlocks.contains(block))
+ continue;
+ // never use already decommissioned nodes
+ if(node.isDecommissioned())
+ continue;
+ // we prefer nodes that are in DECOMMISSION_INPROGRESS state
+ if(node.isDecommissionInProgress() || srcNode == null) {
+ srcNode = node;
+ continue;
+ }
+ if(srcNode.isDecommissionInProgress())
+ continue;
+ // switch to a different node randomly
+ // this to prevent from deterministically selecting the same node even
+ // if the node failed to replicate the block on previous iterations
+ if(r.nextBoolean())
+ srcNode = node;
+ }
+ if(numReplicas != null)
+ numReplicas.initialize(live, decommissioned);
+ return srcNode;
+ }
+
+ /**
+ * Get blocks to invalidate for the first node
+ * in {@link #recentInvalidateSets}.
+ *
+ * @return number of blocks scheduled for removal during this iteration.
+ */
+ private synchronized int invalidateWorkForOneNode() {
+ // blocks should not be replicated or removed if safe mode is on
+ if (isInSafeMode())
+ return 0;
+ if(recentInvalidateSets.isEmpty())
+ return 0;
+ // get blocks to invalidate for the first node
+ String firstNodeId = recentInvalidateSets.keySet().iterator().next();
+ assert firstNodeId != null;
+ DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
+ Collection<Block> invalidateSet = recentInvalidateSets.remove(firstNodeId);
+
+ if(invalidateSet == null || dn == null)
+ return 0;
+
+ ArrayList<Block> blocksToInvalidate =
+ new ArrayList<Block>(blockInvalidateLimit);
+
+ // # blocks that can be sent in one message is limited
+ Iterator<Block> it = invalidateSet.iterator();
+ for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
+ blkCount++) {
+ blocksToInvalidate.add(it.next());
+ it.remove();
+ }
+
+ // If we could not send everything in this message, reinsert this item
+ // into the collection.
+ if(it.hasNext())
+ recentInvalidateSets.put(firstNodeId, invalidateSet);
+
+ dn.addBlocksToBeInvalidated(blocksToInvalidate);
+
+ if(NameNode.stateChangeLog.isInfoEnabled()) {
+ StringBuffer blockList = new StringBuffer();
+ for(Block blk : blocksToInvalidate) {
+ blockList.append(' ');
+ blockList.append(blk.getBlockName());
}
- if (doReplication || doInvalidation) {
- //
- // If we have already computed work for a predefined
- // number of datanodes in this iteration, then relax
- //
- if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
- break;
- }
- foundwork++;
- }
+ NameNode.stateChangeLog.info("BLOCK* ask "
+ + dn.getName() + " to delete " + blockList);
}
+ return blocksToInvalidate.size();
+ }
+
+ void setNodeReplicationLimit(int limit) {
+ this.maxReplicationStreams = limit;
}
/**
@@ -2326,36 +2524,6 @@
}
/**
- * Add more replication work for this datanode.
- */
- synchronized void addBlocksToBeReplicated(DatanodeDescriptor node,
- Block[] blocklist,
- DatanodeDescriptor[][] targets)
- throws IOException {
- //
- // Find the datanode with the FSNamesystem lock held.
- //
- DatanodeDescriptor n = getDatanode(node);
- if (n != null) {
- n.addBlocksToBeReplicated(blocklist, targets);
- }
- }
-
- /**
- * Add more block invalidation work for this datanode.
- */
- synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node,
- Block[] blocklist) throws IOException {
- //
- // Find the datanode with the FSNamesystem lock held.
- //
- DatanodeDescriptor n = getDatanode(node);
- if (n != null) {
- n.addBlocksToBeInvalidated(blocklist);
- }
- }
-
- /**
* remove a datanode descriptor
* @param nodeID datanode ID
*/
@@ -3125,78 +3293,23 @@
short getMinReplication() { return (short)minReplication; }
short getDefaultReplication() { return (short)defaultReplication; }
- /////////////////////////////////////////////////////////
- //
- // These methods are called by the Namenode system, to see
- // if there is any work for a given datanode.
- //
- /////////////////////////////////////////////////////////
-
- /**
- * Check if there are any recently-deleted blocks a datanode should remove.
- */
- public synchronized Block[] blocksToInvalidate(DatanodeID nodeID) {
- // Ask datanodes to perform block delete
- // only if safe mode is off.
- if (isInSafeMode())
- return null;
-
- Collection<Block> invalidateSet = recentInvalidateSets.remove(
- nodeID.getStorageID());
-
- if (invalidateSet == null) {
- return null;
- }
-
- Iterator<Block> it = null;
- int sendNum = invalidateSet.size();
- ArrayList<Block> sendBlock = new ArrayList<Block>(sendNum);
-
- //
- // calculate the number of blocks that we send in one message
- //
- sendNum = Math.min(sendNum, blockInvalidateLimit);
-
- //
- // Copy the first chunk into sendBlock
- //
- for (it = invalidateSet.iterator(); sendNum > 0; sendNum--) {
- assert(it.hasNext());
- sendBlock.add(it.next());
- it.remove();
- }
-
- //
- // If we could not send everything in this message, reinsert this item
- // into the collection.
- //
- if (it.hasNext()) {
- recentInvalidateSets.put(nodeID.getStorageID(), invalidateSet);
- }
-
- if (NameNode.stateChangeLog.isInfoEnabled()) {
- StringBuffer blockList = new StringBuffer();
- for (int i = 0; i < sendBlock.size(); i++) {
- blockList.append(' ');
- Block block = sendBlock.get(i);
- blockList.append(block.getBlockName());
- }
- NameNode.stateChangeLog.info("BLOCK* NameSystem.blockToInvalidate: "
- +"ask "+nodeID.getName()+" to delete " + blockList);
- }
- return sendBlock.toArray(new Block[sendBlock.size()]);
- }
-
-
/**
* A immutable object that stores the number of live replicas and
* the number of decommissined Replicas.
*/
- static class NumberReplicas {
+ private static class NumberReplicas {
private int liveReplicas;
private int decommissionedReplicas;
+ NumberReplicas() {
+ initialize(0, 0);
+ }
+
NumberReplicas(int live, int decommissioned) {
+ initialize(live, decommissioned);
+ }
+
+ void initialize(int live, int decommissioned) {
liveReplicas = live;
decommissionedReplicas = decommissioned;
}
@@ -3236,32 +3349,6 @@
}
/**
- * Returns a newly allocated list of all nodes. Returns a count of
- * live and decommissioned nodes.
- */
- ArrayList<DatanodeDescriptor> containingNodeList(Block b, NumberReplicas[] numReplicas) {
- ArrayList<DatanodeDescriptor> nodeList =
- new ArrayList<DatanodeDescriptor>();
- int count = 0;
- int live = 0;
- for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
- it.hasNext();) {
- DatanodeDescriptor node = it.next();
- if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- live++;
- }
- else {
- count++;
- }
- nodeList.add(node);
- }
- if (numReplicas != null) {
- numReplicas[0] = new NumberReplicas(live, count);
- }
- return nodeList;
- }
-
- /**
* Return true if there are any blocks on this node that have not
* yet reached their replication factor. Otherwise returns false.
*/
@@ -3318,140 +3405,6 @@
return false;
}
- /**
- * Return with a list of Block/DataNodeInfo sets, indicating
- * where various Blocks should be copied, ASAP.
- *
- * The Array that we return consists of two objects:
- * The 1st elt is an array of Blocks.
- * The 2nd elt is a 2D array of DatanodeDescriptor objs, identifying the
- * target sequence for the Block at the appropriate index.
- *
- */
- public synchronized Object[] pendingTransfers(DatanodeID srcNode,
- int needed) {
- // Ask datanodes to perform block replication
- // only if safe mode is off.
- if (isInSafeMode())
- return null;
-
- synchronized (neededReplications) {
- Object results[] = null;
-
- if (neededReplications.size() > 0) {
- //
- // Go through all blocks that need replications. See if any
- // are present at the current node. If so, ask the node to
- // replicate them.
- //
- List<Block> replicateBlocks = new ArrayList<Block>();
- List<NumberReplicas> numCurrentReplicas = new ArrayList<NumberReplicas>();
- List<DatanodeDescriptor[]> replicateTargetSets;
- replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
- NumberReplicas[] allReplicas = new NumberReplicas[1];
- for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
- if (needed <= 0) {
- break;
- }
- Block block = it.next();
- long blockSize = block.getNumBytes();
- INodeFile fileINode = blocksMap.getINode(block);
- if (fileINode == null) { // block does not belong to any file
- it.remove();
- } else {
- List<DatanodeDescriptor> containingNodes =
- containingNodeList(block, allReplicas);
- Collection<Block> excessBlocks = excessReplicateMap.get(
- srcNode.getStorageID());
-
- // srcNode must contain the block, and the block must
- // not be scheduled for removal on that node
- if (containingNodes.contains(srcNode)
- && (excessBlocks == null || !excessBlocks.contains(block))) {
- int numCurrentReplica = allReplicas[0].liveReplicas() +
- pendingReplications.getNumReplicas(block);
- NumberReplicas repl = new NumberReplicas(numCurrentReplica,
- allReplicas[0].decommissionedReplicas());
- if (numCurrentReplica >= fileINode.getReplication()) {
- it.remove();
- } else {
- DatanodeDescriptor targets[] = replicator.chooseTarget(
- Math.min(fileINode.getReplication() - numCurrentReplica,
- needed),
- datanodeMap.get(srcNode.getStorageID()),
- containingNodes, null, blockSize);
- if (targets.length > 0) {
- // Build items to return
- replicateBlocks.add(block);
- numCurrentReplicas.add(repl);
- replicateTargetSets.add(targets);
- needed -= targets.length;
- }
- }
- }
- }
- }
-
- //
- // Move the block-replication into a "pending" state.
- // The reason we use 'pending' is so we can retry
- // replications that fail after an appropriate amount of time.
- // (REMIND - mjc - this timer is not yet implemented.)
- //
- if (replicateBlocks.size() > 0) {
- int i = 0;
- for (Iterator<Block> it = replicateBlocks.iterator(); it.hasNext(); i++) {
- Block block = it.next();
- DatanodeDescriptor targets[] = replicateTargetSets.get(i);
- int numCurrentReplica = numCurrentReplicas.get(i).liveReplicas();
- int numExpectedReplica = blocksMap.getINode(block).getReplication();
- if (numCurrentReplica + targets.length >= numExpectedReplica) {
- neededReplications.remove(
- block,
- numCurrentReplica,
- numCurrentReplicas.get(i).decommissionedReplicas(),
- numExpectedReplica);
- pendingReplications.add(block, targets.length);
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.pendingTransfer: "
- + block.getBlockName()
- + " is removed from neededReplications to pendingReplications");
- }
-
- if (NameNode.stateChangeLog.isInfoEnabled()) {
- StringBuffer targetList = new StringBuffer("datanode(s)");
- for (int k = 0; k < targets.length; k++) {
- targetList.append(' ');
- targetList.append(targets[k].getName());
- }
- NameNode.stateChangeLog.info(
- "BLOCK* NameSystem.pendingTransfer: " + "ask "
- + srcNode.getName() + " to replicate "
- + block.getBlockName() + " to " + targetList);
- NameNode.stateChangeLog.debug(
- "BLOCK* neededReplications = " + neededReplications.size()
- + " pendingReplications = " + pendingReplications.size());
- }
- }
-
- //
- // Build returned objects from above lists
- //
- DatanodeDescriptor targetMatrix[][] =
- new DatanodeDescriptor[replicateTargetSets.size()][];
- for (i = 0; i < targetMatrix.length; i++) {
- targetMatrix[i] = replicateTargetSets.get(i);
- }
-
- results = new Object[2];
- results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]);
- results[1] = targetMatrix;
- }
- }
- return results;
- }
- }
-
/**
* Keeps track of which datanodes are allowed to connect to the namenode.
*/
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Tue Mar 18 17:17:17 2008
@@ -100,7 +100,7 @@
/**
* The total number of blocks that are undergoing replication
*/
- long size() {
+ int size() {
return pendingReplications.size();
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Storage.java Tue Mar 18 17:17:17 2008
@@ -298,7 +298,7 @@
return StorageState.NORMAL;
if (hasPrevious)
throw new InconsistentFSStateException(root,
- "version file in current directory it is missing.");
+ "version file in current directory is missing.");
return StorageState.NOT_FORMATTED;
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java Tue Mar 18 17:17:17 2008
@@ -23,7 +23,7 @@
* Blocks have replication priority, with priority 0 indicating the highest
* Blocks have only one replicas has the highest
*/
-class UnderReplicatedBlocks {
+class UnderReplicatedBlocks implements Iterable<Block> {
private static final int LEVEL = 3;
private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
@@ -173,7 +173,7 @@
}
/* return a iterator of all the under replication blocks */
- synchronized Iterator<Block> iterator() {
+ public synchronized Iterator<Block> iterator() {
return new Iterator<Block>() {
private int level;
private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java?rev=638655&r1=638654&r2=638655&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java Tue Mar 18 17:17:17 2008
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.dfs;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@@ -77,6 +79,14 @@
// We do not need many handlers, since each thread simulates a handler
// by calling name-node methods directly
config.setInt("dfs.namenode.handler.count", 1);
+ // set exclude file
+ config.set("dfs.hosts.exclude", "${hadoop.tmp.dir}/dfs/hosts/exclude");
+ File excludeFile = new File(config.get("dfs.hosts.exclude", "exclude"));
+ if(! excludeFile.exists()) {
+ if(!excludeFile.getParentFile().mkdirs())
+ throw new IOException("NNThroughputBenchmark: cannot mkdir " + excludeFile);
+ }
+ new FileOutputStream(excludeFile).close();
// Start the NameNode
String[] args = new String[] {};
nameNode = NameNode.createNameNode(args, config);
@@ -113,6 +123,8 @@
protected long cumulativeTime = 0; // sum of times for each op
protected long elapsedTime = 0; // time from start to finish
+ protected List<StatsDaemon> daemons;
+
/**
* Operation name.
*/
@@ -155,6 +167,11 @@
*/
abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
+ /**
+ * Print the results of the benchmarking.
+ */
+ abstract void printResults();
+
OperationStatsBase() {
baseDir = BASE_DIR_NAME + "/" + getOpName();
replication = (short) config.getInt("dfs.replication", 3);
@@ -163,7 +180,7 @@
}
void benchmark() throws IOException {
- List<StatsDaemon> daemons = new ArrayList<StatsDaemon>();
+ daemons = new ArrayList<StatsDaemon>();
long start = 0;
try {
numOpsExecuted = 0;
@@ -191,7 +208,7 @@
for(StatsDaemon d : daemons)
d.start();
} finally {
- while(isInPorgress(daemons)) {
+ while(isInPorgress()) {
// try {Thread.sleep(500);} catch (InterruptedException e) {}
}
elapsedTime = System.currentTimeMillis() - start;
@@ -202,9 +219,9 @@
}
}
- private boolean isInPorgress(List<StatsDaemon> daemons) {
+ private boolean isInPorgress() {
for(StatsDaemon d : daemons)
- if(d.isInPorgress())
+ if(d.isInProgress())
return true;
return false;
}
@@ -269,7 +286,7 @@
return false;
}
- void printResults() {
+ void printStats() {
LOG.info("--- " + getOpName() + " stats ---");
LOG.info("# operations: " + getNumOpsExecuted());
LOG.info("Elapsed Time: " + getElapsedTime());
@@ -293,7 +310,6 @@
this.daemonId = daemonId;
this.opsPerThread = nrOps;
this.statsOp = op;
- // this.clientName = statsOp.getClientName(daemonId);
setName(toString());
}
@@ -322,9 +338,16 @@
}
}
- boolean isInPorgress() {
+ boolean isInProgress() {
return localNumOpsExecuted < opsPerThread;
}
+
+ /**
+ * Schedule to stop this daemon.
+ */
+ void terminate() {
+ opsPerThread = localNumOpsExecuted;
+ }
}
/**
@@ -379,10 +402,8 @@
long fNum = fileCount % filesPerDirectory;
if(fNum == 0) {
currentDir = getNextDirName();
- // System.out.println("currentDir: " + currentDir);
}
String fn = currentDir + "/" + FILE_NAME_PREFFIX + fileCount;
- // System.out.println("getNextFileName(): " + fn + " fileCount = " + fileCount);
fileCount++;
return fn;
}
@@ -481,7 +502,7 @@
LOG.info("nrFiles = " + numOpsRequired);
LOG.info("nrThreads = " + numThreads);
LOG.info("nrFilesPerDir = " + nameGenerator.filesPerDirectory);
- super.printResults();
+ printStats();
}
}
@@ -538,6 +559,8 @@
private static class TinyDatanode implements Comparable<String> {
private static final long DF_CAPACITY = 100*1024*1024;
private static final long DF_USED = 0;
+
+ NamespaceInfo nsInfo;
DatanodeRegistration dnRegistration;
Block[] blocks;
int nrBlocks; // actual number of blocks
@@ -563,15 +586,23 @@
this.nrBlocks = 0;
}
+ String getName() {
+ return dnRegistration.getName();
+ }
+
void register() throws IOException {
// get versions from the namenode
- NamespaceInfo nsInfo = nameNode.versionRequest();
+ nsInfo = nameNode.versionRequest();
dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
DataNode.setNewStorageID(dnRegistration);
// register datanode
dnRegistration = nameNode.register(dnRegistration);
}
+ /**
+ * Send a heartbeat to the name-node.
+ * Ignore reply commands.
+ */
void sendHeartbeat() throws IOException {
// register datanode
DatanodeCommand cmd = nameNode.sendHeartbeat(
@@ -597,7 +628,46 @@
}
public int compareTo(String name) {
- return dnRegistration.getName().compareTo(name);
+ return getName().compareTo(name);
+ }
+
+ /**
+ * Send a heartbeat to the name-node and replicate blocks if requested.
+ */
+ int replicateBlocks() throws IOException {
+ // register datanode
+ DatanodeCommand cmd = nameNode.sendHeartbeat(
+ dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
+ if(cmd == null || cmd.getAction() != DatanodeProtocol.DNA_TRANSFER)
+ return 0;
+ // Send a copy of a block to another datanode
+ BlockCommand bcmd = (BlockCommand)cmd;
+ return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+ }
+
+ /**
+ * Transfer blocks to another data-node.
+ * Just report on behalf of the other data-node
+ * that the blocks have been received.
+ */
+ private int transferBlocks( Block blocks[],
+ DatanodeInfo xferTargets[][]
+ ) throws IOException {
+ for(int i = 0; i < blocks.length; i++) {
+ DatanodeInfo blockTargets[] = xferTargets[i];
+ for(int t = 0; t < blockTargets.length; t++) {
+ DatanodeInfo dnInfo = blockTargets[t];
+ DatanodeRegistration receivedDNReg;
+ receivedDNReg = new DatanodeRegistration(dnInfo.getName());
+ receivedDNReg.setStorageInfo(
+ new DataStorage(nsInfo, dnInfo.getStorageID()));
+ receivedDNReg.setInfoPort(dnInfo.getInfoPort());
+ nameNode.blockReceived( receivedDNReg,
+ new Block[] {blocks[i]},
+ new String[] {DataNode.EMPTY_DEL_HINT});
+ }
+ }
+ return blocks.length;
}
}
@@ -611,7 +681,8 @@
class BlockReportStats extends OperationStatsBase {
static final String OP_BLOCK_REPORT_NAME = "blockReport";
static final String OP_BLOCK_REPORT_USAGE =
- "-op blockReport [-datanodes T] [-reports R] [-blocksPerReport B] [-blocksPerFile F]";
+ "-op blockReport [-datanodes T] [-reports N] " +
+ "[-blocksPerReport B] [-blocksPerFile F]";
private int blocksPerReport;
private int blocksPerFile;
@@ -670,10 +741,10 @@
for(int idx=0; idx < nrDatanodes; idx++) {
datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
datanodes[idx].register();
- assert datanodes[idx].dnRegistration.getName().compareTo(prevDNName) > 0
+ assert datanodes[idx].getName().compareTo(prevDNName) > 0
: "Data-nodes must be sorted lexicographically.";
datanodes[idx].sendHeartbeat();
- prevDNName = datanodes[idx].dnRegistration.getName();
+ prevDNName = datanodes[idx].getName();
}
int numResolved = 0;
DatanodeInfo[] dnInfos = nameNode.getDatanodeReport(DatanodeReportType.ALL);
@@ -741,14 +812,6 @@
return end-start;
}
- /**
- * Defines data-node name since client are data-nodes in this case.
- */
- @Override
- String getClientName(int idx) {
- return getOpName() + "-client-" + idx;
- }
-
void printResults() {
String blockDistribution = "";
String delim = "(";
@@ -762,9 +825,164 @@
LOG.info("datanodes = " + numThreads + " " + blockDistribution);
LOG.info("blocksPerReport = " + blocksPerReport);
LOG.info("blocksPerFile = " + blocksPerFile);
- super.printResults();
+ printStats();
}
- }
+ } // end BlockReportStats
+
+ /**
+ * Measures how fast replication monitor can compute data-node work.
+ *
+ * It runs only one thread until no more work can be scheduled.
+ */
+ class ReplicationStats extends OperationStatsBase {
+ static final String OP_REPLICATION_NAME = "replication";
+ static final String OP_REPLICATION_USAGE =
+ "-op replication [-datanodes T] [-nodesToDecommission D] " +
+ "[-nodeReplicationLimit C] [-totalBlocks B] [-replication R]";
+
+ private BlockReportStats blockReportObject;
+ private int numDatanodes;
+ private int nodesToDecommission;
+ private int nodeReplicationLimit;
+ private int totalBlocks;
+ private int numDecommissionedBlocks;
+ private int numPendingBlocks;
+
+ ReplicationStats(String[] args) {
+ super();
+ numThreads = 1;
+ numDatanodes = 3;
+ nodesToDecommission = 1;
+ nodeReplicationLimit = 100;
+ totalBlocks = 100;
+ parseArguments(args);
+ // number of operations is 4 times the number of decommissioned
+ // blocks divided by the number of needed replications scanned
+ // by the replication monitor in one iteration
+ numOpsRequired = (totalBlocks*replication*nodesToDecommission*2)
+ / (numDatanodes*numDatanodes);
+
+ String[] blkReportArgs = {
+ "-op", "blockReport",
+ "-datanodes", String.valueOf(numDatanodes),
+ "-blocksPerReport", String.valueOf(totalBlocks*replication/numDatanodes),
+ "-blocksPerFile", String.valueOf(numDatanodes)};
+ blockReportObject = new BlockReportStats(blkReportArgs);
+ numDecommissionedBlocks = 0;
+ numPendingBlocks = 0;
+ }
+
+ String getOpName() {
+ return OP_REPLICATION_NAME;
+ }
+
+ void parseArguments(String[] args) {
+ boolean ignoreUnrelatedOptions = verifyOpArgument(args);
+ for (int i = 2; i < args.length; i++) { // parse command line
+ if(args[i].equals("-datanodes")) {
+ if(i+1 == args.length) printUsage();
+ numDatanodes = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-nodesToDecommission")) {
+ if(i+1 == args.length) printUsage();
+ nodesToDecommission = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-nodeReplicationLimit")) {
+ if(i+1 == args.length) printUsage();
+ nodeReplicationLimit = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-totalBlocks")) {
+ if(i+1 == args.length) printUsage();
+ totalBlocks = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-replication")) {
+ if(i+1 == args.length) printUsage();
+ replication = Short.parseShort(args[++i]);
+ } else if(!ignoreUnrelatedOptions)
+ printUsage();
+ }
+ }
+
+ void generateInputs(int[] ignore) throws IOException {
+ // start data-nodes; create a bunch of files; generate block reports.
+ blockReportObject.generateInputs(ignore);
+ // stop replication monitor
+ nameNode.namesystem.replthread.interrupt();
+ try {
+ nameNode.namesystem.replthread.join();
+ } catch(InterruptedException ei) {
+ return;
+ }
+ // report blocks once
+ int nrDatanodes = blockReportObject.getNumDatanodes();
+ for(int idx=0; idx < nrDatanodes; idx++) {
+ blockReportObject.executeOp(idx, 0, null);
+ }
+ // decommission data-nodes
+ decommissionNodes();
+ // set node replication limit
+ nameNode.namesystem.setNodeReplicationLimit(nodeReplicationLimit);
+ }
+
+ private void decommissionNodes() throws IOException {
+ String excludeFN = config.get("dfs.hosts.exclude", "exclude");
+ FileOutputStream excludeFile = new FileOutputStream(excludeFN);
+ excludeFile.getChannel().truncate(0L);
+ int nrDatanodes = blockReportObject.getNumDatanodes();
+ numDecommissionedBlocks = 0;
+ for(int i=0; i < nodesToDecommission; i++) {
+ TinyDatanode dn = blockReportObject.datanodes[nrDatanodes-1-i];
+ numDecommissionedBlocks += dn.nrBlocks;
+ excludeFile.write(dn.getName().getBytes());
+ excludeFile.write('\n');
+ LOG.info("Datanode " + dn.getName() + " is decommissioned.");
+ }
+ excludeFile.close();
+ nameNode.refreshNodes();
+ }
+
+ /**
+ * Does not require the argument
+ */
+ String getExecutionArgument(int daemonId) {
+ return null;
+ }
+
+ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
+ assert daemonId < numThreads : "Wrong daemonId.";
+ long start = System.currentTimeMillis();
+ // compute datanode work
+ int work = nameNode.namesystem.computeDatanodeWork();
+ long end = System.currentTimeMillis();
+ numPendingBlocks += work;
+ if(work == 0)
+ daemons.get(daemonId).terminate();
+ return end-start;
+ }
+
+ void printResults() {
+ String blockDistribution = "";
+ String delim = "(";
+ int totalReplicas = 0;
+ for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) {
+ totalReplicas += blockReportObject.datanodes[idx].nrBlocks;
+ blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks;
+ delim = ", ";
+ }
+ blockDistribution += ")";
+ LOG.info("--- " + getOpName() + " inputs ---");
+ LOG.info("numOpsRequired = " + numOpsRequired);
+ LOG.info("datanodes = " + numDatanodes + " " + blockDistribution);
+ LOG.info("decommissioned datanodes = " + nodesToDecommission);
+ LOG.info("datanode replication limit = " + nodeReplicationLimit);
+ LOG.info("total blocks = " + totalBlocks);
+ printStats();
+ LOG.info("decommissioned blocks = " + numDecommissionedBlocks);
+ LOG.info("pending replications = " + numPendingBlocks);
+ LOG.info("replications per sec: " + getBlocksPerSecond());
+ }
+
+ private double getBlocksPerSecond() {
+ return elapsedTime == 0 ? 0 : 1000*(double)numPendingBlocks / elapsedTime;
+ }
+
+ } // end ReplicationStats
static void printUsage() {
System.err.println("Usage: NNThroughputBenchmark"
@@ -772,6 +990,7 @@
+ " | \n\t" + CreateFileStats.OP_CREATE_USAGE
+ " | \n\t" + OpenFileStats.OP_OPEN_USAGE
+ " | \n\t" + BlockReportStats.OP_BLOCK_REPORT_USAGE
+ + " | \n\t" + ReplicationStats.OP_REPLICATION_USAGE
);
System.exit(-1);
}
@@ -802,6 +1021,10 @@
}
if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
opStat = bench.new BlockReportStats(args);
+ ops.add(opStat);
+ }
+ if(runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type)) {
+ opStat = bench.new ReplicationStats(args);
ops.add(opStat);
}
if(ops.size() == 0)