You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:47 UTC
svn commit: r1619019 [5/11] - in
/hadoop/common/branches/YARN-1051/hadoop-hdfs-project:
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs-httpfs/src/main/ja...
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Aug 20 01:34:29 2014
@@ -18,19 +18,9 @@
package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintStream;
-import java.net.Socket;
import java.net.URI;
import java.text.DateFormat;
import java.util.ArrayList;
@@ -39,55 +29,38 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.google.common.base.Preconditions;
+
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
* when some datanodes become full or when new empty nodes join the cluster.
* The tool is deployed as an application program that can be run by the
@@ -188,664 +161,34 @@ import org.apache.hadoop.util.ToolRunner
@InterfaceAudience.Private
public class Balancer {
static final Log LOG = LogFactory.getLog(Balancer.class);
- final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
- private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
- /** The maximum number of concurrent blocks moves for
- * balancing purpose at a datanode
- */
- private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
- public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
- public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
-
+ private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+
+ private static final long GB = 1L << 30; //1GB
+ private static final long MAX_SIZE_TO_MOVE = 10*GB;
+
private static final String USAGE = "Usage: java "
+ Balancer.class.getSimpleName()
+ "\n\t[-policy <policy>]\tthe balancing policy: "
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
+ BalancingPolicy.Pool.INSTANCE.getName()
- + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+ + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+ + "\n\t[-exclude [-f <hosts-file> | comma-separated list of hosts]]"
+ + "\tExcludes the specified datanodes."
+ + "\n\t[-include [-f <hosts-file> | comma-separated list of hosts]]"
+ + "\tIncludes only the specified datanodes."; // -exclude/-include take a hosts file (-f) or an inline host list
- private final NameNodeConnector nnc;
+ private final Dispatcher dispatcher;
private final BalancingPolicy policy;
- private final SaslDataTransferClient saslClient;
private final double threshold;
// all data node lists
- private final Collection<Source> overUtilizedDatanodes
- = new LinkedList<Source>();
- private final Collection<Source> aboveAvgUtilizedDatanodes
- = new LinkedList<Source>();
- private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes
- = new LinkedList<BalancerDatanode>();
- private final Collection<BalancerDatanode> underUtilizedDatanodes
- = new LinkedList<BalancerDatanode>();
-
- private final Collection<Source> sources
- = new HashSet<Source>();
- private final Collection<BalancerDatanode> targets
- = new HashSet<BalancerDatanode>();
-
- private final Map<Block, BalancerBlock> globalBlockList
- = new HashMap<Block, BalancerBlock>();
- private final MovedBlocks movedBlocks = new MovedBlocks();
- /** Map (datanodeUuid -> BalancerDatanodes) */
- private final Map<String, BalancerDatanode> datanodeMap
- = new HashMap<String, BalancerDatanode>();
-
- private NetworkTopology cluster;
-
- private final ExecutorService moverExecutor;
- private final ExecutorService dispatcherExecutor;
- private final int maxConcurrentMovesPerNode;
-
- /* This class keeps track of a scheduled block move */
- private class PendingBlockMove {
- private BalancerBlock block;
- private Source source;
- private BalancerDatanode proxySource;
- private BalancerDatanode target;
-
- /** constructor */
- private PendingBlockMove() {
- }
-
- @Override
- public String toString() {
- final Block b = block.getBlock();
- return b + " with size=" + b.getNumBytes() + " from "
- + source.getDisplayName() + " to " + target.getDisplayName()
- + " through " + proxySource.getDisplayName();
- }
-
- /* choose a block & a proxy source for this pendingMove
- * whose source & target have already been chosen.
- *
- * Return true if a block and its proxy are chosen; false otherwise
- */
- private boolean chooseBlockAndProxy() {
- // iterate all source's blocks until find a good one
- for (Iterator<BalancerBlock> blocks=
- source.getBlockIterator(); blocks.hasNext();) {
- if (markMovedIfGoodBlock(blocks.next())) {
- blocks.remove();
- return true;
- }
- }
- return false;
- }
-
- /* Return true if the given block is good for the tentative move;
- * If it is good, add it to the moved list to marked as "Moved".
- * A block is good if
- * 1. it is a good candidate; see isGoodBlockCandidate
- * 2. can find a proxy source that's not busy for this move
- */
- private boolean markMovedIfGoodBlock(BalancerBlock block) {
- synchronized(block) {
- synchronized(movedBlocks) {
- if (isGoodBlockCandidate(source, target, block)) {
- this.block = block;
- if ( chooseProxySource() ) {
- movedBlocks.add(block);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decided to move " + this);
- }
- return true;
- }
- }
- }
- }
- return false;
- }
-
- /* Now we find out source, target, and block, we need to find a proxy
- *
- * @return true if a proxy is found; otherwise false
- */
- private boolean chooseProxySource() {
- final DatanodeInfo targetDN = target.getDatanode();
- // if node group is supported, first try add nodes in the same node group
- if (cluster.isNodeGroupAware()) {
- for (BalancerDatanode loc : block.getLocations()) {
- if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
- return true;
- }
- }
- }
- // check if there is replica which is on the same rack with the target
- for (BalancerDatanode loc : block.getLocations()) {
- if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
- return true;
- }
- }
- // find out a non-busy replica
- for (BalancerDatanode loc : block.getLocations()) {
- if (addTo(loc)) {
- return true;
- }
- }
- return false;
- }
-
- // add a BalancerDatanode as proxy source for specific block movement
- private boolean addTo(BalancerDatanode bdn) {
- if (bdn.addPendingBlock(this)) {
- proxySource = bdn;
- return true;
- }
- return false;
- }
-
- /* Dispatch the block move task to the proxy source & wait for the response
- */
- private void dispatch() {
- Socket sock = new Socket();
- DataOutputStream out = null;
- DataInputStream in = null;
- try {
- sock.connect(
- NetUtils.createSocketAddr(target.datanode.getXferAddr()),
- HdfsServerConstants.READ_TIMEOUT);
- /* Unfortunately we don't have a good way to know if the Datanode is
- * taking a really long time to move a block, OR something has
- * gone wrong and it's never going to finish. To deal with this
- * scenario, we set a long timeout (20 minutes) to avoid hanging
- * the balancer indefinitely.
- */
- sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
-
- sock.setKeepAlive(true);
-
- OutputStream unbufOut = sock.getOutputStream();
- InputStream unbufIn = sock.getInputStream();
- ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
- Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
- IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, nnc, accessToken, target.datanode);
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsConstants.IO_FILE_BUFFER_SIZE));
- in = new DataInputStream(new BufferedInputStream(unbufIn,
- HdfsConstants.IO_FILE_BUFFER_SIZE));
-
- sendRequest(out, eb, StorageType.DEFAULT, accessToken);
- receiveResponse(in);
- bytesMoved.addAndGet(block.getNumBytes());
- LOG.info("Successfully moved " + this);
- } catch (IOException e) {
- LOG.warn("Failed to move " + this + ": " + e.getMessage());
- /* proxy or target may have an issue, insert a small delay
- * before using these nodes further. This avoids a potential storm
- * of "threads quota exceeded" Warnings when the balancer
- * gets out of sync with work going on in datanode.
- */
- proxySource.activateDelay(DELAY_AFTER_ERROR);
- target.activateDelay(DELAY_AFTER_ERROR);
- } finally {
- IOUtils.closeStream(out);
- IOUtils.closeStream(in);
- IOUtils.closeSocket(sock);
-
- proxySource.removePendingBlock(this);
- target.removePendingBlock(this);
-
- synchronized (this ) {
- reset();
- }
- synchronized (Balancer.this) {
- Balancer.this.notifyAll();
- }
- }
- }
-
- /* Send a block replace request to the output stream*/
- private void sendRequest(DataOutputStream out, ExtendedBlock eb,
- StorageType storageType,
- Token<BlockTokenIdentifier> accessToken) throws IOException {
- new Sender(out).replaceBlock(eb, storageType, accessToken,
- source.getStorageID(), proxySource.getDatanode());
- }
-
- /* Receive a block copy response from the input stream */
- private void receiveResponse(DataInputStream in) throws IOException {
- BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
- vintPrefixed(in));
- if (response.getStatus() != Status.SUCCESS) {
- if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
- throw new IOException("block move failed due to access token error");
- throw new IOException("block move is failed: " +
- response.getMessage());
- }
- }
-
- /* reset the object */
- private void reset() {
- block = null;
- source = null;
- proxySource = null;
- target = null;
- }
-
- /* start a thread to dispatch the block move */
- private void scheduleBlockMove() {
- moverExecutor.execute(new Runnable() {
- @Override
- public void run() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start moving " + PendingBlockMove.this);
- }
- dispatch();
- }
- });
- }
- }
-
- /* A class for keeping track of blocks in the Balancer */
- static private class BalancerBlock {
- private final Block block; // the block
- private final List<BalancerDatanode> locations
- = new ArrayList<BalancerDatanode>(3); // its locations
-
- /* Constructor */
- private BalancerBlock(Block block) {
- this.block = block;
- }
-
- /* clean block locations */
- private synchronized void clearLocations() {
- locations.clear();
- }
-
- /* add a location */
- private synchronized void addLocation(BalancerDatanode datanode) {
- if (!locations.contains(datanode)) {
- locations.add(datanode);
- }
- }
-
- /* Return if the block is located on <code>datanode</code> */
- private synchronized boolean isLocatedOnDatanode(
- BalancerDatanode datanode) {
- return locations.contains(datanode);
- }
-
- /* Return its locations */
- private synchronized List<BalancerDatanode> getLocations() {
- return locations;
- }
-
- /* Return the block */
- private Block getBlock() {
- return block;
- }
-
- /* Return the length of the block */
- private long getNumBytes() {
- return block.getNumBytes();
- }
- }
-
- /* The class represents a desired move of bytes between two nodes
- * and the target.
- * An object of this class is stored in a source node.
- */
- static private class NodeTask {
- private final BalancerDatanode datanode; //target node
- private long size; //bytes scheduled to move
-
- /* constructor */
- private NodeTask(BalancerDatanode datanode, long size) {
- this.datanode = datanode;
- this.size = size;
- }
-
- /* Get the node */
- private BalancerDatanode getDatanode() {
- return datanode;
- }
-
- /* Get the number of bytes that need to be moved */
- private long getSize() {
- return size;
- }
- }
-
-
- /* A class that keeps track of a datanode in Balancer */
- private static class BalancerDatanode {
- final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
- final DatanodeInfo datanode;
- final double utilization;
- final long maxSize2Move;
- private long scheduledSize = 0L;
- protected long delayUntil = 0L;
- // blocks being moved but not confirmed yet
- private final List<PendingBlockMove> pendingBlocks;
- private final int maxConcurrentMoves;
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "[" + datanode
- + ", utilization=" + utilization + "]";
- }
-
- /* Constructor
- * Depending on avgutil & threshold, calculate maximum bytes to move
- */
- private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
- int maxConcurrentMoves) {
- datanode = node;
- utilization = policy.getUtilization(node);
- final double avgUtil = policy.getAvgUtilization();
- long maxSizeToMove;
-
- if (utilization >= avgUtil+threshold
- || utilization <= avgUtil-threshold) {
- maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
- } else {
- maxSizeToMove =
- (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
- }
- if (utilization < avgUtil ) {
- maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
- }
- this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
- this.maxConcurrentMoves = maxConcurrentMoves;
- this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
- }
-
- /** Get the datanode */
- protected DatanodeInfo getDatanode() {
- return datanode;
- }
-
- /** Get the name of the datanode */
- protected String getDisplayName() {
- return datanode.toString();
- }
-
- /* Get the storage id of the datanode */
- protected String getStorageID() {
- return datanode.getDatanodeUuid();
- }
-
- /** Decide if still need to move more bytes */
- protected synchronized boolean hasSpaceForScheduling() {
- return scheduledSize<maxSize2Move;
- }
-
- /** Return the total number of bytes that need to be moved */
- protected synchronized long availableSizeToMove() {
- return maxSize2Move-scheduledSize;
- }
-
- /** increment scheduled size */
- protected synchronized void incScheduledSize(long size) {
- scheduledSize += size;
- }
-
- /** decrement scheduled size */
- protected synchronized void decScheduledSize(long size) {
- scheduledSize -= size;
- }
-
- /** get scheduled size */
- protected synchronized long getScheduledSize(){
- return scheduledSize;
- }
-
- /** get scheduled size */
- protected synchronized void setScheduledSize(long size){
- scheduledSize = size;
- }
-
- synchronized private void activateDelay(long delta) {
- delayUntil = Time.now() + delta;
- }
-
- synchronized private boolean isDelayActive() {
- if (delayUntil == 0 || Time.now() > delayUntil){
- delayUntil = 0;
- return false;
- }
- return true;
- }
-
- /* Check if the node can schedule more blocks to move */
- synchronized private boolean isPendingQNotFull() {
- if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
- return true;
- }
- return false;
- }
-
- /* Check if all the dispatched moves are done */
- synchronized private boolean isPendingQEmpty() {
- return pendingBlocks.isEmpty();
- }
-
- /* Add a scheduled block move to the node */
- private synchronized boolean addPendingBlock(
- PendingBlockMove pendingBlock) {
- if (!isDelayActive() && isPendingQNotFull()) {
- return pendingBlocks.add(pendingBlock);
- }
- return false;
- }
-
- /* Remove a scheduled block move from the node */
- private synchronized boolean removePendingBlock(
- PendingBlockMove pendingBlock) {
- return pendingBlocks.remove(pendingBlock);
- }
- }
-
- /** A node that can be the sources of a block move */
- private class Source extends BalancerDatanode {
-
- /* A thread that initiates a block move
- * and waits for block move to complete */
- private class BlockMoveDispatcher implements Runnable {
- @Override
- public void run() {
- dispatchBlocks();
- }
- }
-
- private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
- private long blocksToReceive = 0L;
- /* source blocks point to balancerBlocks in the global list because
- * we want to keep one copy of a block in balancer and be aware that
- * the locations are changing over time.
- */
- private final List<BalancerBlock> srcBlockList
- = new ArrayList<BalancerBlock>();
-
- /* constructor */
- private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
- int maxConcurrentMoves) {
- super(node, policy, threshold, maxConcurrentMoves);
- }
-
- /** Add a node task */
- private void addNodeTask(NodeTask task) {
- assert (task.datanode != this) :
- "Source and target are the same " + datanode;
- incScheduledSize(task.getSize());
- nodeTasks.add(task);
- }
-
- /* Return an iterator to this source's blocks */
- private Iterator<BalancerBlock> getBlockIterator() {
- return srcBlockList.iterator();
- }
-
- /* fetch new blocks of this source from namenode and
- * update this source's block list & the global block list
- * Return the total size of the received blocks in the number of bytes.
- */
- private long getBlockList() throws IOException {
- BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode,
- Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
- long bytesReceived = 0;
- for (BlockWithLocations blk : newBlocks) {
- bytesReceived += blk.getBlock().getNumBytes();
- BalancerBlock block;
- synchronized(globalBlockList) {
- block = globalBlockList.get(blk.getBlock());
- if (block==null) {
- block = new BalancerBlock(blk.getBlock());
- globalBlockList.put(blk.getBlock(), block);
- } else {
- block.clearLocations();
- }
-
- synchronized (block) {
- // update locations
- for (String datanodeUuid : blk.getDatanodeUuids()) {
- final BalancerDatanode d = datanodeMap.get(datanodeUuid);
- if (d != null) { // not an unknown datanode
- block.addLocation(d);
- }
- }
- }
- if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) {
- // filter bad candidates
- srcBlockList.add(block);
- }
- }
- }
- return bytesReceived;
- }
-
- /* Decide if the given block is a good candidate to move or not */
- private boolean isGoodBlockCandidate(BalancerBlock block) {
- for (NodeTask nodeTask : nodeTasks) {
- if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
- return true;
- }
- }
- return false;
- }
-
- /* Return a block that's good for the source thread to dispatch immediately
- * The block's source, target, and proxy source are determined too.
- * When choosing proxy and target, source & target throttling
- * has been considered. They are chosen only when they have the capacity
- * to support this block move.
- * The block should be dispatched immediately after this method is returned.
- */
- private PendingBlockMove chooseNextBlockToMove() {
- for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
- NodeTask task = tasks.next();
- BalancerDatanode target = task.getDatanode();
- PendingBlockMove pendingBlock = new PendingBlockMove();
- if (target.addPendingBlock(pendingBlock)) {
- // target is not busy, so do a tentative block allocation
- pendingBlock.source = this;
- pendingBlock.target = target;
- if ( pendingBlock.chooseBlockAndProxy() ) {
- long blockSize = pendingBlock.block.getNumBytes();
- decScheduledSize(blockSize);
- task.size -= blockSize;
- if (task.size == 0) {
- tasks.remove();
- }
- return pendingBlock;
- } else {
- // cancel the tentative move
- target.removePendingBlock(pendingBlock);
- }
- }
- }
- return null;
- }
-
- /* iterate all source's blocks to remove moved ones */
- private void filterMovedBlocks() {
- for (Iterator<BalancerBlock> blocks=getBlockIterator();
- blocks.hasNext();) {
- if (movedBlocks.contains(blocks.next())) {
- blocks.remove();
- }
- }
- }
-
- private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
- /* Return if should fetch more blocks from namenode */
- private boolean shouldFetchMoreBlocks() {
- return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE &&
- blocksToReceive>0;
- }
-
- /* This method iteratively does the following:
- * it first selects a block to move,
- * then sends a request to the proxy source to start the block move
- * when the source's block list falls below a threshold, it asks
- * the namenode for more blocks.
- * It terminates when it has dispatch enough block move tasks or
- * it has received enough blocks from the namenode, or
- * the elapsed time of the iteration has exceeded the max time limit.
- */
- private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
- private void dispatchBlocks() {
- long startTime = Time.now();
- long scheduledSize = getScheduledSize();
- this.blocksToReceive = 2*scheduledSize;
- boolean isTimeUp = false;
- int noPendingBlockIteration = 0;
- while(!isTimeUp && getScheduledSize()>0 &&
- (!srcBlockList.isEmpty() || blocksToReceive>0)) {
- PendingBlockMove pendingBlock = chooseNextBlockToMove();
- if (pendingBlock != null) {
- // move the block
- pendingBlock.scheduleBlockMove();
- continue;
- }
-
- /* Since we can not schedule any block to move,
- * filter any moved blocks from the source block list and
- * check if we should fetch more blocks from the namenode
- */
- filterMovedBlocks(); // filter already moved blocks
- if (shouldFetchMoreBlocks()) {
- // fetch new blocks
- try {
- blocksToReceive -= getBlockList();
- continue;
- } catch (IOException e) {
- LOG.warn("Exception while getting block list", e);
- return;
- }
- } else {
- // source node cannot find a pendingBlockToMove, iteration +1
- noPendingBlockIteration++;
- // in case no blocks can be moved for source node's task,
- // jump out of while-loop after 5 iterations.
- if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
- setScheduledSize(0);
- }
- }
-
- // check if time is up or not
- if (Time.now()-startTime > MAX_ITERATION_TIME) {
- isTimeUp = true;
- continue;
- }
-
- /* Now we can not schedule any block to move and there are
- * no new blocks added to the source block list, so we wait.
- */
- try {
- synchronized(Balancer.this) {
- Balancer.this.wait(1000); // wait for targets/sources to be idle
- }
- } catch (InterruptedException ignored) {
- }
- }
- }
- }
+ private final Collection<Source> overUtilized = new LinkedList<Source>();
+ private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
+ private final Collection<StorageGroup> belowAvgUtilized
+ = new LinkedList<StorageGroup>();
+ private final Collection<StorageGroup> underUtilized
+ = new LinkedList<StorageGroup>();
/* Check that this Balancer is compatible with the Block Placement Policy
* used by the Namenode.
@@ -867,200 +210,185 @@ public class Balancer {
* when connection fails.
*/
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+ final long winWidth = conf.getLong(
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
+ DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
+ final int numMoverThreads = conf.getInt(
+ DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
+ DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
+ final int numDispatcherThreads = conf.getInt(
+ DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
+ DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
+ final int maxMovesPerNode = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+ // All tuning values read above, plus the include/exclude host sets, are handed to the Dispatcher.
+ this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
+ p.nodesToBeExcluded, winWidth, numMoverThreads, numDispatcherThreads,
+ maxMovesPerNode, conf);
this.threshold = p.threshold;
this.policy = p.policy;
- this.nnc = theblockpool;
- cluster = NetworkTopology.getInstance(conf);
-
- this.moverExecutor = Executors.newFixedThreadPool(
- conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
- this.dispatcherExecutor = Executors.newFixedThreadPool(
- conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
- this.maxConcurrentMovesPerNode =
- conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
- DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
- this.saslClient = new SaslDataTransferClient(
- DataTransferSaslUtil.getSaslPropertiesResolver(conf),
- TrustedChannelResolver.getInstance(conf),
- conf.getBoolean(
- IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
- IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
- /* Given a data node set, build a network topology and decide
- * over-utilized datanodes, above average utilized datanodes,
- * below average utilized datanodes, and underutilized datanodes.
- * The input data node set is shuffled before the datanodes
- * are put into the over-utilized datanodes, above average utilized
- * datanodes, below average utilized datanodes, and
- * underutilized datanodes lists. This will add some randomness
- * to the node matching later on.
- *
- * @return the total number of bytes that are
- * needed to move to make the cluster balanced.
- * @param datanodes a set of datanodes
+ private static long getCapacity(DatanodeStorageReport report, StorageType t) {
+ long capacity = 0L;
+ for(StorageReport r : report.getStorageReports()) {
+ if (r.getStorage().getStorageType() == t) {
+ capacity += r.getCapacity();
+ }
+ }
+ return capacity;
+ }
+
+ private static long getRemaining(DatanodeStorageReport report, StorageType t) {
+ long remaining = 0L;
+ for(StorageReport r : report.getStorageReports()) {
+ if (r.getStorage().getStorageType() == t) {
+ remaining += r.getRemaining();
+ }
+ }
+ return remaining;
+ }
+
+ /**
+ * Given a datanode storage set, build a network topology and decide
+ * over-utilized storages, above average utilized storages,
+ * below average utilized storages, and underutilized storages.
+ * This method does not shuffle the input storage reports itself; callers are
+ * expected to pass them pre-shuffled so the later storage matching is randomized.
+ *
+ * @return the number of bytes needed to move in order to balance the cluster.
*/
- private long initNodes(DatanodeInfo[] datanodes) {
+ private long init(List<DatanodeStorageReport> reports) {
// compute average utilization
- for (DatanodeInfo datanode : datanodes) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
- continue; // ignore decommissioning or decommissioned nodes
- }
- policy.accumulateSpaces(datanode);
+ for (DatanodeStorageReport r : reports) {
+ policy.accumulateSpaces(r);
}
policy.initAvgUtilization();
- /*create network topology and all data node lists:
- * overloaded, above-average, below-average, and underloaded
- * we alternates the accessing of the given datanodes array either by
- * an increasing order or a decreasing order.
- */
+ // create network topology and classify utilization collections:
+ // over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
- for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
- continue; // ignore decommissioning or decommissioned nodes
- }
- cluster.add(datanode);
- BalancerDatanode datanodeS;
- final double avg = policy.getAvgUtilization();
- if (policy.getUtilization(datanode) >= avg) {
- datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
- if (isAboveAvgUtilized(datanodeS)) {
- this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
- } else {
- assert(isOverUtilized(datanodeS)) :
- datanodeS.getDisplayName()+ "is not an overUtilized node";
- this.overUtilizedDatanodes.add((Source)datanodeS);
- overLoadedBytes += (long)((datanodeS.utilization-avg
- -threshold)*datanodeS.datanode.getCapacity()/100.0);
+ for(DatanodeStorageReport r : reports) {
+ final DDatanode dn = dispatcher.newDatanode(r);
+ for(StorageType t : StorageType.asList()) {
+ final Double utilization = policy.getUtilization(r, t);
+ if (utilization == null) { // datanode does not have such storage type
+ continue;
}
- } else {
- datanodeS = new BalancerDatanode(datanode, policy, threshold,
- maxConcurrentMovesPerNode);
- if ( isBelowOrEqualAvgUtilized(datanodeS)) {
- this.belowAvgUtilizedDatanodes.add(datanodeS);
+
+ final long capacity = getCapacity(r, t);
+ final double utilizationDiff = utilization - policy.getAvgUtilization(t);
+ final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
+ final long maxSize2Move = computeMaxSize2Move(capacity,
+ getRemaining(r, t), utilizationDiff, threshold);
+
+ final StorageGroup g;
+ if (utilizationDiff > 0) {
+ final Source s = dn.addSource(t, maxSize2Move, dispatcher);
+ if (thresholdDiff <= 0) { // within threshold
+ aboveAvgUtilized.add(s);
+ } else {
+ overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+ overUtilized.add(s);
+ }
+ g = s;
} else {
- assert isUnderUtilized(datanodeS) : "isUnderUtilized("
- + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
- + ", utilization=" + datanodeS.utilization;
- this.underUtilizedDatanodes.add(datanodeS);
- underLoadedBytes += (long)((avg-threshold-
- datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+ g = dn.addStorageGroup(t, maxSize2Move);
+ if (thresholdDiff <= 0) { // within threshold
+ belowAvgUtilized.add(g);
+ } else {
+ underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+ underUtilized.add(g);
+ }
}
+ dispatcher.getStorageGroupMap().put(g);
}
- datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
}
- //logging
- logNodes();
+ logUtilizationCollections();
- assert (this.datanodeMap.size() ==
- overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
- aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
- : "Mismatched number of datanodes";
+ Preconditions.checkState(dispatcher.getStorageGroupMap().size()
+ == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
+ + belowAvgUtilized.size(),
+ "Mismatched number of storage groups");
// return number of bytes to be moved in order to make the cluster balanced
return Math.max(overLoadedBytes, underLoadedBytes);
}
- /* log the over utilized & under utilized nodes */
- private void logNodes() {
- logNodes("over-utilized", overUtilizedDatanodes);
- if (LOG.isTraceEnabled()) {
- logNodes("above-average", aboveAvgUtilizedDatanodes);
- logNodes("below-average", belowAvgUtilizedDatanodes);
- }
- logNodes("underutilized", underUtilizedDatanodes);
- }
-
- private static <T extends BalancerDatanode> void logNodes(
- String name, Collection<T> nodes) {
- LOG.info(nodes.size() + " " + name + ": " + nodes);
+ private static long computeMaxSize2Move(final long capacity, final long remaining,
+ final double utilizationDiff, final double threshold) {
+ final double diff = Math.min(threshold, Math.abs(utilizationDiff)); // never plan beyond threshold percent of capacity
+ long maxSizeToMove = precentage2bytes(diff, capacity);
+ if (utilizationDiff < 0) { // below average: additionally capped by the remaining free space
+ maxSizeToMove = Math.min(remaining, maxSizeToMove);
+ }
+ return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); // global per-group cap
}
- /** A matcher interface for matching nodes. */
- private interface Matcher {
- /** Given the cluster topology, does the left node match the right node? */
- boolean match(NetworkTopology cluster, Node left, Node right);
+ private static long precentage2bytes(double percentage, long capacity) {
+ Preconditions.checkArgument(percentage >= 0,
+ "percentage = " + percentage + " < 0");
+ return (long)(percentage * capacity / 100.0); // percentage (0-100) of capacity, truncated to whole bytes
}
- /** Match datanodes in the same node group. */
- static final Matcher SAME_NODE_GROUP = new Matcher() {
- @Override
- public boolean match(NetworkTopology cluster, Node left, Node right) {
- return cluster.isOnSameNodeGroup(left, right);
- }
- };
-
- /** Match datanodes in the same rack. */
- static final Matcher SAME_RACK = new Matcher() {
- @Override
- public boolean match(NetworkTopology cluster, Node left, Node right) {
- return cluster.isOnSameRack(left, right);
+ /* log the over utilized & under utilized nodes */
+ private void logUtilizationCollections() {
+ logUtilizationCollection("over-utilized", overUtilized);
+ if (LOG.isTraceEnabled()) {
+ logUtilizationCollection("above-average", aboveAvgUtilized);
+ logUtilizationCollection("below-average", belowAvgUtilized);
}
- };
+ logUtilizationCollection("underutilized", underUtilized);
+ }
- /** Match any datanode with any other datanode. */
- static final Matcher ANY_OTHER = new Matcher() {
- @Override
- public boolean match(NetworkTopology cluster, Node left, Node right) {
- return left != right;
- }
- };
+ private static <T extends StorageGroup>
+ void logUtilizationCollection(String name, Collection<T> items) {
+ LOG.info(items.size() + " " + name + ": " + items);
+ }
/**
* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target
- * Maximum bytes to be moved per node is
- * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
- * Return total number of bytes to move in this iteration
+ * Maximum bytes to be moved per storage group is
+ * min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
+ * @return total number of bytes to move in this iteration
*/
- private long chooseNodes() {
+ private long chooseStorageGroups() {
// First, match nodes on the same node group if cluster is node group aware
- if (cluster.isNodeGroupAware()) {
- chooseNodes(SAME_NODE_GROUP);
+ if (dispatcher.getCluster().isNodeGroupAware()) {
+ chooseStorageGroups(Matcher.SAME_NODE_GROUP);
}
// Then, match nodes on the same rack
- chooseNodes(SAME_RACK);
+ chooseStorageGroups(Matcher.SAME_RACK);
// At last, match all remaining nodes
- chooseNodes(ANY_OTHER);
+ chooseStorageGroups(Matcher.ANY_OTHER);
- assert (datanodeMap.size() >= sources.size()+targets.size())
- : "Mismatched number of datanodes (" +
- datanodeMap.size() + " total, " +
- sources.size() + " sources, " +
- targets.size() + " targets)";
-
- long bytesToMove = 0L;
- for (Source src : sources) {
- bytesToMove += src.getScheduledSize();
- }
- return bytesToMove;
+ return dispatcher.bytesToMove();
}
/** Decide all <source, target> pairs according to the matcher. */
- private void chooseNodes(final Matcher matcher) {
+ private void chooseStorageGroups(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
- chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
+ chooseStorageGroups(overUtilized, underUtilized, matcher);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
- chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
+ chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
/* match each remaining underutilized datanode (target) to
* above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
- chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
+ chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
}
/**
@@ -1068,13 +396,13 @@ public class Balancer {
* datanodes or the candidates are source nodes with (utilization > Avg), and
* the others are target nodes with (utilization < Avg).
*/
- private <D extends BalancerDatanode, C extends BalancerDatanode> void
- chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
+ private <G extends StorageGroup, C extends StorageGroup>
+ void chooseStorageGroups(Collection<G> groups, Collection<C> candidates,
Matcher matcher) {
- for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
- final D datanode = i.next();
- for(; chooseForOneDatanode(datanode, candidates, matcher); );
- if (!datanode.hasSpaceForScheduling()) {
+ for(final Iterator<G> i = groups.iterator(); i.hasNext();) {
+ final G g = i.next();
+ for(; choose4One(g, candidates, matcher); );
+ if (!g.hasSpaceForScheduling()) {
i.remove();
}
}
@@ -1084,18 +412,18 @@ public class Balancer {
* For the given datanode, choose a candidate and then schedule it.
* @return true if a candidate is chosen; false if no candidates is chosen.
*/
- private <C extends BalancerDatanode> boolean chooseForOneDatanode(
- BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
+ private <C extends StorageGroup> boolean choose4One(StorageGroup g,
+ Collection<C> candidates, Matcher matcher) {
final Iterator<C> i = candidates.iterator();
- final C chosen = chooseCandidate(dn, i, matcher);
-
+ final C chosen = chooseCandidate(g, i, matcher);
+
if (chosen == null) {
return false;
}
- if (dn instanceof Source) {
- matchSourceWithTargetToMove((Source)dn, chosen);
+ if (g instanceof Source) {
+ matchSourceWithTargetToMove((Source)g, chosen);
} else {
- matchSourceWithTargetToMove((Source)chosen, dn);
+ matchSourceWithTargetToMove((Source)chosen, g);
}
if (!chosen.hasSpaceForScheduling()) {
i.remove();
@@ -1103,27 +431,26 @@ public class Balancer {
return true;
}
- private void matchSourceWithTargetToMove(
- Source source, BalancerDatanode target) {
+ private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
+ final Task task = new Task(target, size);
+ source.addTask(task);
+ target.incScheduledSize(task.getSize());
+ dispatcher.add(source, target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode.getName() + " to " + target.datanode.getName());
+ + source.getDisplayName() + " to " + target.getDisplayName());
}
/** Choose a candidate for the given datanode. */
- private <D extends BalancerDatanode, C extends BalancerDatanode>
- C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
- if (dn.hasSpaceForScheduling()) {
+ private <G extends StorageGroup, C extends StorageGroup>
+ C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) {
+ if (g.hasSpaceForScheduling()) {
for(; candidates.hasNext(); ) {
final C c = candidates.next();
if (!c.hasSpaceForScheduling()) {
candidates.remove();
- } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
+ } else if (matcher.match(dispatcher.getCluster(),
+ g.getDatanodeInfo(), c.getDatanodeInfo())) {
return c;
}
}
@@ -1131,273 +458,25 @@ public class Balancer {
return null;
}
- private final AtomicLong bytesMoved = new AtomicLong();
-
- /* Start a thread to dispatch block moves for each source.
- * The thread selects blocks to move & sends request to proxy source to
- * initiate block move. The process is flow controlled. Block selection is
- * blocked if there are too many un-confirmed block moves.
- * Return the total number of bytes successfully moved in this iteration.
- */
- private long dispatchBlockMoves() throws InterruptedException {
- long bytesLastMoved = bytesMoved.get();
- Future<?>[] futures = new Future<?>[sources.size()];
- int i=0;
- for (Source source : sources) {
- futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
- }
-
- // wait for all dispatcher threads to finish
- for (Future<?> future : futures) {
- try {
- future.get();
- } catch (ExecutionException e) {
- LOG.warn("Dispatcher thread failed", e.getCause());
- }
- }
-
- // wait for all block moving to be done
- waitForMoveCompletion();
-
- return bytesMoved.get()-bytesLastMoved;
- }
-
- // The sleeping period before checking if block move is completed again
- static private long blockMoveWaitTime = 30000L;
-
- /** set the sleeping period for block move completion check */
- static void setBlockMoveWaitTime(long time) {
- blockMoveWaitTime = time;
- }
-
- /* wait for all block move confirmations
- * by checking each target's pendingMove queue
- */
- private void waitForMoveCompletion() {
- boolean shouldWait;
- do {
- shouldWait = false;
- for (BalancerDatanode target : targets) {
- if (!target.isPendingQEmpty()) {
- shouldWait = true;
- }
- }
- if (shouldWait) {
- try {
- Thread.sleep(blockMoveWaitTime);
- } catch (InterruptedException ignored) {
- }
- }
- } while (shouldWait);
- }
-
- /** This window makes sure to keep blocks that have been moved within 1.5 hour.
- * Old window has blocks that are older;
- * Current window has blocks that are more recent;
- * Cleanup method triggers the check if blocks in the old window are
- * more than 1.5 hour old. If yes, purge the old window and then
- * move blocks in current window to old window.
- */
- private static class MovedBlocks {
- private long lastCleanupTime = Time.now();
- final private static int CUR_WIN = 0;
- final private static int OLD_WIN = 1;
- final private static int NUM_WINS = 2;
- final private List<HashMap<Block, BalancerBlock>> movedBlocks =
- new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
-
- /* initialize the moved blocks collection */
- private MovedBlocks() {
- movedBlocks.add(new HashMap<Block,BalancerBlock>());
- movedBlocks.add(new HashMap<Block,BalancerBlock>());
- }
-
- /* add a block thus marking a block to be moved */
- synchronized private void add(BalancerBlock block) {
- movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
- }
-
- /* check if a block is marked as moved */
- synchronized private boolean contains(BalancerBlock block) {
- return contains(block.getBlock());
- }
-
- /* check if a block is marked as moved */
- synchronized private boolean contains(Block block) {
- return movedBlocks.get(CUR_WIN).containsKey(block) ||
- movedBlocks.get(OLD_WIN).containsKey(block);
- }
-
- /* remove old blocks */
- synchronized private void cleanup() {
- long curTime = Time.now();
- // check if old win is older than winWidth
- if (lastCleanupTime + WIN_WIDTH <= curTime) {
- // purge the old window
- movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
- movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
- lastCleanupTime = curTime;
- }
- }
- }
-
- /* Decide if it is OK to move the given block from source to target
- * A block is a good candidate if
- * 1. the block is not in the process of being moved/has not been moved;
- * 2. the block does not have a replica on the target;
- * 3. doing the move does not reduce the number of racks that the block has
- */
- private boolean isGoodBlockCandidate(Source source,
- BalancerDatanode target, BalancerBlock block) {
- // check if the block is moved or not
- if (movedBlocks.contains(block)) {
- return false;
- }
- if (block.isLocatedOnDatanode(target)) {
- return false;
- }
- if (cluster.isNodeGroupAware() &&
- isOnSameNodeGroupWithReplicas(target, block, source)) {
- return false;
- }
-
- boolean goodBlock = false;
- if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
- // good if source and target are on the same rack
- goodBlock = true;
- } else {
- boolean notOnSameRack = true;
- synchronized (block) {
- for (BalancerDatanode loc : block.locations) {
- if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
- notOnSameRack = false;
- break;
- }
- }
- }
- if (notOnSameRack) {
- // good if target is target is not on the same rack as any replica
- goodBlock = true;
- } else {
- // good if source is on the same rack as on of the replicas
- for (BalancerDatanode loc : block.locations) {
- if (loc != source &&
- cluster.isOnSameRack(loc.datanode, source.datanode)) {
- goodBlock = true;
- break;
- }
- }
- }
- }
- return goodBlock;
- }
-
- /**
- * Check if there are any replica (other than source) on the same node group
- * with target. If true, then target is not a good candidate for placing
- * specific block replica as we don't want 2 replicas under the same nodegroup
- * after balance.
- * @param target targetDataNode
- * @param block dataBlock
- * @param source sourceDataNode
- * @return true if there are any replica (other than source) on the same node
- * group with target
- */
- private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
- BalancerBlock block, Source source) {
- for (BalancerDatanode loc : block.locations) {
- if (loc != source &&
- cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
- return true;
- }
- }
- return false;
- }
-
/* reset all fields in a balancer preparing for the next iteration */
private void resetData(Configuration conf) {
- this.cluster = NetworkTopology.getInstance(conf);
- this.overUtilizedDatanodes.clear();
- this.aboveAvgUtilizedDatanodes.clear();
- this.belowAvgUtilizedDatanodes.clear();
- this.underUtilizedDatanodes.clear();
- this.datanodeMap.clear();
- this.sources.clear();
- this.targets.clear();
+ this.overUtilized.clear();
+ this.aboveAvgUtilized.clear();
+ this.belowAvgUtilized.clear();
+ this.underUtilized.clear();
this.policy.reset();
- cleanGlobalBlockList();
- this.movedBlocks.cleanup();
- }
-
- /* Remove all blocks from the global block list except for the ones in the
- * moved list.
- */
- private void cleanGlobalBlockList() {
- for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
- globalBlockListIterator.hasNext();) {
- Block block = globalBlockListIterator.next();
- if(!movedBlocks.contains(block)) {
- globalBlockListIterator.remove();
- }
- }
- }
-
- /* Return true if the given datanode is overUtilized */
- private boolean isOverUtilized(BalancerDatanode datanode) {
- return datanode.utilization > (policy.getAvgUtilization()+threshold);
- }
-
- /* Return true if the given datanode is above or equal to average utilized
- * but not overUtilized */
- private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
- final double avg = policy.getAvgUtilization();
- return (datanode.utilization <= (avg+threshold))
- && (datanode.utilization >= avg);
+ dispatcher.reset(conf);
}
- /* Return true if the given datanode is underUtilized */
- private boolean isUnderUtilized(BalancerDatanode datanode) {
- return datanode.utilization < (policy.getAvgUtilization()-threshold);
- }
-
- /* Return true if the given datanode is below average utilized
- * but not underUtilized */
- private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) {
- final double avg = policy.getAvgUtilization();
- return (datanode.utilization >= (avg-threshold))
- && (datanode.utilization <= avg);
- }
-
- // Exit status
- enum ReturnStatus {
- // These int values will map directly to the balancer process's exit code.
- SUCCESS(0),
- IN_PROGRESS(1),
- ALREADY_RUNNING(-1),
- NO_MOVE_BLOCK(-2),
- NO_MOVE_PROGRESS(-3),
- IO_EXCEPTION(-4),
- ILLEGAL_ARGS(-5),
- INTERRUPTED(-6);
-
- final int code;
-
- ReturnStatus(int code) {
- this.code = code;
- }
- }
-
/** Run an iteration for all datanodes. */
- private ReturnStatus run(int iteration, Formatter formatter,
+ private ExitStatus run(int iteration, Formatter formatter,
Configuration conf) {
try {
- /* get all live datanodes of a cluster and their disk usage
- * decide the number of bytes need to be moved
- */
- final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
+ final List<DatanodeStorageReport> reports = dispatcher.init();
+ final long bytesLeftToMove = init(reports);
if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting...");
- return ReturnStatus.SUCCESS;
+ return ExitStatus.SUCCESS;
} else {
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ " to make the cluster balanced." );
@@ -1408,10 +487,10 @@ public class Balancer {
* in this iteration. Maximum bytes to be moved per node is
* Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
*/
- final long bytesToMove = chooseNodes();
+ final long bytesToMove = chooseStorageGroups();
if (bytesToMove == 0) {
System.out.println("No block can be moved. Exiting...");
- return ReturnStatus.NO_MOVE_BLOCK;
+ return ExitStatus.NO_MOVE_BLOCK;
} else {
LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
" in this iteration");
@@ -1420,7 +499,7 @@ public class Balancer {
formatter.format("%-24s %10d %19s %18s %17s%n",
DateFormat.getDateTimeInstance().format(new Date()),
iteration,
- StringUtils.byteDesc(bytesMoved.get()),
+ StringUtils.byteDesc(dispatcher.getBytesMoved()),
StringUtils.byteDesc(bytesLeftToMove),
StringUtils.byteDesc(bytesToMove)
);
@@ -1431,24 +510,22 @@ public class Balancer {
* available to move.
* Exit no byte has been moved for 5 consecutive iterations.
*/
- if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
- return ReturnStatus.NO_MOVE_PROGRESS;
+ if (!dispatcher.dispatchAndCheckContinue()) {
+ return ExitStatus.NO_MOVE_PROGRESS;
}
- return ReturnStatus.IN_PROGRESS;
+ return ExitStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.ILLEGAL_ARGS;
+ return ExitStatus.ILLEGAL_ARGUMENTS;
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.IO_EXCEPTION;
+ return ExitStatus.IO_EXCEPTION;
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.INTERRUPTED;
+ return ExitStatus.INTERRUPTED;
} finally {
- // shutdown thread pools
- dispatcherExecutor.shutdownNow();
- moverExecutor.shutdownNow();
+ dispatcher.shutdownNow();
}
}
@@ -1463,8 +540,8 @@ public class Balancer {
final long sleeptime = 2000*conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
- LOG.info("namenodes = " + namenodes);
- LOG.info("p = " + p);
+ LOG.info("namenodes = " + namenodes);
+ LOG.info("parameters = " + p);
final Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
@@ -1473,7 +550,10 @@ public class Balancer {
= new ArrayList<NameNodeConnector>(namenodes.size());
try {
for (URI uri : namenodes) {
- connectors.add(new NameNodeConnector(uri, conf));
+ final NameNodeConnector nnc = new NameNodeConnector(
+ Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
+ nnc.getKeyManager().startBlockKeyUpdater();
+ connectors.add(nnc);
}
boolean done = false;
@@ -1482,14 +562,14 @@ public class Balancer {
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
- final ReturnStatus r = b.run(iteration, formatter, conf);
+ final ExitStatus r = b.run(iteration, formatter, conf);
// clean all lists
b.resetData(conf);
- if (r == ReturnStatus.IN_PROGRESS) {
+ if (r == ExitStatus.IN_PROGRESS) {
done = false;
- } else if (r != ReturnStatus.SUCCESS) {
+ } else if (r != ExitStatus.SUCCESS) {
//must be an error statue, return.
- return r.code;
+ return r.getExitCode();
}
}
@@ -1502,7 +582,7 @@ public class Balancer {
nnc.close();
}
}
- return ReturnStatus.SUCCESS.code;
+ return ExitStatus.SUCCESS.getExitCode();
}
/* Given elaspedTime in ms, return a printable string */
@@ -1526,21 +606,31 @@ public class Balancer {
}
static class Parameters {
- static final Parameters DEFALUT = new Parameters(
- BalancingPolicy.Node.INSTANCE, 10.0);
+ static final Parameters DEFAULT = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 10.0,
+ Collections.<String> emptySet(), Collections.<String> emptySet());
final BalancingPolicy policy;
final double threshold;
+ // exclude the nodes in this set from balancing operations
+ Set<String> nodesToBeExcluded;
+ //include only these nodes in balancing operations
+ Set<String> nodesToBeIncluded;
- Parameters(BalancingPolicy policy, double threshold) {
+ Parameters(BalancingPolicy policy, double threshold,
+ Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
this.policy = policy;
this.threshold = threshold;
+ this.nodesToBeExcluded = nodesToBeExcluded;
+ this.nodesToBeIncluded = nodesToBeIncluded;
}
@Override
public String toString() {
return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
- + "[" + policy + ", threshold=" + threshold + "]";
+ + "[" + policy + ", threshold=" + threshold +
+ ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
+ ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
}
}
@@ -1555,9 +645,6 @@ public class Balancer {
public int run(String[] args) {
final long startTime = Time.now();
final Configuration conf = getConf();
- WIN_WIDTH = conf.getLong(
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
try {
checkReplicationPolicyCompatibility(conf);
@@ -1566,10 +653,10 @@ public class Balancer {
return Balancer.run(namenodes, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.IO_EXCEPTION.code;
+ return ExitStatus.IO_EXCEPTION.getExitCode();
} catch (InterruptedException e) {
System.out.println(e + ". Exiting ...");
- return ReturnStatus.INTERRUPTED.code;
+ return ExitStatus.INTERRUPTED.getExitCode();
} finally {
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Balancing took " + time2Str(Time.now()-startTime));
@@ -1578,15 +665,17 @@ public class Balancer {
/** parse command line arguments */
static Parameters parse(String[] args) {
- BalancingPolicy policy = Parameters.DEFALUT.policy;
- double threshold = Parameters.DEFALUT.threshold;
+ BalancingPolicy policy = Parameters.DEFAULT.policy;
+ double threshold = Parameters.DEFAULT.threshold;
+ Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+ Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
if (args != null) {
try {
for(int i = 0; i < args.length; i++) {
- checkArgument(args.length >= 2, "args = " + Arrays.toString(args));
if ("-threshold".equalsIgnoreCase(args[i])) {
- i++;
+ checkArgument(++i < args.length,
+ "Threshold value is missing: args = " + Arrays.toString(args));
try {
threshold = Double.parseDouble(args[i]);
if (threshold < 1 || threshold > 100) {
@@ -1601,25 +690,52 @@ public class Balancer {
throw e;
}
} else if ("-policy".equalsIgnoreCase(args[i])) {
- i++;
+ checkArgument(++i < args.length,
+ "Policy value is missing: args = " + Arrays.toString(args));
try {
policy = BalancingPolicy.parse(args[i]);
} catch(IllegalArgumentException e) {
System.err.println("Illegal policy name: " + args[i]);
throw e;
}
+ } else if ("-exclude".equalsIgnoreCase(args[i])) {
+ checkArgument(++i < args.length,
+ "List of nodes to exclude | -f <filename> is missing: args = "
+ + Arrays.toString(args));
+ if ("-f".equalsIgnoreCase(args[i])) {
+ checkArgument(++i < args.length,
+ "File containing nodes to exclude is not specified: args = "
+ + Arrays.toString(args));
+ nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
+ } else {
+ nodesTobeExcluded = Util.parseHostList(args[i]);
+ }
+ } else if ("-include".equalsIgnoreCase(args[i])) {
+ checkArgument(++i < args.length,
+ "List of nodes to include | -f <filename> is missing: args = "
+ + Arrays.toString(args));
+ if ("-f".equalsIgnoreCase(args[i])) {
+ checkArgument(++i < args.length,
+ "File containing nodes to include is not specified: args = "
+ + Arrays.toString(args));
+ nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
+ } else {
+ nodesTobeIncluded = Util.parseHostList(args[i]);
+ }
} else {
throw new IllegalArgumentException("args = "
+ Arrays.toString(args));
}
}
+ checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
+ "-exclude and -include options cannot be specified together.");
} catch(RuntimeException e) {
printUsage(System.err);
throw e;
}
}
- return new Parameters(policy, threshold);
+ return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
}
private static void printUsage(PrintStream out) {
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Wed Aug 20 01:34:29 2014
@@ -18,7 +18,11 @@
package org.apache.hadoop.hdfs.server.balancer;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;
/**
* Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
*/
@InterfaceAudience.Private
abstract class BalancingPolicy {
- long totalCapacity;
- long totalUsedSpace;
- private double avgUtilization;
+ final EnumCounters<StorageType> totalCapacities
+ = new EnumCounters<StorageType>(StorageType.class);
+ final EnumCounters<StorageType> totalUsedSpaces
+ = new EnumCounters<StorageType>(StorageType.class);
+ final EnumDoubles<StorageType> avgUtilizations
+ = new EnumDoubles<StorageType>(StorageType.class);
void reset() {
- totalCapacity = 0L;
- totalUsedSpace = 0L;
- avgUtilization = 0.0;
+ totalCapacities.reset();
+ totalUsedSpaces.reset();
+ avgUtilizations.reset();
}
/** Get the policy name. */
abstract String getName();
/** Accumulate used space and capacity. */
- abstract void accumulateSpaces(DatanodeInfo d);
+ abstract void accumulateSpaces(DatanodeStorageReport r);
void initAvgUtilization() {
- this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+ for(StorageType t : StorageType.asList()) {
+ final long capacity = totalCapacities.get(t);
+ if (capacity > 0L) {
+ final double avg = totalUsedSpaces.get(t)*100.0/capacity;
+ avgUtilizations.set(t, avg);
+ }
+ }
}
- double getAvgUtilization() {
- return avgUtilization;
+
+ double getAvgUtilization(StorageType t) {
+ return avgUtilizations.get(t);
}
- /** Return the utilization of a datanode */
- abstract double getUtilization(DatanodeInfo d);
+ /** @return the utilization of a particular storage type of a datanode;
+ * or return null if the datanode does not have such storage type.
+ */
+ abstract Double getUtilization(DatanodeStorageReport r, StorageType t);
@Override
public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
}
@Override
- void accumulateSpaces(DatanodeInfo d) {
- totalCapacity += d.getCapacity();
- totalUsedSpace += d.getDfsUsed();
+ void accumulateSpaces(DatanodeStorageReport r) {
+ for(StorageReport s : r.getStorageReports()) {
+ final StorageType t = s.getStorage().getStorageType();
+ totalCapacities.add(t, s.getCapacity());
+ totalUsedSpaces.add(t, s.getDfsUsed());
+ }
}
@Override
- double getUtilization(DatanodeInfo d) {
- return d.getDfsUsed()*100.0/d.getCapacity();
+ Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+ long capacity = 0L;
+ long dfsUsed = 0L;
+ for(StorageReport s : r.getStorageReports()) {
+ if (s.getStorage().getStorageType() == t) {
+ capacity += s.getCapacity();
+ dfsUsed += s.getDfsUsed();
+ }
+ }
+ return capacity == 0L? null: dfsUsed*100.0/capacity;
}
}
@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
}
@Override
- void accumulateSpaces(DatanodeInfo d) {
- totalCapacity += d.getCapacity();
- totalUsedSpace += d.getBlockPoolUsed();
+ void accumulateSpaces(DatanodeStorageReport r) {
+ for(StorageReport s : r.getStorageReports()) {
+ final StorageType t = s.getStorage().getStorageType();
+ totalCapacities.add(t, s.getCapacity());
+ totalUsedSpaces.add(t, s.getBlockPoolUsed());
+ }
}
@Override
- double getUtilization(DatanodeInfo d) {
- return d.getBlockPoolUsed()*100.0/d.getCapacity();
+ Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+ long capacity = 0L;
+ long blockPoolUsed = 0L;
+ for(StorageReport s : r.getStorageReports()) {
+ if (s.getStorage().getStorageType() == t) {
+ capacity += s.getCapacity();
+ blockPoolUsed += s.getBlockPoolUsed();
+ }
+ }
+ return capacity == 0L? null: blockPoolUsed*100.0/capacity;
}
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Wed Aug 20 01:34:29 2014
@@ -17,113 +17,102 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
-import java.util.EnumSet;
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
/**
- * The class provides utilities for {@link Balancer} to access a NameNode
+ * The class provides utilities for accessing a NameNode.
*/
@InterfaceAudience.Private
-class NameNodeConnector implements DataEncryptionKeyFactory {
- private static final Log LOG = Balancer.LOG;
- private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+public class NameNodeConnector implements Closeable {
+ private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
+
private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
- final URI nameNodeUri;
- final String blockpoolID;
+ private final URI nameNodeUri;
+ private final String blockpoolID;
+
+ private final NamenodeProtocol namenode;
+ private final ClientProtocol client;
+ private final KeyManager keyManager;
+
+ private final FileSystem fs;
+ private final Path idPath;
+ private final OutputStream out;
- final NamenodeProtocol namenode;
- final ClientProtocol client;
- final FileSystem fs;
- final OutputStream out;
-
- private final boolean isBlockTokenEnabled;
- private final boolean encryptDataTransfer;
- private boolean shouldRun;
- private long keyUpdaterInterval;
- // used for balancer
private int notChangedIterations = 0;
- private BlockTokenSecretManager blockTokenSecretManager;
- private Daemon keyupdaterthread; // AccessKeyUpdater thread
- private DataEncryptionKey encryptionKey;
- NameNodeConnector(URI nameNodeUri,
+ public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
Configuration conf) throws IOException {
this.nameNodeUri = nameNodeUri;
+ this.idPath = idPath;
- this.namenode =
- NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
- .getProxy();
- this.client =
- NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
- .getProxy();
+ this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
+ NamenodeProtocol.class).getProxy();
+ this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
+ ClientProtocol.class).getProxy();
this.fs = FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID();
- final ExportedBlockKeys keys = namenode.getBlockKeys();
- this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
- if (isBlockTokenEnabled) {
- long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
- long blockTokenLifetime = keys.getTokenLifetime();
- LOG.info("Block token params received from NN: keyUpdateInterval="
- + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
- + blockTokenLifetime / (60 * 1000) + " min(s)");
- String encryptionAlgorithm = conf.get(
- DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
- this.blockTokenSecretManager = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
- encryptionAlgorithm);
- this.blockTokenSecretManager.addKeys(keys);
- /*
- * Balancer should sync its block keys with NN more frequently than NN
- * updates its block keys
- */
- this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
- LOG.info("Balancer will update its block keys every "
- + keyUpdaterInterval / (60 * 1000) + " minute(s)");
- this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
- this.shouldRun = true;
- this.keyupdaterthread.start();
- }
- this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
- .getEncryptDataTransfer();
- // Check if there is another balancer running.
+ final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
+ this.keyManager = new KeyManager(blockpoolID, namenode,
+ defaults.getEncryptDataTransfer(), conf);
// Exit if there is another one running.
- out = checkAndMarkRunningBalancer();
+ out = checkAndMarkRunning();
if (out == null) {
- throw new IOException("Another balancer is running");
+ throw new IOException("Another " + name + " is running.");
}
}
- boolean shouldContinue(long dispatchBlockMoveBytes) {
+ /** @return the block pool ID */
+ public String getBlockpoolID() {
+ return blockpoolID;
+ }
+
+ /** @return blocks with locations. */
+ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+ throws IOException {
+ return namenode.getBlocks(datanode, size);
+ }
+
+ /** @return live datanode storage reports. */
+ public DatanodeStorageReport[] getLiveDatanodeStorageReport()
+ throws IOException {
+ return client.getDatanodeStorageReport(DatanodeReportType.LIVE);
+ }
+
+ /** @return the key manager */
+ public KeyManager getKeyManager() {
+ return keyManager;
+ }
+
+ /** Should the instance continue running? */
+ public boolean shouldContinue(long dispatchBlockMoveBytes) {
if (dispatchBlockMoveBytes > 0) {
notChangedIterations = 0;
} else {
@@ -137,53 +126,25 @@ class NameNodeConnector implements DataE
return true;
}
- /** Get an access token for a block. */
- Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
- ) throws IOException {
- if (!isBlockTokenEnabled) {
- return BlockTokenSecretManager.DUMMY_TOKEN;
- } else {
- if (!shouldRun) {
- throw new IOException(
- "Can not get access token. BlockKeyUpdater is not running");
- }
- return blockTokenSecretManager.generateToken(null, eb,
- EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
- BlockTokenSecretManager.AccessMode.COPY));
- }
- }
-
- @Override
- public DataEncryptionKey newDataEncryptionKey() {
- if (encryptDataTransfer) {
- synchronized (this) {
- if (encryptionKey == null) {
- encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
- }
- return encryptionKey;
- }
- } else {
- return null;
- }
- }
- /* The idea for making sure that there is no more than one balancer
+ /**
+ * The idea for making sure that there is no more than one instance
* running in an HDFS is to create a file in the HDFS, writes the hostname
- * of the machine on which the balancer is running to the file, but did not
- * close the file until the balancer exits.
- * This prevents the second balancer from running because it can not
+ * of the machine on which the instance is running to the file, but did not
+ * close the file until it exits.
+ *
+ * This prevents the second instance from running because it can not
* creates the file while the first one is running.
*
- * This method checks if there is any running balancer and
- * if no, mark yes if no.
+ * This method checks if there is any running instance. If no, mark yes.
* Note that this is an atomic operation.
*
- * Return null if there is a running balancer; otherwise the output stream
- * to the newly created file.
+ * @return null if there is a running instance;
+ * otherwise, the output stream to the newly created file.
*/
- private OutputStream checkAndMarkRunningBalancer() throws IOException {
+ private OutputStream checkAndMarkRunning() throws IOException {
try {
- final DataOutputStream out = fs.create(BALANCER_ID_PATH);
+ final DataOutputStream out = fs.create(idPath);
out.writeBytes(InetAddress.getLocalHost().getHostName());
out.flush();
return out;
@@ -196,24 +157,17 @@ class NameNodeConnector implements DataE
}
}
- /** Close the connection. */
- void close() {
- shouldRun = false;
- try {
- if (keyupdaterthread != null) {
- keyupdaterthread.interrupt();
- }
- } catch(Exception e) {
- LOG.warn("Exception shutting down access key updater thread", e);
- }
+ @Override
+ public void close() {
+ keyManager.close();
// close the output file
IOUtils.closeStream(out);
if (fs != null) {
try {
- fs.delete(BALANCER_ID_PATH, true);
+ fs.delete(idPath, true);
} catch(IOException ioe) {
- LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe);
+ LOG.warn("Failed to delete " + idPath, ioe);
}
}
}
@@ -221,31 +175,6 @@ class NameNodeConnector implements DataE
@Override
public String toString() {
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
- + ", id=" + blockpoolID
- + "]";
- }
-
- /**
- * Periodically updates access keys.
- */
- class BlockKeyUpdater implements Runnable {
- @Override
- public void run() {
- try {
- while (shouldRun) {
- try {
- blockTokenSecretManager.addKeys(namenode.getBlockKeys());
- } catch (IOException e) {
- LOG.error("Failed to set keys", e);
- }
- Thread.sleep(keyUpdaterInterval);
- }
- } catch (InterruptedException e) {
- LOG.debug("InterruptedException in block key updater thread", e);
- } catch (Throwable e) {
- LOG.error("Exception in block key updater thread", e);
- shouldRun = false;
- }
- }
+ + ", bpid=" + blockpoolID + "]";
}
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Wed Aug 20 01:34:29 2014
@@ -21,7 +21,6 @@ import java.util.LinkedList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.util.LightWeightGSet;
@@ -195,24 +194,12 @@ public class BlockInfo extends Block imp
* Add a {@link DatanodeStorageInfo} location for a block
*/
boolean addStorage(DatanodeStorageInfo storage) {
- boolean added = true;
- int idx = findDatanode(storage.getDatanodeDescriptor());
- if(idx >= 0) {
- if (getStorageInfo(idx) == storage) { // the storage is already there
- return false;
- } else {
- // The block is on the DN but belongs to a different storage.
- // Update our state.
- removeStorage(getStorageInfo(idx));
- added = false; // Just updating storage. Return false.
- }
- }
// find the last null node
int lastNode = ensureCapacity(1);
setStorageInfo(lastNode, storage);
setNext(lastNode, null);
setPrevious(lastNode, null);
- return added;
+ return true;
}
/**
@@ -241,31 +228,33 @@ public class BlockInfo extends Block imp
* Find specified DatanodeDescriptor.
* @return index or -1 if not found.
*/
- int findDatanode(DatanodeDescriptor dn) {
+ boolean findDatanode(DatanodeDescriptor dn) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeDescriptor cur = getDatanode(idx);
- if(cur == dn)
- return idx;
- if(cur == null)
+ if(cur == dn) {
+ return true;
+ }
+ if(cur == null) {
break;
+ }
}
- return -1;
+ return false;
}
/**
* Find specified DatanodeStorageInfo.
- * @return index or -1 if not found.
+ * @return DatanodeStorageInfo or null if not found.
*/
- int findStorageInfo(DatanodeInfo dn) {
+ DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
int len = getCapacity();
for(int idx = 0; idx < len; idx++) {
DatanodeStorageInfo cur = getStorageInfo(idx);
if(cur == null)
break;
if(cur.getDatanodeDescriptor() == dn)
- return idx;
+ return cur;
}
- return -1;
+ return null;
}
/**
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Wed Aug 20 01:34:29 2014
@@ -373,12 +373,14 @@ public class BlockInfoUnderConstruction
sb.append("{blockUCState=").append(blockUCState)
.append(", primaryNodeIndex=").append(primaryNodeIndex)
.append(", replicas=[");
- Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
- if (iter.hasNext()) {
- iter.next().appendStringTo(sb);
- while (iter.hasNext()) {
- sb.append(", ");
+ if (replicas != null) {
+ Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
+ if (iter.hasNext()) {
iter.next().appendStringTo(sb);
+ while (iter.hasNext()) {
+ sb.append(", ");
+ iter.next().appendStringTo(sb);
+ }
}
}
sb.append("]}");