You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2013/10/30 23:22:22 UTC
svn commit: r1537330 [5/11] - in
/hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/
hadoop-hdfs-nfs/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/sr...
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Oct 30 22:21:59 2013
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hdfs.util.Light
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -89,9 +91,6 @@ public class BlockManager {
static final Log LOG = LogFactory.getLog(BlockManager.class);
public static final Log blockLog = NameNode.blockStateChangeLog;
- /** Default load factor of map */
- public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
-
private static final String QUEUE_REASON_CORRUPT_STATE =
"it has the wrong state or generation stamp";
@@ -243,7 +242,8 @@ public class BlockManager {
invalidateBlocks = new InvalidateBlocks(datanodeManager);
// Compute the map capacity by allocating 2% of total memory
- blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
+ blocksMap = new BlocksMap(
+ LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
blockplacement = BlockPlacementPolicy.getInstance(
conf, stats, datanodeManager.getNetworkTopology());
pendingReplications = new PendingReplicationBlocks(conf.getInt(
@@ -1258,22 +1258,19 @@ public class BlockManager {
namesystem.writeUnlock();
}
- HashMap<Node, Node> excludedNodes
- = new HashMap<Node, Node>();
+ final Set<Node> excludedNodes = new HashSet<Node>();
for(ReplicationWork rw : work){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
excludedNodes.clear();
for (DatanodeDescriptor dn : rw.containingNodes) {
- excludedNodes.put(dn, dn);
+ excludedNodes.add(dn);
}
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
// It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the block collection itself.
- rw.targets = blockplacement.chooseTarget(rw.bc,
- rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
- excludedNodes, rw.block.getNumBytes());
+ rw.chooseTargets(blockplacement, excludedNodes);
}
namesystem.writeLock();
@@ -1383,12 +1380,12 @@ public class BlockManager {
*
* @throws IOException
* if the number of targets < minimum replication.
- * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
- * List, boolean, HashMap, long)
+ * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
+ * List, boolean, Set, long)
*/
public DatanodeDescriptor[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
- final HashMap<Node, Node> excludedNodes,
+ final Set<Node> excludedNodes,
final long blocksize, List<String> favoredNodes) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
@@ -1788,6 +1785,14 @@ public class BlockManager {
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
node, iblk, reportedState);
+ // OpenFileBlocks only inside snapshots also will be added to safemode
+ // threshold. So we need to update such blocks to safemode
+ // refer HDFS-5283
+ BlockInfoUnderConstruction blockUC = (BlockInfoUnderConstruction) storedBlock;
+ if (namesystem.isInSnapshot(blockUC)) {
+ int numOfReplicas = blockUC.getNumExpectedLocations();
+ namesystem.incrementSafeBlockCount(numOfReplicas);
+ }
//and fall through to next clause
}
//add replica if appropriate
@@ -3238,6 +3243,13 @@ assert storedBlock.findDatanode(dn) < 0
this.priority = priority;
this.targets = null;
}
+
+ private void chooseTargets(BlockPlacementPolicy blockplacement,
+ Set<Node> excludedNodes) {
+ targets = blockplacement.chooseTarget(bc.getName(),
+ additionalReplRequired, srcNode, liveReplicaNodes, false,
+ excludedNodes, block.getNumBytes());
+ }
}
/**
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Wed Oct 30 22:21:59 2013
@@ -19,14 +19,15 @@ package org.apache.hadoop.hdfs.server.bl
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -55,25 +56,6 @@ public abstract class BlockPlacementPoli
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* to re-replicate a block with size <i>blocksize</i>
* If not, return as many as we can.
- *
- * @param srcPath the file to which this chooseTargets is being invoked.
- * @param numOfReplicas additional number of replicas wanted.
- * @param writer the writer's machine, null if not in the cluster.
- * @param chosenNodes datanodes that have been chosen as targets.
- * @param blocksize size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as target
- * and sorted as a pipeline.
- */
- abstract DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
- long blocksize);
-
- /**
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
- * to re-replicate a block with size <i>blocksize</i>
- * If not, return as many as we can.
*
* @param srcPath the file to which this chooseTargets is being invoked.
* @param numOfReplicas additional number of replicas wanted.
@@ -87,48 +69,22 @@ public abstract class BlockPlacementPoli
*/
public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
- DatanodeDescriptor writer,
+ Node writer,
List<DatanodeDescriptor> chosenNodes,
boolean returnChosenNodes,
- HashMap<Node, Node> excludedNodes,
+ Set<Node> excludedNodes,
long blocksize);
-
- /**
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
- * If not, return as many as we can.
- * The base implemenatation extracts the pathname of the file from the
- * specified srcBC, but this could be a costly operation depending on the
- * file system implementation. Concrete implementations of this class should
- * override this method to avoid this overhead.
- *
- * @param srcBC block collection of file for which chooseTarget is invoked.
- * @param numOfReplicas additional number of replicas wanted.
- * @param writer the writer's machine, null if not in the cluster.
- * @param chosenNodes datanodes that have been chosen as targets.
- * @param blocksize size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as target
- * and sorted as a pipeline.
- */
- DatanodeDescriptor[] chooseTarget(BlockCollection srcBC,
- int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize) {
- return chooseTarget(srcBC.getName(), numOfReplicas, writer,
- chosenNodes, false, excludedNodes, blocksize);
- }
/**
- * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean,
- * HashMap, long)} with added parameter {@code favoredDatanodes}
+ * Same as {@link #chooseTarget(String, int, Node, List, boolean,
+ * Set, long)} with added parameter {@code favoredDatanodes}
* @param favoredNodes datanodes that should be favored as targets. This
* is only a hint and due to cluster state, namenode may not be
* able to place the blocks on these datanodes.
*/
DatanodeDescriptor[] chooseTarget(String src,
- int numOfReplicas, DatanodeDescriptor writer,
- HashMap<Node, Node> excludedNodes,
+ int numOfReplicas, Node writer,
+ Set<Node> excludedNodes,
long blocksize, List<DatanodeDescriptor> favoredNodes) {
// This class does not provide the functionality of placing
// a block in favored datanodes. The implementations of this class
@@ -139,18 +95,17 @@ public abstract class BlockPlacementPoli
}
/**
- * Verify that the block is replicated on at least minRacks different racks
- * if there is more than minRacks rack in the system.
+ * Verify if the block's placement meets requirement of placement policy,
+ * i.e. replicas are placed on no less than minRacks racks in the system.
*
* @param srcPath the full pathname of the file to be verified
* @param lBlk block with locations
- * @param minRacks number of racks the block should be replicated to
- * @return the difference between the required and the actual number of racks
- * the block is replicated to.
+ * @param numOfReplicas replica number of file to be verified
+ * @return the result of verification
*/
- abstract public int verifyBlockPlacement(String srcPath,
- LocatedBlock lBlk,
- int minRacks);
+ abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath,
+ LocatedBlock lBlk,
+ int numOfReplicas);
/**
* Decide whether deleting the specified replica of the block still makes
* the block conform to the configured block placement policy.
@@ -183,7 +138,7 @@ public abstract class BlockPlacementPoli
/**
* Get an instance of the configured Block Placement Policy based on the
- * value of the configuration paramater dfs.block.replicator.classname.
+ * the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
*
* @param conf the configuration to be used
* @param stats an object that is used to retrieve the load on the cluster
@@ -193,12 +148,12 @@ public abstract class BlockPlacementPoli
public static BlockPlacementPolicy getInstance(Configuration conf,
FSClusterStats stats,
NetworkTopology clusterMap) {
- Class<? extends BlockPlacementPolicy> replicatorClass =
- conf.getClass("dfs.block.replicator.classname",
- BlockPlacementPolicyDefault.class,
- BlockPlacementPolicy.class);
- BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
- replicatorClass, conf);
+ final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
+ DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
+ BlockPlacementPolicy.class);
+ final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
+ replicatorClass, conf);
replicator.initialize(conf, stats, clusterMap);
return replicator;
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Oct 30 22:21:59 2013
@@ -21,8 +21,7 @@ import static org.apache.hadoop.util.Tim
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -55,7 +54,15 @@ public class BlockPlacementPolicyDefault
private static final String enableDebugLogging =
"For more information, please enable DEBUG log level on "
- + LOG.getClass().getName();
+ + BlockPlacementPolicy.class.getName();
+
+ private static final ThreadLocal<StringBuilder> debugLoggingBuilder
+ = new ThreadLocal<StringBuilder>() {
+ @Override
+ protected StringBuilder initialValue() {
+ return new StringBuilder();
+ }
+ };
protected boolean considerLoad;
private boolean preferLocalNode = true;
@@ -95,40 +102,25 @@ public class BlockPlacementPolicyDefault
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
- protected ThreadLocal<StringBuilder> threadLocalBuilder =
- new ThreadLocal<StringBuilder>() {
- @Override
- protected StringBuilder initialValue() {
- return new StringBuilder();
- }
- };
-
- @Override
- public DatanodeDescriptor[] chooseTarget(String srcPath,
- int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> chosenNodes,
- long blocksize) {
- return chooseTarget(numOfReplicas, writer, chosenNodes, false,
- null, blocksize);
- }
-
@Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
- DatanodeDescriptor writer,
+ Node writer,
List<DatanodeDescriptor> chosenNodes,
boolean returnChosenNodes,
- HashMap<Node, Node> excludedNodes,
+ Set<Node> excludedNodes,
long blocksize) {
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes, blocksize);
}
@Override
- DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
- DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
- long blocksize, List<DatanodeDescriptor> favoredNodes) {
+ DatanodeDescriptor[] chooseTarget(String src,
+ int numOfReplicas,
+ Node writer,
+ Set<Node> excludedNodes,
+ long blocksize,
+ List<DatanodeDescriptor> favoredNodes) {
try {
if (favoredNodes == null || favoredNodes.size() == 0) {
// Favored nodes not specified, fall back to regular block placement.
@@ -137,8 +129,8 @@ public class BlockPlacementPolicyDefault
excludedNodes, blocksize);
}
- HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
- new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
+ Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
+ new HashSet<Node>() : new HashSet<Node>(excludedNodes);
// Choose favored nodes
List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
@@ -157,10 +149,10 @@ public class BlockPlacementPolicyDefault
+ " with favored node " + favoredNode);
continue;
}
- favoriteAndExcludedNodes.put(target, target);
+ favoriteAndExcludedNodes.add(target);
}
- if (results.size() < numOfReplicas) {
+ if (results.size() < numOfReplicas) {
// Not enough favored nodes, choose other nodes.
numOfReplicas -= results.size();
DatanodeDescriptor[] remainingTargets =
@@ -181,18 +173,18 @@ public class BlockPlacementPolicyDefault
}
/** This is the implementation. */
- DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
+ private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ Node writer,
List<DatanodeDescriptor> chosenNodes,
boolean returnChosenNodes,
- HashMap<Node, Node> excludedNodes,
+ Set<Node> excludedNodes,
long blocksize) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
- return new DatanodeDescriptor[0];
+ return DatanodeDescriptor.EMPTY_ARRAY;
}
if (excludedNodes == null) {
- excludedNodes = new HashMap<Node, Node>();
+ excludedNodes = new HashSet<Node>();
}
int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
@@ -204,16 +196,15 @@ public class BlockPlacementPolicyDefault
for (DatanodeDescriptor node:chosenNodes) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(node, excludedNodes);
- adjustExcludedNodes(excludedNodes, node);
}
if (!clusterMap.contains(writer)) {
- writer=null;
+ writer = null;
}
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+ Node localNode = chooseTarget(numOfReplicas, writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (!returnChosenNodes) {
results.removeAll(chosenNodes);
@@ -236,10 +227,20 @@ public class BlockPlacementPolicyDefault
return new int[] {numOfReplicas, maxNodesPerRack};
}
- /* choose <i>numOfReplicas</i> from all data nodes */
- private DatanodeDescriptor chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
- HashMap<Node, Node> excludedNodes,
+ /**
+ * choose <i>numOfReplicas</i> from all data nodes
+ * @param numOfReplicas additional number of replicas wanted
+ * @param writer the writer's machine, could be a non-DatanodeDescriptor node
+ * @param excludedNodes datanodes that should not be considered as targets
+ * @param blocksize size of the data to be written
+ * @param maxNodesPerRack max nodes allowed per rack
+ * @param results the target nodes already chosen
+ * @param avoidStaleNodes avoid stale nodes in replica choosing
+ * @return local node of writer (not chosen node)
+ */
+ private Node chooseTarget(int numOfReplicas,
+ Node writer,
+ Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
@@ -251,13 +252,13 @@ public class BlockPlacementPolicyDefault
int numOfResults = results.size();
boolean newBlock = (numOfResults==0);
- if (writer == null && !newBlock) {
+ if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
writer = results.get(0);
}
// Keep a copy of original excludedNodes
- final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
- new HashMap<Node, Node>(excludedNodes) : null;
+ final Set<Node> oldExcludedNodes = avoidStaleNodes ?
+ new HashSet<Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
writer = chooseLocalNode(writer, excludedNodes, blocksize,
@@ -304,7 +305,7 @@ public class BlockPlacementPolicyDefault
// We need to additionally exclude the nodes that were added to the
// result list in the successful calls to choose*() above.
for (Node node : results) {
- oldExcludedNodes.put(node, node);
+ oldExcludedNodes.add(node);
}
// Set numOfReplicas, since it can get out of sync with the result list
// if the NotEnoughReplicasException was thrown in chooseRandom().
@@ -316,33 +317,30 @@ public class BlockPlacementPolicyDefault
return writer;
}
- /* choose <i>localMachine</i> as the target.
+ /**
+ * Choose <i>localMachine</i> as the target.
* if <i>localMachine</i> is not available,
* choose a node on the same rack
* @return the chosen node
*/
- protected DatanodeDescriptor chooseLocalNode(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
+ protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+ Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
- if (preferLocalNode) {
+ if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
+ DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
// otherwise try local machine first
- Node oldNode = excludedNodes.put(localMachine, localMachine);
- if (oldNode == null) { // was not in the excluded list
- if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
- results, avoidStaleNodes)) {
- results.add(localMachine);
- // add localMachine and related nodes to excludedNode
- addToExcludedNodes(localMachine, excludedNodes);
- return localMachine;
+ if (excludedNodes.add(localMachine)) { // was not in the excluded list
+ if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize,
+ maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
+ return localDatanode;
}
}
}
@@ -358,26 +356,25 @@ public class BlockPlacementPolicyDefault
* @return number of new excluded nodes
*/
protected int addToExcludedNodes(DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes) {
- Node node = excludedNodes.put(localMachine, localMachine);
- return node == null?1:0;
+ Set<Node> excludedNodes) {
+ return excludedNodes.add(localMachine) ? 1 : 0;
}
- /* choose one node from the rack that <i>localMachine</i> is on.
+ /**
+ * Choose one node from the rack that <i>localMachine</i> is on.
* if no such node is available, choose one node from the rack where
* a second replica is on.
* if still no such node is available, choose a random node
* in the cluster.
* @return the chosen node
*/
- protected DatanodeDescriptor chooseLocalRack(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
+ protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+ Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
@@ -391,9 +388,7 @@ public class BlockPlacementPolicyDefault
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
- for(Iterator<DatanodeDescriptor> iter=results.iterator();
- iter.hasNext();) {
- DatanodeDescriptor nextNode = iter.next();
+ for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@@ -416,7 +411,8 @@ public class BlockPlacementPolicyDefault
}
}
- /* choose <i>numOfReplicas</i> nodes from the racks
+ /**
+ * Choose <i>numOfReplicas</i> nodes from the racks
* that <i>localMachine</i> is NOT on.
* if not enough nodes are available, choose the remaining ones
* from the local rack
@@ -424,12 +420,12 @@ public class BlockPlacementPolicyDefault
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
+ Set<Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
@@ -443,91 +439,58 @@ public class BlockPlacementPolicyDefault
}
}
- /* Randomly choose one target from <i>nodes</i>.
- * @return the chosen node
+ /**
+ * Randomly choose one target from the given <i>scope</i>.
+ * @return the chosen node, if there is any.
*/
- protected DatanodeDescriptor chooseRandom(
- String nodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
- int numOfAvailableNodes =
- clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
- StringBuilder builder = null;
- if (LOG.isDebugEnabled()) {
- builder = threadLocalBuilder.get();
- builder.setLength(0);
- builder.append("[");
- }
- boolean badTarget = false;
- while(numOfAvailableNodes > 0) {
- DatanodeDescriptor chosenNode =
- (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-
- Node oldNode = excludedNodes.put(chosenNode, chosenNode);
- if (oldNode == null) { // chosenNode was not in the excluded list
- numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize,
- maxNodesPerRack, results, avoidStaleNodes)) {
- results.add(chosenNode);
- // add chosenNode and related nodes to excludedNode
- addToExcludedNodes(chosenNode, excludedNodes);
- adjustExcludedNodes(excludedNodes, chosenNode);
- return chosenNode;
- } else {
- badTarget = true;
- }
- }
- }
-
- String detail = enableDebugLogging;
- if (LOG.isDebugEnabled()) {
- if (badTarget && builder != null) {
- detail = builder.append("]").toString();
- builder.setLength(0);
- } else detail = "";
- }
- throw new NotEnoughReplicasException(detail);
+ protected DatanodeDescriptor chooseRandom(String scope,
+ Set<Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
+ return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes);
}
-
- /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+
+ /**
+ * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
+ * @return the first chosen node, if there is any.
*/
- protected void chooseRandom(int numOfReplicas,
- String nodes,
- HashMap<Node, Node> excludedNodes,
+ protected DatanodeDescriptor chooseRandom(int numOfReplicas,
+ String scope,
+ Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ throws NotEnoughReplicasException {
- int numOfAvailableNodes =
- clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+ int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
+ scope, excludedNodes);
StringBuilder builder = null;
if (LOG.isDebugEnabled()) {
- builder = threadLocalBuilder.get();
+ builder = debugLoggingBuilder.get();
builder.setLength(0);
builder.append("[");
}
boolean badTarget = false;
+ DatanodeDescriptor firstChosen = null;
while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
DatanodeDescriptor chosenNode =
- (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
- Node oldNode = excludedNodes.put(chosenNode, chosenNode);
- if (oldNode == null) {
+ (DatanodeDescriptor)clusterMap.chooseRandom(scope);
+ if (excludedNodes.add(chosenNode)) { //was not in the excluded list
numOfAvailableNodes--;
- if (isGoodTarget(chosenNode, blocksize,
- maxNodesPerRack, results, avoidStaleNodes)) {
+ int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
+ blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes);
+ if (newExcludedNodes >= 0) {
numOfReplicas--;
- results.add(chosenNode);
- // add chosenNode and related nodes to excludedNode
- int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
+ if (firstChosen == null) {
+ firstChosen = chosenNode;
+ }
numOfAvailableNodes -= newExcludedNodes;
- adjustExcludedNodes(excludedNodes, chosenNode);
} else {
badTarget = true;
}
@@ -544,34 +507,44 @@ public class BlockPlacementPolicyDefault
}
throw new NotEnoughReplicasException(detail);
}
+
+ return firstChosen;
}
-
+
/**
- * After choosing a node to place replica, adjust excluded nodes accordingly.
- * It should do nothing here as chosenNode is already put into exlcudeNodes,
- * but it can be overridden in subclass to put more related nodes into
- * excludedNodes.
- *
- * @param excludedNodes
- * @param chosenNode
- */
- protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
- Node chosenNode) {
- // do nothing here.
+ * If the given node is a good target, add it to the result list and
+ * update the set of excluded nodes.
+ * @return -1 if the given is not a good target;
+ * otherwise, return the number of nodes added to excludedNodes set.
+ */
+ int addIfIsGoodTarget(DatanodeDescriptor node,
+ Set<Node> excludedNodes,
+ long blockSize,
+ int maxNodesPerRack,
+ boolean considerLoad,
+ List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) {
+ if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
+ results, avoidStaleNodes)) {
+ results.add(node);
+ // add node and related nodes to excludedNode
+ return addToExcludedNodes(node, excludedNodes);
+ } else {
+ return -1;
+ }
}
- /* judge if a node is a good target.
- * return true if <i>node</i> has enough space,
- * does not have too much load, and the rack does not have too many nodes
- */
- private boolean isGoodTarget(DatanodeDescriptor node,
- long blockSize, int maxTargetPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes) {
- return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
- results, avoidStaleNodes);
+ private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) {
+ if (LOG.isDebugEnabled()) {
+ // build the error message for later use.
+ debugLoggingBuilder.get()
+ .append(node).append(": ")
+ .append("Node ").append(NodeBase.getPath(node))
+ .append(" is not chosen because ")
+ .append(reason);
+ }
}
-
+
/**
* Determine if a node is a good target.
*
@@ -588,28 +561,20 @@ public class BlockPlacementPolicyDefault
* does not have too much load,
* and the rack does not have too many nodes.
*/
- protected boolean isGoodTarget(DatanodeDescriptor node,
+ private boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissed
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- if(LOG.isDebugEnabled()) {
- threadLocalBuilder.get().append(node.toString()).append(": ")
- .append("Node ").append(NodeBase.getPath(node))
- .append(" is not chosen because the node is (being) decommissioned ");
- }
+ logNodeIsNotChosen(node, "the node is (being) decommissioned ");
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
- if (LOG.isDebugEnabled()) {
- threadLocalBuilder.get().append(node.toString()).append(": ")
- .append("Node ").append(NodeBase.getPath(node))
- .append(" is not chosen because the node is stale ");
- }
+ logNodeIsNotChosen(node, "the node is stale ");
return false;
}
}
@@ -618,11 +583,7 @@ public class BlockPlacementPolicyDefault
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine
if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
- if(LOG.isDebugEnabled()) {
- threadLocalBuilder.get().append(node.toString()).append(": ")
- .append("Node ").append(NodeBase.getPath(node))
- .append(" is not chosen because the node does not have enough space ");
- }
+ logNodeIsNotChosen(node, "the node does not have enough space ");
return false;
}
@@ -634,11 +595,7 @@ public class BlockPlacementPolicyDefault
avgLoad = (double)stats.getTotalLoad()/size;
}
if (node.getXceiverCount() > (2.0 * avgLoad)) {
- if(LOG.isDebugEnabled()) {
- threadLocalBuilder.get().append(node.toString()).append(": ")
- .append("Node ").append(NodeBase.getPath(node))
- .append(" is not chosen because the node is too busy ");
- }
+ logNodeIsNotChosen(node, "the node is too busy ");
return false;
}
}
@@ -646,31 +603,25 @@ public class BlockPlacementPolicyDefault
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
- for(Iterator<DatanodeDescriptor> iter = results.iterator();
- iter.hasNext();) {
- Node result = iter.next();
+ for(Node result : results) {
if (rackname.equals(result.getNetworkLocation())) {
counter++;
}
}
if (counter>maxTargetPerRack) {
- if(LOG.isDebugEnabled()) {
- threadLocalBuilder.get().append(node.toString()).append(": ")
- .append("Node ").append(NodeBase.getPath(node))
- .append(" is not chosen because the rack has too many chosen nodes ");
- }
+ logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
return false;
}
return true;
}
- /* Return a pipeline of nodes.
+ /**
+ * Return a pipeline of nodes.
* The pipeline is formed finding a shortest path that
* starts from the writer and traverses all <i>nodes</i>
* This is basically a traveling salesman problem.
*/
- private DatanodeDescriptor[] getPipeline(
- DatanodeDescriptor writer,
+ private DatanodeDescriptor[] getPipeline(Node writer,
DatanodeDescriptor[] nodes) {
if (nodes.length==0) return nodes;
@@ -704,44 +655,38 @@ public class BlockPlacementPolicyDefault
}
@Override
- public int verifyBlockPlacement(String srcPath,
- LocatedBlock lBlk,
- int minRacks) {
+ public BlockPlacementStatus verifyBlockPlacement(String srcPath,
+ LocatedBlock lBlk, int numberOfReplicas) {
DatanodeInfo[] locs = lBlk.getLocations();
if (locs == null)
- locs = new DatanodeInfo[0];
+ locs = DatanodeDescriptor.EMPTY_ARRAY;
int numRacks = clusterMap.getNumOfRacks();
if(numRacks <= 1) // only one rack
- return 0;
- minRacks = Math.min(minRacks, numRacks);
+ return new BlockPlacementStatusDefault(
+ Math.min(numRacks, numberOfReplicas), numRacks);
+ int minRacks = Math.min(2, numberOfReplicas);
// 1. Check that all locations are different.
// 2. Count locations on different racks.
Set<String> racks = new TreeSet<String>();
for (DatanodeInfo dn : locs)
racks.add(dn.getNetworkLocation());
- return minRacks - racks.size();
+ return new BlockPlacementStatusDefault(racks.size(), minRacks);
}
@Override
public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
- Block block,
- short replicationFactor,
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ Block block, short replicationFactor,
+ Collection<DatanodeDescriptor> first,
+ Collection<DatanodeDescriptor> second) {
long oldestHeartbeat =
now() - heartbeatInterval * tolerateHeartbeatMultiplier;
DatanodeDescriptor oldestHeartbeatNode = null;
long minSpace = Long.MAX_VALUE;
DatanodeDescriptor minSpaceNode = null;
- // pick replica from the first Set. If first is empty, then pick replicas
- // from second set.
- Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
-
// Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval
- while (iter.hasNext() ) {
- DatanodeDescriptor node = iter.next();
+ for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
long free = node.getRemaining();
long lastHeartbeat = node.getLastUpdate();
if(lastHeartbeat < oldestHeartbeat) {
@@ -762,12 +707,10 @@ public class BlockPlacementPolicyDefault
* replica while second set contains remaining replica nodes.
* So pick up first set if not empty. If first is empty, then pick second.
*/
- protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+ protected Collection<DatanodeDescriptor> pickupReplicaSet(
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
- Iterator<DatanodeDescriptor> iter =
- first.isEmpty() ? second.iterator() : first.iterator();
- return iter;
+ return first.isEmpty() ? second : first;
}
@VisibleForTesting
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Wed Oct 30 22:21:59 2013
@@ -20,9 +20,9 @@ package org.apache.hadoop.hdfs.server.bl
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -63,31 +63,25 @@ public class BlockPlacementPolicyWithNod
* @return the chosen node
*/
@Override
- protected DatanodeDescriptor chooseLocalNode(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
+ protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+ Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
- // otherwise try local machine first
- Node oldNode = excludedNodes.put(localMachine, localMachine);
- if (oldNode == null) { // was not in the excluded list
- if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results, avoidStaleNodes)) {
- results.add(localMachine);
- // Nodes under same nodegroup should be excluded.
- addNodeGroupToExcludedNodes(excludedNodes,
- localMachine.getNetworkLocation());
- return localMachine;
+ if (localMachine instanceof DatanodeDescriptor) {
+ DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
+ // otherwise try local machine first
+ if (excludedNodes.add(localMachine)) { // was not in the excluded list
+ if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
+ maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
+ return localDataNode;
+ }
}
- }
+ }
// try a node on local node group
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
@@ -105,34 +99,10 @@ public class BlockPlacementPolicyWithNod
* {@inheritDoc}
*/
@Override
- protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
- Node chosenNode) {
- // as node-group aware implementation, it should make sure no two replica
- // are placing on the same node group.
- addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
- }
-
- // add all nodes under specific nodegroup to excludedNodes.
- private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
- String nodeGroup) {
- List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
- for (Node node : leafNodes) {
- excludedNodes.put(node, node);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- protected DatanodeDescriptor chooseLocalRack(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+ Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
@@ -148,9 +118,7 @@ public class BlockPlacementPolicyWithNod
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
- for(Iterator<DatanodeDescriptor> iter=results.iterator();
- iter.hasNext();) {
- DatanodeDescriptor nextNode = iter.next();
+ for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@@ -181,13 +149,9 @@ public class BlockPlacementPolicyWithNod
*/
@Override
protected void chooseRemoteRack(int numOfReplicas,
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxReplicasPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ DatanodeDescriptor localMachine, Set<Node> excludedNodes,
+ long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+ boolean avoidStaleNodes) throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
final String rackLocation = NetworkTopology.getFirstHalf(
@@ -210,10 +174,11 @@ public class BlockPlacementPolicyWithNod
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
- DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
- int maxNodesPerRack, List<DatanodeDescriptor> results, boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ private DatanodeDescriptor chooseLocalNodeGroup(
+ NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
+ Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+ List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+ throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
@@ -227,9 +192,7 @@ public class BlockPlacementPolicyWithNod
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
- for(Iterator<DatanodeDescriptor> iter=results.iterator();
- iter.hasNext();) {
- DatanodeDescriptor nextNode = iter.next();
+ for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@@ -264,14 +227,14 @@ public class BlockPlacementPolicyWithNod
* within the same nodegroup
* @return number of new excluded nodes
*/
- protected int addToExcludedNodes(DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes) {
+ @Override
+ protected int addToExcludedNodes(DatanodeDescriptor chosenNode,
+ Set<Node> excludedNodes) {
int countOfExcludedNodes = 0;
- String nodeGroupScope = localMachine.getNetworkLocation();
+ String nodeGroupScope = chosenNode.getNetworkLocation();
List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
for (Node leafNode : leafNodes) {
- Node node = excludedNodes.put(leafNode, leafNode);
- if (node == null) {
+ if (excludedNodes.add(leafNode)) {
// not a existing node in excludedNodes
countOfExcludedNodes++;
}
@@ -290,12 +253,12 @@ public class BlockPlacementPolicyWithNod
* If first is empty, then pick second.
*/
@Override
- public Iterator<DatanodeDescriptor> pickupReplicaSet(
+ public Collection<DatanodeDescriptor> pickupReplicaSet(
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
// If no replica within same rack, return directly.
if (first.isEmpty()) {
- return second.iterator();
+ return second;
}
// Split data nodes in the first set into two sets,
// moreThanOne contains nodes on nodegroup with more than one replica
@@ -328,9 +291,7 @@ public class BlockPlacementPolicyWithNod
}
}
- Iterator<DatanodeDescriptor> iter =
- moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
- return iter;
+ return moreThanOne.isEmpty()? exactlyOne : moreThanOne;
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Wed Oct 30 22:21:59 2013
@@ -57,11 +57,11 @@ class BlocksMap {
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
- private volatile GSet<Block, BlockInfo> blocks;
+ private GSet<Block, BlockInfo> blocks;
- BlocksMap(final float loadFactor) {
+ BlocksMap(int capacity) {
// Use 2% of total memory to size the GSet capacity
- this.capacity = LightWeightGSet.computeCapacity(2.0, "BlocksMap");
+ this.capacity = capacity;
this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Oct 30 22:21:59 2013
@@ -42,7 +42,8 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeDescriptor extends DatanodeInfo {
-
+ public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
+
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
public DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Oct 30 22:21:59 2013
@@ -17,21 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
-import static org.apache.hadoop.util.Time.now;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.net.InetAddresses;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -41,13 +29,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
@@ -55,32 +38,23 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
-import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+import static org.apache.hadoop.util.Time.now;
/**
* Manage datanodes, include decommission and other activities.
@@ -127,6 +101,8 @@ public class DatanodeManager {
private final int defaultInfoPort;
+ private final int defaultInfoSecurePort;
+
private final int defaultIpcPort;
/** Read include/exclude files*/
@@ -166,6 +142,7 @@ public class DatanodeManager {
*/
private boolean hasClusterEverBeenMultiRack = false;
+ private final boolean checkIpHostnameInRegistration;
/**
* The number of datanodes for each software version. This list should change
* during rolling upgrades.
@@ -188,7 +165,10 @@ public class DatanodeManager {
DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
this.defaultInfoPort = NetUtils.createSocketAddr(
conf.get(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY,
- DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
+ DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT)).getPort();
+ this.defaultInfoSecurePort = NetUtils.createSocketAddr(
+ conf.get(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY,
+ DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT)).getPort();
this.defaultIpcPort = NetUtils.createSocketAddr(
conf.get(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
@@ -231,6 +211,12 @@ public class DatanodeManager {
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
+ this.checkIpHostnameInRegistration = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
+ DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
+ LOG.info(DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY
+ + "=" + checkIpHostnameInRegistration);
+
this.avoidStaleDataNodesForRead = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
@@ -753,11 +739,13 @@ public class DatanodeManager {
// Mostly called inside an RPC, update ip and peer hostname
String hostname = dnAddress.getHostName();
String ip = dnAddress.getHostAddress();
- if (!isNameResolved(dnAddress)) {
+ if (checkIpHostnameInRegistration && !isNameResolved(dnAddress)) {
// Reject registration of unresolved datanode to prevent performance
// impact of repetitive DNS lookups later.
- LOG.warn("Unresolved datanode registration from " + ip);
- throw new DisallowedDatanodeException(nodeReg);
+ final String message = "hostname cannot be resolved (ip="
+ + ip + ", hostname=" + hostname + ")";
+ LOG.warn("Unresolved datanode registration: " + message);
+ throw new DisallowedDatanodeException(nodeReg, message);
}
// update node registration with the ip and hostname from rpc request
nodeReg.setIpAddr(ip);
@@ -886,7 +874,12 @@ public class DatanodeManager {
// If the network location is invalid, clear the cached mappings
// so that we have a chance to re-add this DataNode with the
// correct network location later.
- dnsToSwitchMapping.reloadCachedMappings();
+ List<String> invalidNodeNames = new ArrayList<String>(3);
+ // clear cache for nodes in IP or Hostname
+ invalidNodeNames.add(nodeReg.getIpAddr());
+ invalidNodeNames.add(nodeReg.getHostName());
+ invalidNodeNames.add(nodeReg.getPeerHostName());
+ dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
throw e;
}
}
@@ -1122,6 +1115,7 @@ public class DatanodeManager {
// The IP:port is sufficient for listing in a report
dnId = new DatanodeID(hostStr, "", "", port,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+ DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
} else {
String ipAddr = "";
@@ -1132,6 +1126,7 @@ public class DatanodeManager {
}
dnId = new DatanodeID(ipAddr, hostStr, "", port,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+ DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
}
return dnId;
@@ -1179,7 +1174,7 @@ public class DatanodeManager {
new DatanodeDescriptor(new DatanodeID(entry.getIpAddress(),
entry.getPrefix(), "",
entry.getPort() == 0 ? defaultXferPort : entry.getPort(),
- defaultInfoPort, defaultIpcPort));
+ defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
dn.setLastUpdate(0); // Consider this node dead for reporting
nodes.add(dn);
}
@@ -1198,17 +1193,17 @@ public class DatanodeManager {
/**
* Checks if name resolution was successful for the given address. If IP
* address and host name are the same, then it means name resolution has
- * failed. As a special case, the loopback address is also considered
+ * failed. As a special case, local addresses are also considered
* acceptable. This is particularly important on Windows, where 127.0.0.1 does
* not resolve to "localhost".
*
* @param address InetAddress to check
- * @return boolean true if name resolution successful or address is loopback
+ * @return boolean true if name resolution successful or address is local
*/
private static boolean isNameResolved(InetAddress address) {
String hostname = address.getHostName();
String ip = address.getHostAddress();
- return !hostname.equals(ip) || address.isLoopbackAddress();
+ return !hostname.equals(ip) || NetUtils.isLocalAddress(address);
}
private void setDatanodeDead(DatanodeDescriptor node) {
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Oct 30 22:21:59 2013
@@ -18,24 +18,7 @@
package org.apache.hadoop.hdfs.server.common;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TreeSet;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.jsp.JspWriter;
+import com.google.common.base.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,13 +30,9 @@ import org.apache.hadoop.hdfs.BlockReade
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -74,10 +53,22 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
-import com.google.common.base.Charsets;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.jsp.JspWriter;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.*;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
@InterfaceAudience.Private
public class JspHelper {
@@ -112,7 +103,7 @@ public class JspHelper {
return super.hashCode();
}
}
-
+
// compare two records based on their frequency
private static class NodeRecordComparator implements Comparator<NodeRecord> {
@@ -126,6 +117,27 @@ public class JspHelper {
return 0;
}
}
+
+ /**
+ * A helper class that generates the correct URL for different schema.
+ *
+ */
+ public static final class Url {
+ public static String authority(String scheme, DatanodeID d) {
+ if (scheme.equals("http")) {
+ return d.getInfoAddr();
+ } else if (scheme.equals("https")) {
+ return d.getInfoSecureAddr();
+ } else {
+ throw new IllegalArgumentException("Unknown scheme:" + scheme);
+ }
+ }
+
+ public static String url(String scheme, DatanodeID d) {
+ return scheme + "://" + authority(scheme, d);
+ }
+ }
+
public static DatanodeInfo bestNode(LocatedBlocks blks, Configuration conf)
throws IOException {
HashMap<DatanodeInfo, NodeRecord> map =
@@ -217,7 +229,7 @@ public class JspHelper {
offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().getHostAddress(),
- addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
+ addr.getHostName(), poolId, addr.getPort(), 0, 0, 0), null,
null, null, false, CachingStrategy.newDefaultStrategy());
final byte[] buf = new byte[amtToRead];
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Wed Oct 30 22:21:59 2013
@@ -100,6 +100,7 @@ class BlockPoolSliceScanner {
private long currentPeriodStart = Time.now();
private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0;
+ private boolean isNewPeriod = true;
private final LogFileHandler verificationLog;
@@ -126,7 +127,10 @@ class BlockPoolSliceScanner {
public int compare(BlockScanInfo left, BlockScanInfo right) {
final long l = left.lastScanTime;
final long r = right.lastScanTime;
- return l < r? -1: l > r? 1: 0;
+ // compare blocks itself if scantimes are same to avoid.
+ // because TreeMap uses comparator if available to check existence of
+ // the object.
+ return l < r? -1: l > r? 1: left.compareTo(right);
}
};
@@ -148,8 +152,6 @@ class BlockPoolSliceScanner {
public boolean equals(Object that) {
if (this == that) {
return true;
- } else if (that == null || !(that instanceof BlockScanInfo)) {
- return false;
}
return super.equals(that);
}
@@ -539,10 +541,12 @@ class BlockPoolSliceScanner {
entry.genStamp));
if (info != null) {
if (processedBlocks.get(entry.blockId) == null) {
- updateBytesLeft(-info.getNumBytes());
+ if (isNewPeriod) {
+ updateBytesLeft(-info.getNumBytes());
+ }
processedBlocks.put(entry.blockId, 1);
}
- if (logIterator.isPrevious()) {
+ if (logIterator.isLastReadFromPrevious()) {
// write the log entry to current file
// so that the entry is preserved for later runs.
verificationLog.append(entry.verificationTime, entry.genStamp,
@@ -557,6 +561,7 @@ class BlockPoolSliceScanner {
} finally {
IOUtils.closeStream(logIterator);
}
+ isNewPeriod = false;
}
@@ -597,6 +602,7 @@ class BlockPoolSliceScanner {
// reset the byte counts :
bytesLeft = totalBytesToScan;
currentPeriodStart = Time.now();
+ isNewPeriod = true;
}
private synchronized boolean workRemainingInCurrentPeriod() {
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Oct 30 22:21:59 2013
@@ -18,66 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -95,38 +39,15 @@ import org.apache.hadoop.hdfs.HDFSPolicy
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.protocolPB.*;
+import org.apache.hadoop.hdfs.security.token.block.*;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -141,11 +62,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
@@ -166,24 +83,21 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.JvmPauseMonitor;
-import org.apache.hadoop.util.ServicePlugin;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
+import java.io.*;
+import java.net.*;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.util.ExitUtil.terminate;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -265,6 +179,7 @@ public class DataNode extends Configured
private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
+ private int infoSecurePort;
DataNodeMetrics metrics;
private InetSocketAddress streamingAddr;
@@ -389,16 +304,13 @@ public class DataNode extends Configured
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
- this.infoServer = (secureResources == null)
- ? new HttpServer.Builder().setName("datanode")
- .setBindAddress(infoHost).setPort(tmpInfoPort)
- .setFindPort(tmpInfoPort == 0).setConf(conf)
- .setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))).build()
- : new HttpServer.Builder().setName("datanode")
- .setBindAddress(infoHost).setPort(tmpInfoPort)
- .setFindPort(tmpInfoPort == 0).setConf(conf)
- .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
- .setConnector(secureResources.getListener()).build();
+ HttpServer.Builder builder = new HttpServer.Builder().setName("datanode")
+ .setBindAddress(infoHost).setPort(tmpInfoPort)
+ .setFindPort(tmpInfoPort == 0).setConf(conf)
+ .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")));
+ this.infoServer = (secureResources == null) ? builder.build() :
+ builder.setConnector(secureResources.getListener()).build();
+
LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort);
if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
@@ -412,6 +324,7 @@ public class DataNode extends Configured
if(LOG.isDebugEnabled()) {
LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
}
+ infoSecurePort = secInfoSocAddr.getPort();
}
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
@@ -780,7 +693,8 @@ public class DataNode extends Configured
}
DatanodeID dnId = new DatanodeID(
streamingAddr.getAddress().getHostAddress(), hostName,
- getStorageId(), getXferPort(), getInfoPort(), getIpcPort());
+ getStorageId(), getXferPort(), getInfoPort(),
+ infoSecurePort, getIpcPort());
return new DatanodeRegistration(dnId, storageInfo,
new ExportedBlockKeys(), VersionInfo.getVersion());
}
@@ -878,7 +792,7 @@ public class DataNode extends Configured
* If this is the first block pool to register, this also initializes
* the datanode-scoped storage.
*
- * @param nsInfo the handshake response from the NN.
+ * @param bpos Block pool offer service
* @throws IOException if the NN is inconsistent with the local storage.
*/
void initBlockPool(BPOfferService bpos) throws IOException {
@@ -1403,15 +1317,13 @@ public class DataNode extends Configured
int numTargets = xferTargets.length;
if (numTargets > 0) {
- if (LOG.isInfoEnabled()) {
- StringBuilder xfersBuilder = new StringBuilder();
- for (int i = 0; i < numTargets; i++) {
- xfersBuilder.append(xferTargets[i]);
- xfersBuilder.append(" ");
- }
- LOG.info(bpReg + " Starting thread to transfer " +
- block + " to " + xfersBuilder);
+ StringBuilder xfersBuilder = new StringBuilder();
+ for (int i = 0; i < numTargets; i++) {
+ xfersBuilder.append(xferTargets[i]);
+ xfersBuilder.append(" ");
}
+ LOG.info(bpReg + " Starting thread to transfer " +
+ block + " to " + xfersBuilder);
new Daemon(new DataTransfer(xferTargets, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
@@ -2336,6 +2248,13 @@ public class DataNode extends Configured
}
/**
+ * @return the datanode's https port
+ */
+ public int getInfoSecurePort() {
+ return infoSecurePort;
+ }
+
+ /**
* Returned information is a JSON representation of a map with
* name node host name as the key and block pool Id as the value.
* Note that, if there are multiple NNs in an NA nameservice,