You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2014/08/08 23:33:57 UTC
svn commit: r1616889 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/balancer/ src/test/java/org/apache/hadoop/hdfs/server/balancer/
Author: jing9
Date: Fri Aug 8 21:33:57 2014
New Revision: 1616889
URL: http://svn.apache.org/r1616889
Log:
HDFS-6828. Separate block replica dispatching from Balancer. Contributed by Tsz Wo Nicholas Sze.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616889&r1=1616888&r2=1616889&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 8 21:33:57 2014
@@ -384,6 +384,9 @@ Release 2.6.0 - UNRELEASED
HDFS-573. Porting libhdfs to Windows. (cnauroth)
+ HDFS-6828. Separate block replica dispatching from Balancer. (szetszwo via
+ jing9)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1616889&r1=1616888&r2=1616889&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Aug 8 21:33:57 2014
@@ -18,19 +18,9 @@
package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintStream;
-import java.net.Socket;
import java.net.URI;
import java.text.DateFormat;
import java.util.ArrayList;
@@ -38,20 +28,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
-import java.util.EnumMap;
import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -63,31 +44,15 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.BalancerDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -200,15 +165,7 @@ public class Balancer {
private static final long GB = 1L << 30; //1GB
private static final long MAX_SIZE_TO_MOVE = 10*GB;
- private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
- /** The maximum number of concurrent blocks moves for
- * balancing purpose at a datanode
- */
- private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
- public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
- public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
-
private static final String USAGE = "Usage: java "
+ Balancer.class.getSimpleName()
+ "\n\t[-policy <policy>]\tthe balancing policy: "
@@ -220,16 +177,9 @@ public class Balancer {
+ "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
+ "\tIncludes only the specified datanodes.";
- private final NameNodeConnector nnc;
- private final KeyManager keyManager;
-
+ private final Dispatcher dispatcher;
private final BalancingPolicy policy;
- private final SaslDataTransferClient saslClient;
private final double threshold;
- // set of data nodes to be excluded from balancing operations.
- Set<String> nodesToBeExcluded;
- //Restrict balancing to the following nodes.
- Set<String> nodesToBeIncluded;
// all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -238,634 +188,6 @@ public class Balancer {
= new LinkedList<BalancerDatanode.StorageGroup>();
private final Collection<BalancerDatanode.StorageGroup> underUtilized
= new LinkedList<BalancerDatanode.StorageGroup>();
-
- private final Collection<Source> sources = new HashSet<Source>();
- private final Collection<BalancerDatanode.StorageGroup> targets
- = new HashSet<BalancerDatanode.StorageGroup>();
-
- private final Map<Block, BalancerBlock> globalBlockList
- = new HashMap<Block, BalancerBlock>();
- private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
-
- /** Map (datanodeUuid,storageType -> StorageGroup) */
- private final StorageGroupMap storageGroupMap = new StorageGroupMap();
-
- private NetworkTopology cluster;
-
- private final ExecutorService moverExecutor;
- private final ExecutorService dispatcherExecutor;
- private final int maxConcurrentMovesPerNode;
-
-
- private static class StorageGroupMap {
- private static String toKey(String datanodeUuid, StorageType storageType) {
- return datanodeUuid + ":" + storageType;
- }
-
- private final Map<String, BalancerDatanode.StorageGroup> map
- = new HashMap<String, BalancerDatanode.StorageGroup>();
-
- BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) {
- return map.get(toKey(datanodeUuid, storageType));
- }
-
- void put(BalancerDatanode.StorageGroup g) {
- final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
- final BalancerDatanode.StorageGroup existing = map.put(key, g);
- Preconditions.checkState(existing == null);
- }
-
- int size() {
- return map.size();
- }
-
- void clear() {
- map.clear();
- }
- }
- /* This class keeps track of a scheduled block move */
- private class PendingBlockMove {
- private BalancerBlock block;
- private Source source;
- private BalancerDatanode proxySource;
- private BalancerDatanode.StorageGroup target;
-
- /** constructor */
- private PendingBlockMove() {
- }
-
- @Override
- public String toString() {
- final Block b = block.getBlock();
- return b + " with size=" + b.getNumBytes() + " from "
- + source.getDisplayName() + " to " + target.getDisplayName()
- + " through " + proxySource.datanode;
- }
-
- /* choose a block & a proxy source for this pendingMove
- * whose source & target have already been chosen.
- *
- * Return true if a block and its proxy are chosen; false otherwise
- */
- private boolean chooseBlockAndProxy() {
- // iterate all source's blocks until find a good one
- for (Iterator<BalancerBlock> blocks=
- source.getBlockIterator(); blocks.hasNext();) {
- if (markMovedIfGoodBlock(blocks.next())) {
- blocks.remove();
- return true;
- }
- }
- return false;
- }
-
- /* Return true if the given block is good for the tentative move;
- * If it is good, add it to the moved list to marked as "Moved".
- * A block is good if
- * 1. it is a good candidate; see isGoodBlockCandidate
- * 2. can find a proxy source that's not busy for this move
- */
- private boolean markMovedIfGoodBlock(BalancerBlock block) {
- synchronized(block) {
- synchronized(movedBlocks) {
- if (isGoodBlockCandidate(source, target, block)) {
- this.block = block;
- if ( chooseProxySource() ) {
- movedBlocks.put(block);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decided to move " + this);
- }
- return true;
- }
- }
- }
- }
- return false;
- }
-
- /* Now we find out source, target, and block, we need to find a proxy
- *
- * @return true if a proxy is found; otherwise false
- */
- private boolean chooseProxySource() {
- final DatanodeInfo targetDN = target.getDatanode();
- // if node group is supported, first try add nodes in the same node group
- if (cluster.isNodeGroupAware()) {
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
- return true;
- }
- }
- }
- // check if there is replica which is on the same rack with the target
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
- return true;
- }
- }
- // find out a non-busy replica
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (addTo(loc)) {
- return true;
- }
- }
- return false;
- }
-
- /** add to a proxy source for specific block movement */
- private boolean addTo(BalancerDatanode.StorageGroup g) {
- final BalancerDatanode bdn = g.getBalancerDatanode();
- if (bdn.addPendingBlock(this)) {
- proxySource = bdn;
- return true;
- }
- return false;
- }
-
- /* Dispatch the block move task to the proxy source & wait for the response
- */
- private void dispatch() {
- Socket sock = new Socket();
- DataOutputStream out = null;
- DataInputStream in = null;
- try {
- sock.connect(
- NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
- HdfsServerConstants.READ_TIMEOUT);
- /* Unfortunately we don't have a good way to know if the Datanode is
- * taking a really long time to move a block, OR something has
- * gone wrong and it's never going to finish. To deal with this
- * scenario, we set a long timeout (20 minutes) to avoid hanging
- * the balancer indefinitely.
- */
- sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
-
- sock.setKeepAlive(true);
-
- OutputStream unbufOut = sock.getOutputStream();
- InputStream unbufIn = sock.getInputStream();
- ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
- Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
- IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, keyManager, accessToken, target.getDatanode());
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- HdfsConstants.IO_FILE_BUFFER_SIZE));
- in = new DataInputStream(new BufferedInputStream(unbufIn,
- HdfsConstants.IO_FILE_BUFFER_SIZE));
-
- sendRequest(out, eb, StorageType.DEFAULT, accessToken);
- receiveResponse(in);
- bytesMoved.addAndGet(block.getNumBytes());
- LOG.info("Successfully moved " + this);
- } catch (IOException e) {
- LOG.warn("Failed to move " + this + ": " + e.getMessage());
- /* proxy or target may have an issue, insert a small delay
- * before using these nodes further. This avoids a potential storm
- * of "threads quota exceeded" Warnings when the balancer
- * gets out of sync with work going on in datanode.
- */
- proxySource.activateDelay(DELAY_AFTER_ERROR);
- target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
- } finally {
- IOUtils.closeStream(out);
- IOUtils.closeStream(in);
- IOUtils.closeSocket(sock);
-
- proxySource.removePendingBlock(this);
- target.getBalancerDatanode().removePendingBlock(this);
-
- synchronized (this ) {
- reset();
- }
- synchronized (Balancer.this) {
- Balancer.this.notifyAll();
- }
- }
- }
-
- /* Send a block replace request to the output stream*/
- private void sendRequest(DataOutputStream out, ExtendedBlock eb,
- StorageType storageType,
- Token<BlockTokenIdentifier> accessToken) throws IOException {
- new Sender(out).replaceBlock(eb, storageType, accessToken,
- source.getDatanode().getDatanodeUuid(), proxySource.datanode);
- }
-
- /* Receive a block copy response from the input stream */
- private void receiveResponse(DataInputStream in) throws IOException {
- BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
- vintPrefixed(in));
- if (response.getStatus() != Status.SUCCESS) {
- if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
- throw new IOException("block move failed due to access token error");
- throw new IOException("block move is failed: " +
- response.getMessage());
- }
- }
-
- /* reset the object */
- private void reset() {
- block = null;
- source = null;
- proxySource = null;
- target = null;
- }
-
- /* start a thread to dispatch the block move */
- private void scheduleBlockMove() {
- moverExecutor.execute(new Runnable() {
- @Override
- public void run() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start moving " + PendingBlockMove.this);
- }
- dispatch();
- }
- });
- }
- }
-
- /* A class for keeping track of blocks in the Balancer */
- static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
- BalancerBlock(Block block) {
- super(block);
- }
- }
-
- /* The class represents a desired move of bytes between two nodes
- * and the target.
- * An object of this class is stored in a source.
- */
- static private class Task {
- private final BalancerDatanode.StorageGroup target;
- private long size; //bytes scheduled to move
-
- /* constructor */
- private Task(BalancerDatanode.StorageGroup target, long size) {
- this.target = target;
- this.size = size;
- }
- }
-
-
- /* A class that keeps track of a datanode in Balancer */
- private static class BalancerDatanode {
-
- /** A group of storages in a datanode with the same storage type. */
- private class StorageGroup {
- final StorageType storageType;
- final double utilization;
- final long maxSize2Move;
- private long scheduledSize = 0L;
-
- private StorageGroup(StorageType storageType, double utilization,
- long maxSize2Move) {
- this.storageType = storageType;
- this.utilization = utilization;
- this.maxSize2Move = maxSize2Move;
- }
-
- BalancerDatanode getBalancerDatanode() {
- return BalancerDatanode.this;
- }
-
- DatanodeInfo getDatanode() {
- return BalancerDatanode.this.datanode;
- }
-
- /** Decide if still need to move more bytes */
- protected synchronized boolean hasSpaceForScheduling() {
- return availableSizeToMove() > 0L;
- }
-
- /** @return the total number of bytes that need to be moved */
- synchronized long availableSizeToMove() {
- return maxSize2Move - scheduledSize;
- }
-
- /** increment scheduled size */
- synchronized void incScheduledSize(long size) {
- scheduledSize += size;
- }
-
- /** @return scheduled size */
- synchronized long getScheduledSize() {
- return scheduledSize;
- }
-
- /** Reset scheduled size to zero. */
- synchronized void resetScheduledSize() {
- scheduledSize = 0L;
- }
-
- /** @return the name for display */
- String getDisplayName() {
- return datanode + ":" + storageType;
- }
-
- @Override
- public String toString() {
- return "" + utilization;
- }
- }
-
- final DatanodeInfo datanode;
- final EnumMap<StorageType, StorageGroup> storageMap
- = new EnumMap<StorageType, StorageGroup>(StorageType.class);
- protected long delayUntil = 0L;
- // blocks being moved but not confirmed yet
- private final List<PendingBlockMove> pendingBlocks;
- private final int maxConcurrentMoves;
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
- }
-
- /* Constructor
- * Depending on avgutil & threshold, calculate maximum bytes to move
- */
- private BalancerDatanode(DatanodeStorageReport report,
- double threshold, int maxConcurrentMoves) {
- this.datanode = report.getDatanodeInfo();
- this.maxConcurrentMoves = maxConcurrentMoves;
- this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
- }
-
- private void put(StorageType storageType, StorageGroup g) {
- final StorageGroup existing = storageMap.put(storageType, g);
- Preconditions.checkState(existing == null);
- }
-
- StorageGroup addStorageGroup(StorageType storageType, double utilization,
- long maxSize2Move) {
- final StorageGroup g = new StorageGroup(storageType, utilization,
- maxSize2Move);
- put(storageType, g);
- return g;
- }
-
- Source addSource(StorageType storageType, double utilization,
- long maxSize2Move, Balancer balancer) {
- final Source s = balancer.new Source(storageType, utilization,
- maxSize2Move, this);
- put(storageType, s);
- return s;
- }
-
- synchronized private void activateDelay(long delta) {
- delayUntil = Time.now() + delta;
- }
-
- synchronized private boolean isDelayActive() {
- if (delayUntil == 0 || Time.now() > delayUntil){
- delayUntil = 0;
- return false;
- }
- return true;
- }
-
- /* Check if the node can schedule more blocks to move */
- synchronized private boolean isPendingQNotFull() {
- if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
- return true;
- }
- return false;
- }
-
- /* Check if all the dispatched moves are done */
- synchronized private boolean isPendingQEmpty() {
- return pendingBlocks.isEmpty();
- }
-
- /* Add a scheduled block move to the node */
- private synchronized boolean addPendingBlock(
- PendingBlockMove pendingBlock) {
- if (!isDelayActive() && isPendingQNotFull()) {
- return pendingBlocks.add(pendingBlock);
- }
- return false;
- }
-
- /* Remove a scheduled block move from the node */
- private synchronized boolean removePendingBlock(
- PendingBlockMove pendingBlock) {
- return pendingBlocks.remove(pendingBlock);
- }
- }
-
- /** A node that can be the sources of a block move */
- private class Source extends BalancerDatanode.StorageGroup {
-
- /* A thread that initiates a block move
- * and waits for block move to complete */
- private class BlockMoveDispatcher implements Runnable {
- @Override
- public void run() {
- dispatchBlocks();
- }
- }
-
- private final List<Task> tasks = new ArrayList<Task>(2);
- private long blocksToReceive = 0L;
- /* source blocks point to balancerBlocks in the global list because
- * we want to keep one copy of a block in balancer and be aware that
- * the locations are changing over time.
- */
- private final List<BalancerBlock> srcBlockList
- = new ArrayList<BalancerBlock>();
-
- /* constructor */
- private Source(StorageType storageType, double utilization,
- long maxSize2Move, BalancerDatanode dn) {
- dn.super(storageType, utilization, maxSize2Move);
- }
-
- /** Add a task */
- private void addTask(Task task) {
- Preconditions.checkState(task.target != this,
- "Source and target are the same storage group " + getDisplayName());
- incScheduledSize(task.size);
- tasks.add(task);
- }
-
- /* Return an iterator to this source's blocks */
- private Iterator<BalancerBlock> getBlockIterator() {
- return srcBlockList.iterator();
- }
-
- /* fetch new blocks of this source from namenode and
- * update this source's block list & the global block list
- * Return the total size of the received blocks in the number of bytes.
- */
- private long getBlockList() throws IOException {
- final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
- final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks(
- getDatanode(), size).getBlocks();
-
- long bytesReceived = 0;
- for (BlockWithLocations blk : newBlocks) {
- bytesReceived += blk.getBlock().getNumBytes();
- BalancerBlock block;
- synchronized(globalBlockList) {
- block = globalBlockList.get(blk.getBlock());
- if (block==null) {
- block = new BalancerBlock(blk.getBlock());
- globalBlockList.put(blk.getBlock(), block);
- } else {
- block.clearLocations();
- }
-
- synchronized (block) {
- // update locations
- final String[] datanodeUuids = blk.getDatanodeUuids();
- final StorageType[] storageTypes = blk.getStorageTypes();
- for (int i = 0; i < datanodeUuids.length; i++) {
- final BalancerDatanode.StorageGroup g = storageGroupMap.get(
- datanodeUuids[i], storageTypes[i]);
- if (g != null) { // not unknown
- block.addLocation(g);
- }
- }
- }
- if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) {
- // filter bad candidates
- srcBlockList.add(block);
- }
- }
- }
- return bytesReceived;
- }
-
- /* Decide if the given block is a good candidate to move or not */
- private boolean isGoodBlockCandidate(BalancerBlock block) {
- for (Task t : tasks) {
- if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) {
- return true;
- }
- }
- return false;
- }
-
- /* Return a block that's good for the source thread to dispatch immediately
- * The block's source, target, and proxy source are determined too.
- * When choosing proxy and target, source & target throttling
- * has been considered. They are chosen only when they have the capacity
- * to support this block move.
- * The block should be dispatched immediately after this method is returned.
- */
- private PendingBlockMove chooseNextBlockToMove() {
- for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
- final Task task = i.next();
- final BalancerDatanode target = task.target.getBalancerDatanode();
- PendingBlockMove pendingBlock = new PendingBlockMove();
- if (target.addPendingBlock(pendingBlock)) {
- // target is not busy, so do a tentative block allocation
- pendingBlock.source = this;
- pendingBlock.target = task.target;
- if ( pendingBlock.chooseBlockAndProxy() ) {
- long blockSize = pendingBlock.block.getNumBytes();
- incScheduledSize(-blockSize);
- task.size -= blockSize;
- if (task.size == 0) {
- i.remove();
- }
- return pendingBlock;
- } else {
- // cancel the tentative move
- target.removePendingBlock(pendingBlock);
- }
- }
- }
- return null;
- }
-
- /* iterate all source's blocks to remove moved ones */
- private void filterMovedBlocks() {
- for (Iterator<BalancerBlock> blocks=getBlockIterator();
- blocks.hasNext();) {
- if (movedBlocks.contains(blocks.next().getBlock())) {
- blocks.remove();
- }
- }
- }
-
- private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
- /* Return if should fetch more blocks from namenode */
- private boolean shouldFetchMoreBlocks() {
- return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE &&
- blocksToReceive>0;
- }
-
- /* This method iteratively does the following:
- * it first selects a block to move,
- * then sends a request to the proxy source to start the block move
- * when the source's block list falls below a threshold, it asks
- * the namenode for more blocks.
- * It terminates when it has dispatch enough block move tasks or
- * it has received enough blocks from the namenode, or
- * the elapsed time of the iteration has exceeded the max time limit.
- */
- private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
- private void dispatchBlocks() {
- long startTime = Time.now();
- long scheduledSize = getScheduledSize();
- this.blocksToReceive = 2*scheduledSize;
- boolean isTimeUp = false;
- int noPendingBlockIteration = 0;
- while(!isTimeUp && getScheduledSize()>0 &&
- (!srcBlockList.isEmpty() || blocksToReceive>0)) {
- PendingBlockMove pendingBlock = chooseNextBlockToMove();
- if (pendingBlock != null) {
- // move the block
- pendingBlock.scheduleBlockMove();
- continue;
- }
-
- /* Since we can not schedule any block to move,
- * filter any moved blocks from the source block list and
- * check if we should fetch more blocks from the namenode
- */
- filterMovedBlocks(); // filter already moved blocks
- if (shouldFetchMoreBlocks()) {
- // fetch new blocks
- try {
- blocksToReceive -= getBlockList();
- continue;
- } catch (IOException e) {
- LOG.warn("Exception while getting block list", e);
- return;
- }
- } else {
- // source node cannot find a pendingBlockToMove, iteration +1
- noPendingBlockIteration++;
- // in case no blocks can be moved for source node's task,
- // jump out of while-loop after 5 iterations.
- if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
- resetScheduledSize();
- }
- }
-
- // check if time is up or not
- if (Time.now()-startTime > MAX_ITERATION_TIME) {
- isTimeUp = true;
- continue;
- }
-
- /* Now we can not schedule any block to move and there are
- * no new blocks added to the source block list, so we wait.
- */
- try {
- synchronized(Balancer.this) {
- Balancer.this.wait(1000); // wait for targets/sources to be idle
- }
- } catch (InterruptedException ignored) {
- }
- }
- }
- }
/* Check that this Balancer is compatible with the Block Placement Policy
* used by the Namenode.
@@ -887,38 +209,12 @@ public class Balancer {
* when connection fails.
*/
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
+ this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
+ p.nodesToBeExcluded, conf);
this.threshold = p.threshold;
this.policy = p.policy;
- this.nodesToBeExcluded = p.nodesToBeExcluded;
- this.nodesToBeIncluded = p.nodesToBeIncluded;
- this.nnc = theblockpool;
- this.keyManager = nnc.getKeyManager();
-
- final long movedWinWidth = conf.getLong(
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
- movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
-
- cluster = NetworkTopology.getInstance(conf);
-
- this.moverExecutor = Executors.newFixedThreadPool(
- conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
- this.dispatcherExecutor = Executors.newFixedThreadPool(
- conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
- DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
- this.maxConcurrentMovesPerNode =
- conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
- DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
- this.saslClient = new SaslDataTransferClient(
- DataTransferSaslUtil.getSaslPropertiesResolver(conf),
- TrustedChannelResolver.getInstance(conf),
- conf.getBoolean(
- IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
- IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
}
-
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
long capacity = 0L;
for(StorageReport r : report.getStorageReports()) {
@@ -939,26 +235,6 @@ public class Balancer {
return remaining;
}
- private boolean shouldIgnore(DatanodeInfo dn) {
- //ignore decommissioned nodes
- final boolean decommissioned = dn.isDecommissioned();
- //ignore decommissioning nodes
- final boolean decommissioning = dn.isDecommissionInProgress();
- // ignore nodes in exclude list
- final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn);
- // ignore nodes not in the include list (if include list is not empty)
- final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn);
-
- if (decommissioned || decommissioning || excluded || notIncluded) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
- + decommissioning + ", " + excluded + ", " + notIncluded);
- }
- return true;
- }
- return false;
- }
-
/**
* Given a datanode storage set, build a network topology and decide
* over-utilized storages, above average utilized storages,
@@ -966,16 +242,11 @@ public class Balancer {
* The input datanode storage set is shuffled in order to randomize
* to the storage matching later on.
*
- * @return the total number of bytes that are
- * needed to move to make the cluster balanced.
- * @param reports a set of datanode storage reports
+ * @return the number of bytes needed to move in order to balance the cluster.
*/
- private long init(DatanodeStorageReport[] reports) {
+ private long init(List<DatanodeStorageReport> reports) {
// compute average utilization
for (DatanodeStorageReport r : reports) {
- if (shouldIgnore(r.getDatanodeInfo())) {
- continue;
- }
policy.accumulateSpaces(r);
}
policy.initAvgUtilization();
@@ -983,15 +254,8 @@ public class Balancer {
// create network topology and classify utilization collections:
// over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
- for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
- final DatanodeInfo datanode = r.getDatanodeInfo();
- if (shouldIgnore(datanode)) {
- continue; // ignore decommissioning or decommissioned nodes
- }
- cluster.add(datanode);
-
- final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes,
- maxConcurrentMovesPerNode);
+ for(DatanodeStorageReport r : reports) {
+ final BalancerDatanode dn = dispatcher.newDatanode(r);
for(StorageType t : StorageType.asList()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
@@ -1006,7 +270,7 @@ public class Balancer {
final BalancerDatanode.StorageGroup g;
if (utilizationDiff > 0) {
- final Source s = dn.addSource(t, utilization, maxSize2Move, this);
+ final Source s = dn.addSource(t, utilization, maxSize2Move, dispatcher);
if (thresholdDiff <= 0) { // within threshold
aboveAvgUtilized.add(s);
} else {
@@ -1023,14 +287,15 @@ public class Balancer {
underUtilized.add(g);
}
}
- storageGroupMap.put(g);
+ dispatcher.getStorageGroupMap().put(g);
}
}
logUtilizationCollections();
- Preconditions.checkState(storageGroupMap.size() == overUtilized.size()
- + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(),
+ Preconditions.checkState(dispatcher.getStorageGroupMap().size()
+ == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
+ + belowAvgUtilized.size(),
"Mismatched number of storage groups");
// return number of bytes to be moved in order to make the cluster balanced
@@ -1077,7 +342,7 @@ public class Balancer {
*/
private long chooseStorageGroups() {
// First, match nodes on the same node group if cluster is node group aware
- if (cluster.isNodeGroupAware()) {
+ if (dispatcher.getCluster().isNodeGroupAware()) {
chooseStorageGroups(Matcher.SAME_NODE_GROUP);
}
@@ -1086,15 +351,7 @@ public class Balancer {
// At last, match all remaining nodes
chooseStorageGroups(Matcher.ANY_OTHER);
- Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
- "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
- + sources.size() + " sources, " + targets.size() + " targets)");
-
- long bytesToMove = 0L;
- for (Source src : sources) {
- bytesToMove += src.getScheduledSize();
- }
- return bytesToMove;
+ return dispatcher.bytesToMove();
}
/** Decide all <source, target> pairs according to the matcher. */
@@ -1166,9 +423,8 @@ public class Balancer {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
final Task task = new Task(target, size);
source.addTask(task);
- target.incScheduledSize(task.size);
- sources.add(source);
- targets.add(target);
+ target.incScheduledSize(task.getSize());
+ dispatcher.add(source, target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ source.getDisplayName() + " to " + target.getDisplayName());
}
@@ -1182,7 +438,8 @@ public class Balancer {
final C c = candidates.next();
if (!c.hasSpaceForScheduling()) {
candidates.remove();
- } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) {
+ } else if (matcher.match(dispatcher.getCluster(),
+ g.getDatanode(), c.getDatanode())) {
return c;
}
}
@@ -1190,172 +447,16 @@ public class Balancer {
return null;
}
- private final AtomicLong bytesMoved = new AtomicLong();
-
- /* Start a thread to dispatch block moves for each source.
- * The thread selects blocks to move & sends request to proxy source to
- * initiate block move. The process is flow controlled. Block selection is
- * blocked if there are too many un-confirmed block moves.
- * Return the total number of bytes successfully moved in this iteration.
- */
- private long dispatchBlockMoves() throws InterruptedException {
- long bytesLastMoved = bytesMoved.get();
- Future<?>[] futures = new Future<?>[sources.size()];
- int i=0;
- for (Source source : sources) {
- futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
- }
-
- // wait for all dispatcher threads to finish
- for (Future<?> future : futures) {
- try {
- future.get();
- } catch (ExecutionException e) {
- LOG.warn("Dispatcher thread failed", e.getCause());
- }
- }
-
- // wait for all block moving to be done
- waitForMoveCompletion();
-
- return bytesMoved.get()-bytesLastMoved;
- }
-
- // The sleeping period before checking if block move is completed again
- static private long blockMoveWaitTime = 30000L;
-
- /** set the sleeping period for block move completion check */
- static void setBlockMoveWaitTime(long time) {
- blockMoveWaitTime = time;
- }
-
- /* wait for all block move confirmations
- * by checking each target's pendingMove queue
- */
- private void waitForMoveCompletion() {
- boolean shouldWait;
- do {
- shouldWait = false;
- for (BalancerDatanode.StorageGroup target : targets) {
- if (!target.getBalancerDatanode().isPendingQEmpty()) {
- shouldWait = true;
- break;
- }
- }
- if (shouldWait) {
- try {
- Thread.sleep(blockMoveWaitTime);
- } catch (InterruptedException ignored) {
- }
- }
- } while (shouldWait);
- }
-
- /* Decide if it is OK to move the given block from source to target
- * A block is a good candidate if
- * 1. the block is not in the process of being moved/has not been moved;
- * 2. the block does not have a replica on the target;
- * 3. doing the move does not reduce the number of racks that the block has
- */
- private boolean isGoodBlockCandidate(Source source,
- BalancerDatanode.StorageGroup target, BalancerBlock block) {
- if (source.storageType != target.storageType) {
- return false;
- }
- // check if the block is moved or not
- if (movedBlocks.contains(block.getBlock())) {
- return false;
- }
- if (block.isLocatedOn(target)) {
- return false;
- }
- if (cluster.isNodeGroupAware() &&
- isOnSameNodeGroupWithReplicas(target, block, source)) {
- return false;
- }
-
- boolean goodBlock = false;
- if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
- // good if source and target are on the same rack
- goodBlock = true;
- } else {
- boolean notOnSameRack = true;
- synchronized (block) {
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
- notOnSameRack = false;
- break;
- }
- }
- }
- if (notOnSameRack) {
- // good if target is target is not on the same rack as any replica
- goodBlock = true;
- } else {
- // good if source is on the same rack as on of the replicas
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (loc != source &&
- cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
- goodBlock = true;
- break;
- }
- }
- }
- }
- return goodBlock;
- }
-
- /**
- * Check if there are any replica (other than source) on the same node group
- * with target. If true, then target is not a good candidate for placing
- * specific block replica as we don't want 2 replicas under the same nodegroup
- * after balance.
- * @param target targetDataNode
- * @param block dataBlock
- * @param source sourceDataNode
- * @return true if there are any replica (other than source) on the same node
- * group with target
- */
- private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
- BalancerBlock block, Source source) {
- final DatanodeInfo targetDn = target.getDatanode();
- for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
- if (loc != source &&
- cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
- return true;
- }
- }
- return false;
- }
-
/* reset all fields in a balancer preparing for the next iteration */
private void resetData(Configuration conf) {
- this.cluster = NetworkTopology.getInstance(conf);
this.overUtilized.clear();
this.aboveAvgUtilized.clear();
this.belowAvgUtilized.clear();
this.underUtilized.clear();
- this.storageGroupMap.clear();
- this.sources.clear();
- this.targets.clear();
this.policy.reset();
- cleanGlobalBlockList();
- this.movedBlocks.cleanup();
+ dispatcher.reset(conf);
}
- /* Remove all blocks from the global block list except for the ones in the
- * moved list.
- */
- private void cleanGlobalBlockList() {
- for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
- globalBlockListIterator.hasNext();) {
- Block block = globalBlockListIterator.next();
- if(!movedBlocks.contains(block)) {
- globalBlockListIterator.remove();
- }
- }
- }
-
// Exit status
enum ReturnStatus {
// These int values will map directly to the balancer process's exit code.
@@ -1379,11 +480,8 @@ public class Balancer {
private ReturnStatus run(int iteration, Formatter formatter,
Configuration conf) {
try {
- /* get all live datanodes of a cluster and their disk usage
- * decide the number of bytes need to be moved
- */
- final long bytesLeftToMove = init(
- nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE));
+ final List<DatanodeStorageReport> reports = dispatcher.init();
+ final long bytesLeftToMove = init(reports);
if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting...");
return ReturnStatus.SUCCESS;
@@ -1409,7 +507,7 @@ public class Balancer {
formatter.format("%-24s %10d %19s %18s %17s%n",
DateFormat.getDateTimeInstance().format(new Date()),
iteration,
- StringUtils.byteDesc(bytesMoved.get()),
+ StringUtils.byteDesc(dispatcher.getBytesMoved()),
StringUtils.byteDesc(bytesLeftToMove),
StringUtils.byteDesc(bytesToMove)
);
@@ -1420,7 +518,7 @@ public class Balancer {
* available to move.
* Exit no byte has been moved for 5 consecutive iterations.
*/
- if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
+ if (!dispatcher.dispatchAndCheckContinue()) {
return ReturnStatus.NO_MOVE_PROGRESS;
}
@@ -1435,9 +533,7 @@ public class Balancer {
System.out.println(e + ". Exiting ...");
return ReturnStatus.INTERRUPTED;
} finally {
- // shutdown thread pools
- dispatcherExecutor.shutdownNow();
- moverExecutor.shutdownNow();
+ dispatcher.shutdownNow();
}
}
@@ -1546,76 +642,6 @@ public class Balancer {
}
}
- static class Util {
-
- /**
- * @param datanode
- * @return returns true if data node is part of the excludedNodes.
- */
- static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
- return isIn(excludedNodes, datanode);
- }
-
- /**
- * @param datanode
- * @return returns true if includedNodes is empty or data node is part of the includedNodes.
- */
- static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
- return (includedNodes.isEmpty() ||
- isIn(includedNodes, datanode));
- }
- /**
- * Match is checked using host name , ip address with and without port number.
- * @param datanodeSet
- * @param datanode
- * @return true if the datanode's transfer address matches the set of nodes.
- */
- private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
- return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
- isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
- isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
- }
-
- /**
- * returns true if nodes contains host or host:port
- * @param nodes
- * @param host
- * @param port
- * @return
- */
- private static boolean isIn(Set<String> nodes, String host, int port) {
- if (host == null) {
- return false;
- }
- return (nodes.contains(host) || nodes.contains(host +":"+ port));
- }
-
- /**
- * parse a comma separated string to obtain set of host names
- * @param string
- * @return
- */
- static Set<String> parseHostList(String string) {
- String[] addrs = StringUtils.getTrimmedStrings(string);
- return new HashSet<String>(Arrays.asList(addrs));
- }
-
- /**
- * read set of host names from a file
- * @param fileName
- * @return
- */
- static Set<String> getHostListFromFile(String fileName) {
- Set<String> nodes = new HashSet <String> ();
- try {
- HostsFileReader.readFileToSet("nodes", fileName, nodes);
- return StringUtils.getTrimmedStrings(nodes);
- } catch (IOException e) {
- throw new IllegalArgumentException("Unable to open file: " + fileName);
- }
- }
- }
-
static class Cli extends Configured implements Tool {
/**
* Parse arguments and then run Balancer.
@@ -1688,7 +714,7 @@ public class Balancer {
checkArgument(++i < args.length,
"File containing nodes to exclude is not specified: args = "
+ Arrays.toString(args));
- nodesTobeExcluded = Util.getHostListFromFile(args[i]);
+ nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
} else {
nodesTobeExcluded = Util.parseHostList(args[i]);
}
@@ -1700,7 +726,7 @@ public class Balancer {
checkArgument(++i < args.length,
"File containing nodes to include is not specified: args = "
+ Arrays.toString(args));
- nodesTobeIncluded = Util.getHostListFromFile(args[i]);
+ nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
} else {
nodesTobeIncluded = Util.parseHostList(args[i]);
}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java?rev=1616889&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java Fri Aug 8 21:33:57 2014
@@ -0,0 +1,1060 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
+
+/** Dispatching block replica moves between datanodes. */
+@InterfaceAudience.Private
+public class Dispatcher {
+ static final Log LOG = LogFactory.getLog(Dispatcher.class);
+
+ private static final long GB = 1L << 30; // 1GB
+ /** Cap on the total size of blocks requested from the namenode per call. */
+ private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
+
+ /**
+ * Presumably the number of consecutive iterations without move progress
+ * tolerated before giving up; it is used outside this view -- confirm.
+ */
+ private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
+ /** Back-off applied to both the proxy and the target after a failed move. */
+ private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
+ /** Socket read timeout while waiting for a block move: 20 minutes. */
+ private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000;
+
+ /** Namenode connection: block pool ID and per-datanode block lists. */
+ private final NameNodeConnector nnc;
+ /** Supplies block access tokens; also handed to the SASL handshake. */
+ private final KeyManager keyManager;
+ /** Performs the SASL handshake (socketSend) on datanode connections. */
+ private final SaslDataTransferClient saslClient;
+
+ /** Set of datanodes to be excluded. */
+ private final Set<String> excludedNodes;
+ /** Restrict to the following nodes. */
+ private final Set<String> includedNodes;
+
+ /** Storage groups that send blocks; populated via add(source, target). */
+ private final Collection<Source> sources = new HashSet<Source>();
+ /** Storage groups that receive blocks; populated via add(source, target). */
+ private final Collection<BalancerDatanode.StorageGroup> targets
+ = new HashSet<BalancerDatanode.StorageGroup>();
+
+ /** One shared {@link DBlock} per block, reused by every source. */
+ private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
+ /** Blocks already chosen for a move; consulted so they are not re-picked. */
+ private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
+
+ /** Map (datanodeUuid,storageType -> StorageGroup) */
+ private final StorageGroupMap storageGroupMap = new StorageGroupMap();
+
+ /**
+ * Topology used for rack / node-group decisions. Non-final, presumably so
+ * it can be rebuilt between iterations -- confirm against reset().
+ */
+ private NetworkTopology cluster;
+
+ /** Thread pools; presumably for block moves and per-source dispatching. */
+ private final ExecutorService moveExecutor;
+ private final ExecutorService dispatchExecutor;
+ /** The maximum number of concurrent blocks moves at a datanode */
+ private final int maxConcurrentMovesPerNode;
+
+ /** Bytes moved so far; incremented by PendingMove.dispatch() on success. */
+ private final AtomicLong bytesMoved = new AtomicLong();
+
+ private static class GlobalBlockMap {
+ private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
+
+ /**
+ * Look up the {@link DBlock} wrapping the given block, creating and
+ * registering a new one on first access so that all callers share the
+ * same instance.
+ */
+ private DBlock get(Block b) {
+ final DBlock existing = map.get(b);
+ if (existing != null) {
+ return existing;
+ }
+ final DBlock created = new DBlock(b);
+ map.put(b, created);
+ return created;
+ }
+
+ /**
+ * Drop every entry whose block is not recorded in the given moved-block
+ * set; entries for moved blocks are kept.
+ */
+ private void removeAllButRetain(
+ MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks) {
+ final Iterator<Block> keys = map.keySet().iterator();
+ while (keys.hasNext()) {
+ final Block key = keys.next();
+ if (!movedBlocks.contains(key)) {
+ keys.remove();
+ }
+ }
+ }
+ }
+
+ /** Index of storage groups keyed by (datanode UUID, storage type). */
+ static class StorageGroupMap {
+ /** Build the lookup key "datanodeUuid:storageType". */
+ private static String toKey(String datanodeUuid, StorageType storageType) {
+ return datanodeUuid + ":" + storageType;
+ }
+
+ private final Map<String, BalancerDatanode.StorageGroup> map
+ = new HashMap<String, BalancerDatanode.StorageGroup>();
+
+ /** @return the group for the given datanode and storage type, or null. */
+ BalancerDatanode.StorageGroup get(String datanodeUuid,
+ StorageType storageType) {
+ return map.get(toKey(datanodeUuid, storageType));
+ }
+
+ /**
+ * Register a storage group.
+ *
+ * @throws IllegalStateException if a group for the same datanode and
+ * storage type was already registered (the new group still
+ * replaces it in the map before the check fails).
+ */
+ void put(BalancerDatanode.StorageGroup g) {
+ final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType);
+ final BalancerDatanode.StorageGroup existing = map.put(key, g);
+ // Name the offending key so a duplicate report is diagnosable.
+ Preconditions.checkState(existing == null,
+ "Storage group already exists for %s", key);
+ }
+
+ /** @return the number of registered storage groups. */
+ int size() {
+ return map.size();
+ }
+
+ /** Remove all registered storage groups. */
+ void clear() {
+ map.clear();
+ }
+ }
+
+ /**
+ * This class keeps track of a scheduled block move: the block, its source
+ * storage group, the proxy datanode that sends the data, and the target
+ * storage group. source/target are set by Source.chooseNextMove(),
+ * block/proxySource by chooseBlockAndProxy(), and all four are cleared by
+ * reset() when dispatch() finishes.
+ */
+ private class PendingMove {
+ private DBlock block;
+ private Source source;
+ private BalancerDatanode proxySource;
+ private BalancerDatanode.StorageGroup target;
+
+ private PendingMove() {
+ }
+
+ /**
+ * Dereferences block, source, target and proxySource, so it must only be
+ * called while they are set (i.e. before reset()).
+ */
+ @Override
+ public String toString() {
+ final Block b = block.getBlock();
+ return b + " with size=" + b.getNumBytes() + " from "
+ + source.getDisplayName() + " to " + target.getDisplayName()
+ + " through " + proxySource.datanode;
+ }
+
+ /**
+ * Choose a block & a proxy source for this pendingMove whose source &
+ * target have already been chosen. The chosen block is removed from the
+ * source's block list.
+ *
+ * @return true if a block and its proxy are chosen; false otherwise
+ */
+ private boolean chooseBlockAndProxy() {
+ // iterate all source's blocks until find a good one
+ for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
+ if (markMovedIfGoodBlock(i.next())) {
+ i.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * If the block is a good candidate for this source/target pair and a
+ * proxy can be found, record it in movedBlocks.
+ * Lock order: the block's monitor first, then movedBlocks.
+ *
+ * @return true if the given block is good for the tentative move.
+ */
+ private boolean markMovedIfGoodBlock(DBlock block) {
+ synchronized (block) {
+ synchronized (movedBlocks) {
+ if (isGoodBlockCandidate(source, target, block)) {
+ // assigned before proxy selection: chooseProxySource() reads this.block
+ this.block = block;
+ if (chooseProxySource()) {
+ movedBlocks.put(block);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Decided to move " + this);
+ }
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Choose a proxy source among the block's replica locations, preferring
+ * (1) the target's node group when the topology is node-group aware,
+ * (2) the target's rack, then (3) any replica that accepts the move.
+ *
+ * @return true if a proxy is found; otherwise false
+ */
+ private boolean chooseProxySource() {
+ final DatanodeInfo targetDN = target.getDatanode();
+ // if node group is supported, first try add nodes in the same node group
+ if (cluster.isNodeGroupAware()) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+ if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)
+ && addTo(loc)) {
+ return true;
+ }
+ }
+ }
+ // check if there is replica which is on the same rack with the target
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+ if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+ return true;
+ }
+ }
+ // find out a non-busy replica
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+ if (addTo(loc)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Try to register this move as pending on the datanode owning the given
+ * group; on success that datanode becomes the proxy. Registration fails
+ * while the datanode's delay is active or its pending queue is full.
+ */
+ private boolean addTo(BalancerDatanode.StorageGroup g) {
+ final BalancerDatanode bdn = g.getBalancerDatanode();
+ if (bdn.addPendingBlock(this)) {
+ proxySource = bdn;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Dispatch the move to the proxy source & wait for the response.
+ * Connects to the target, sends a replaceBlock request naming the proxy,
+ * and blocks until the response arrives. On success bytesMoved is
+ * incremented. On IOException the proxy and the target are both put on
+ * a DELAY_AFTER_ERROR back-off. In all cases the streams are closed, the
+ * pending entries are removed from proxy and target, this object is
+ * reset, and threads waiting on the Dispatcher monitor are notified.
+ */
+ private void dispatch() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start moving " + this);
+ }
+
+ Socket sock = new Socket();
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ try {
+ sock.connect(
+ NetUtils.createSocketAddr(target.getDatanode().getXferAddr()),
+ HdfsServerConstants.READ_TIMEOUT);
+ /*
+ * Unfortunately we don't have a good way to know if the Datanode is
+ * taking a really long time to move a block, OR something has gone
+ * wrong and it's never going to finish. To deal with this scenario, we
+ * set a long timeout (20 minutes) to avoid hanging the balancer
+ * indefinitely.
+ */
+ sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
+
+ sock.setKeepAlive(true);
+
+ OutputStream unbufOut = sock.getOutputStream();
+ InputStream unbufIn = sock.getInputStream();
+ ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
+ block.getBlock());
+ Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
+ // the SASL client may wrap the raw streams; use whatever it returns
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, keyManager, accessToken, target.getDatanode());
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.IO_FILE_BUFFER_SIZE));
+ in = new DataInputStream(new BufferedInputStream(unbufIn,
+ HdfsConstants.IO_FILE_BUFFER_SIZE));
+
+ sendRequest(out, eb, accessToken);
+ receiveResponse(in);
+ bytesMoved.addAndGet(block.getNumBytes());
+ LOG.info("Successfully moved " + this);
+ } catch (IOException e) {
+ LOG.warn("Failed to move " + this + ": " + e.getMessage());
+ /*
+ * proxy or target may have an issue, insert a small delay before using
+ * these nodes further. This avoids a potential storm of
+ * "threads quota exceeded" Warnings when the balancer gets out of sync
+ * with work going on in datanode.
+ */
+ proxySource.activateDelay(DELAY_AFTER_ERROR);
+ target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR);
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(sock);
+
+ proxySource.removePendingBlock(this);
+ target.getBalancerDatanode().removePendingBlock(this);
+
+ synchronized (this) {
+ reset();
+ }
+ // wake threads waiting on the dispatcher (presumably throttled
+ // dispatch loops waiting for pending-queue space -- confirm)
+ synchronized (Dispatcher.this) {
+ Dispatcher.this.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Send a block replace request to the output stream: ask the target to
+ * store the block with the target's storage type, copying it from the
+ * proxy and deleting the replica named by the source datanode UUID.
+ */
+ private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+ Token<BlockTokenIdentifier> accessToken) throws IOException {
+ new Sender(out).replaceBlock(eb, target.storageType, accessToken, source
+ .getDatanode().getDatanodeUuid(), proxySource.datanode);
+ }
+
+ /**
+ * Receive a block copy response from the input stream.
+ *
+ * @throws IOException if the status is not SUCCESS; the access-token
+ * error gets a dedicated message.
+ */
+ private void receiveResponse(DataInputStream in) throws IOException {
+ BlockOpResponseProto response = BlockOpResponseProto
+ .parseFrom(vintPrefixed(in));
+ if (response.getStatus() != Status.SUCCESS) {
+ if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new IOException("block move failed due to access token error");
+ }
+ throw new IOException("block move is failed: " + response.getMessage());
+ }
+ }
+
+ /** reset the object: clear block, source, proxySource and target */
+ private void reset() {
+ block = null;
+ source = null;
+ proxySource = null;
+ target = null;
+ }
+ }
+
+ /**
+ * Location tracking for one block in the dispatcher: a specialization of
+ * {@link MovedBlocks.Locations} over storage groups.
+ */
+ private static class DBlock
+ extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
+ DBlock(Block b) {
+ super(b);
+ }
+ }
+
+ /** A desired move of up to {@code size} bytes to a single target group. */
+ static class Task {
+ private final BalancerDatanode.StorageGroup target;
+ /** Bytes still scheduled to move; decreased as blocks are chosen. */
+ private long size;
+
+ Task(BalancerDatanode.StorageGroup destination, long bytes) {
+ target = destination;
+ size = bytes;
+ }
+
+ /** @return the bytes still scheduled to move for this task. */
+ long getSize() {
+ return size;
+ }
+ }
+
+ /**
+ * A datanode taking part in block moves. It holds at most one
+ * {@link StorageGroup} per storage type and throttles moves through a
+ * bounded list of pending moves plus an optional back-off delay.
+ */
+ static class BalancerDatanode {
+
+ /** A group of storages in a datanode with the same storage type. */
+ class StorageGroup {
+ /** The storage type shared by the storages in this group. */
+ final StorageType storageType;
+ /** Utilization of this group, as supplied by the creator. */
+ final double utilization;
+ /** Upper bound on the bytes that may be scheduled for this group. */
+ final long maxSize2Move;
+ /** Bytes currently scheduled; guarded by this group's monitor. */
+ private long scheduledSize = 0L;
+
+ private StorageGroup(StorageType storageType, double utilization,
+ long maxSize2Move) {
+ this.storageType = storageType;
+ this.utilization = utilization;
+ this.maxSize2Move = maxSize2Move;
+ }
+
+ /** @return the datanode owning this group. */
+ BalancerDatanode getBalancerDatanode() {
+ return BalancerDatanode.this;
+ }
+
+ /** @return the {@link DatanodeInfo} of the owning datanode. */
+ DatanodeInfo getDatanode() {
+ return BalancerDatanode.this.datanode;
+ }
+
+ /** @return true while more bytes can still be scheduled. */
+ synchronized boolean hasSpaceForScheduling() {
+ return availableSizeToMove() > 0L;
+ }
+
+ /** @return maxSize2Move minus the bytes already scheduled. */
+ synchronized long availableSizeToMove() {
+ return maxSize2Move - scheduledSize;
+ }
+
+ /** Add {@code size} (which may be negative) to the scheduled size. */
+ synchronized void incScheduledSize(long size) {
+ scheduledSize += size;
+ }
+
+ /** @return scheduled size */
+ synchronized long getScheduledSize() {
+ return scheduledSize;
+ }
+
+ /** Reset scheduled size to zero. */
+ synchronized void resetScheduledSize() {
+ scheduledSize = 0L;
+ }
+
+ /** @return the name for display, "datanode:storageType" */
+ String getDisplayName() {
+ return datanode + ":" + storageType;
+ }
+
+ /** @return the utilization only, as a string. */
+ @Override
+ public String toString() {
+ return "" + utilization;
+ }
+ }
+
+ final DatanodeInfo datanode;
+ /** At most one storage group per storage type (enforced by put). */
+ final EnumMap<StorageType, StorageGroup> storageMap
+ = new EnumMap<StorageType, StorageGroup>(StorageType.class);
+ /** Monotonic time until which new moves are refused; 0 means none. */
+ protected long delayUntil = 0L;
+ /** blocks being moved but not confirmed yet */
+ private final List<PendingMove> pendings;
+ /** Capacity limit for {@link #pendings}. */
+ private final int maxConcurrentMoves;
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + datanode + ":" + storageMap;
+ }
+
+ private BalancerDatanode(DatanodeStorageReport r, int maxConcurrentMoves) {
+ this.datanode = r.getDatanodeInfo();
+ this.maxConcurrentMoves = maxConcurrentMoves;
+ this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
+ }
+
+ /**
+ * Register a storage group under its storage type.
+ *
+ * @throws IllegalStateException if this datanode already has a group of
+ * the given storage type.
+ */
+ private void put(StorageType storageType, StorageGroup g) {
+ final StorageGroup existing = storageMap.put(storageType, g);
+ Preconditions.checkState(existing == null,
+ "Storage group of type %s already exists on datanode %s",
+ storageType, datanode);
+ }
+
+ /** Create and register a plain storage group of the given type. */
+ StorageGroup addStorageGroup(StorageType storageType, double utilization,
+ long maxSize2Move) {
+ final StorageGroup g = new StorageGroup(storageType, utilization,
+ maxSize2Move);
+ put(storageType, g);
+ return g;
+ }
+
+ /**
+ * Create and register a {@link Source} storage group of the given type.
+ *
+ * @param dispatcher the dispatcher that becomes the new source's
+ * enclosing instance.
+ */
+ Source addSource(StorageType storageType, double utilization,
+ long maxSize2Move, Dispatcher dispatcher) {
+ final Source s = dispatcher.new Source(storageType, utilization,
+ maxSize2Move, this);
+ put(storageType, s);
+ return s;
+ }
+
+ /** Refuse new pending moves for the next {@code delta} milliseconds. */
+ synchronized private void activateDelay(long delta) {
+ delayUntil = Time.monotonicNow() + delta;
+ }
+
+ /**
+ * @return true if a delay set by {@link #activateDelay} is still in
+ * effect; once it has expired the delay is cleared.
+ */
+ synchronized private boolean isDelayActive() {
+ if (delayUntil == 0 || Time.monotonicNow() > delayUntil) {
+ delayUntil = 0;
+ return false;
+ }
+ return true;
+ }
+
+ /** Check if the node can schedule more blocks to move */
+ synchronized boolean isPendingQNotFull() {
+ return pendings.size() < maxConcurrentMoves;
+ }
+
+ /** Check if all the dispatched moves are done */
+ synchronized boolean isPendingQEmpty() {
+ return pendings.isEmpty();
+ }
+
+ /**
+ * Add a scheduled block move to the node. It is accepted only when no
+ * delay is active and the pending queue is not full.
+ */
+ synchronized boolean addPendingBlock(PendingMove pendingBlock) {
+ if (!isDelayActive() && isPendingQNotFull()) {
+ return pendings.add(pendingBlock);
+ }
+ return false;
+ }
+
+ /** Remove a scheduled block move from the node */
+ synchronized boolean removePendingBlock(PendingMove pendingBlock) {
+ return pendings.remove(pendingBlock);
+ }
+ }
+
+ /** A node that can be the sources of a block move */
+ class Source extends BalancerDatanode.StorageGroup {
+
+ /** Planned moves out of this source; consumed by chooseNextMove(). */
+ private final List<Task> tasks = new ArrayList<Task>(2);
+ /**
+ * Bytes' worth of blocks still to request from the namenode. It is set in
+ * dispatchBlocks() and capped per request by MAX_BLOCKS_SIZE_TO_FETCH in
+ * getBlockList().
+ */
+ private long blocksToReceive = 0L;
+ /**
+ * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
+ * because we want to keep one copy of a block and be aware that the
+ * locations are changing over time.
+ */
+ private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
+
+ /** Create a source group owned by {@code owner}. */
+ private Source(StorageType storageType, double utilization,
+ long maxSize2Move, BalancerDatanode owner) {
+ owner.super(storageType, utilization, maxSize2Move);
+ }
+
+ /**
+ * Add a task and grow this source's scheduled size by the task's size.
+ *
+ * @throws IllegalStateException if the task targets this same group.
+ */
+ void addTask(Task task) {
+ final boolean distinctTarget = task.target != this;
+ Preconditions.checkState(distinctTarget,
+ "Source and target are the same storage group " + getDisplayName());
+ incScheduledSize(task.size);
+ tasks.add(task);
+ }
+
+ /**
+ * @return a live iterator over this source's candidate blocks; its
+ * {@code remove()} removes from the source's block list.
+ */
+ Iterator<DBlock> getBlockIterator() {
+ final List<DBlock> blocks = srcBlocks;
+ return blocks.iterator();
+ }
+
+ /**
+ * Fetch new blocks of this source from namenode and update this source's
+ * block list & {@link Dispatcher#globalBlocks}. Each returned block's
+ * locations are rebuilt from the storage groups this dispatcher knows,
+ * and the block is added to srcBlocks if it is new and is a good
+ * candidate for at least one task.
+ *
+ * @return the total size of the received blocks in the number of bytes.
+ * @throws IOException if the namenode request fails.
+ */
+ private long getBlockList() throws IOException {
+ // never ask for more than MAX_BLOCKS_SIZE_TO_FETCH bytes in one call
+ final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+ final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanode(), size);
+
+ long bytesReceived = 0;
+ for (BlockWithLocations blk : newBlocks.getBlocks()) {
+ bytesReceived += blk.getBlock().getNumBytes();
+ // globalBlocks is shared by all sources; serialize get-or-create
+ synchronized (globalBlocks) {
+ final DBlock block = globalBlocks.get(blk.getBlock());
+ // locations are rebuilt under the block's monitor, which
+ // markMovedIfGoodBlock also holds while inspecting the block
+ synchronized (block) {
+ block.clearLocations();
+
+ // update locations
+ final String[] datanodeUuids = blk.getDatanodeUuids();
+ final StorageType[] storageTypes = blk.getStorageTypes();
+ for (int i = 0; i < datanodeUuids.length; i++) {
+ final BalancerDatanode.StorageGroup g = storageGroupMap.get(
+ datanodeUuids[i], storageTypes[i]);
+ if (g != null) { // skip storage groups this dispatcher does not track
+ block.addLocation(g);
+ }
+ }
+ }
+ if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
+ // filter bad candidates
+ srcBlocks.add(block);
+ }
+ }
+ }
+ return bytesReceived;
+ }
+
+ /**
+ * @return true if at least one of this source's tasks could move the
+ * given block to its target; stops at the first match.
+ */
+ private boolean isGoodBlockCandidate(DBlock block) {
+ boolean good = false;
+ for (Iterator<Task> it = tasks.iterator(); !good && it.hasNext();) {
+ good = Dispatcher.this.isGoodBlockCandidate(this, it.next().target, block);
+ }
+ return good;
+ }
+
+ /**
+ * Choose a move for the source. The block's source, target, and proxy
+ * are determined too. When choosing proxy and target, source &
+ * target throttling has been considered. They are chosen only when they
+ * have the capacity to support this block move. The block should be
+ * dispatched immediately after this method is returned.
+ *
+ * A task is dropped once its remaining size reaches zero or below. The
+ * last block chosen for a task may be larger than the bytes it still
+ * needed, so the remaining size can go negative.
+ *
+ * @return a move that's good for the source to dispatch immediately, or
+ * null if no task could be served right now.
+ */
+ private PendingMove chooseNextMove() {
+ for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
+ final Task task = i.next();
+ final BalancerDatanode target = task.target.getBalancerDatanode();
+ PendingMove pendingBlock = new PendingMove();
+ if (target.addPendingBlock(pendingBlock)) {
+ // target is not busy, so do a tentative block allocation
+ pendingBlock.source = this;
+ pendingBlock.target = task.target;
+ if (pendingBlock.chooseBlockAndProxy()) {
+ long blockSize = pendingBlock.block.getNumBytes();
+ incScheduledSize(-blockSize);
+ task.size -= blockSize;
+ // "<= 0", not "== 0": an oversized final block leaves size
+ // negative, and the task must not keep scheduling extra moves.
+ if (task.size <= 0) {
+ i.remove();
+ }
+ return pendingBlock;
+ } else {
+ // cancel the tentative move
+ target.removePendingBlock(pendingBlock);
+ }
+ }
+ }
+ return null;
+ }
+
+ /** Iterate all source's blocks to remove moved ones */
+ private void removeMovedBlocks() {
+ for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) {
+ if (movedBlocks.contains(i.next().getBlock())) {
+ i.remove();
+ }
+ }
+ }
+
+ private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
+
+ /** @return if should fetch more blocks from namenode */
+ private boolean shouldFetchMoreBlocks() {
+ return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
+ }
+
+ private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
+
+ /**
+ * This method iteratively does the following: it first selects a block to
+ * move, then sends a request to the proxy source to start the block move
+ * when the source's block list falls below a threshold, it asks the
+ * namenode for more blocks. It terminates when it has dispatch enough block
+ * move tasks or it has received enough blocks from the namenode, or the
+ * elapsed time of the iteration has exceeded the max time limit.
+ */
+ private void dispatchBlocks() {
+ final long startTime = Time.monotonicNow();
+ this.blocksToReceive = 2 * getScheduledSize();
+ boolean isTimeUp = false;
+ int noPendingBlockIteration = 0;
+ while (!isTimeUp && getScheduledSize() > 0
+ && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
+ final PendingMove p = chooseNextMove();
+ if (p != null) {
+ // move the block
+ moveExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ p.dispatch();
+ }
+ });
+ continue;
+ }
+
+ // Since we cannot schedule any block to move,
+ // remove any moved blocks from the source block list and
+ removeMovedBlocks(); // filter already moved blocks
+ // check if we should fetch more blocks from the namenode
+ if (shouldFetchMoreBlocks()) {
+ // fetch new blocks
+ try {
+ blocksToReceive -= getBlockList();
+ continue;
+ } catch (IOException e) {
+ LOG.warn("Exception while getting block list", e);
+ return;
+ }
+ } else {
+ // source node cannot find a pendingBlockToMove, iteration +1
+ noPendingBlockIteration++;
+ // in case no blocks can be moved for source node's task,
+ // jump out of while-loop after 5 iterations.
+ if (noPendingBlockIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
+ resetScheduledSize();
+ }
+ }
+
+ // check if time is up or not
+ if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
+ isTimeUp = true;
+ continue;
+ }
+
+ // Now we can not schedule any block to move and there are
+ // no new blocks added to the source block list, so we wait.
+ try {
+ synchronized (Dispatcher.this) {
+ Dispatcher.this.wait(1000); // wait for targets/sources to be idle
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ }
+
/**
 * Create a dispatcher bound to one namenode connection.
 *
 * @param theblockpool  connection to the namenode of the block pool
 * @param includedNodes hosts to include (an empty set includes all)
 * @param excludedNodes hosts to exclude
 * @param conf          configuration supplying window sizes, thread-pool
 *                      sizes, per-node move concurrency and SASL settings
 */
Dispatcher(NameNodeConnector theblockpool, Set<String> includedNodes,
    Set<String> excludedNodes, Configuration conf) {
  // Namenode connection and the key manager it provides
  this.nnc = theblockpool;
  this.keyManager = theblockpool.getKeyManager();

  // Host filters consulted by shouldIgnore()
  this.includedNodes = includedNodes;
  this.excludedNodes = excludedNodes;

  // Window width for the record of recently moved blocks
  final long movedWindow = conf.getLong(
      DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
      DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
  this.movedBlocks =
      new MovedBlocks<BalancerDatanode.StorageGroup>(movedWindow);

  this.cluster = NetworkTopology.getInstance(conf);

  // Pool running individual block moves
  final int moverThreads = conf.getInt(
      DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
      DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
  this.moveExecutor = Executors.newFixedThreadPool(moverThreads);

  // Pool running one dispatch loop per source
  final int dispatcherThreads = conf.getInt(
      DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
      DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
  this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);

  this.maxConcurrentMovesPerNode = conf.getInt(
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
      DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);

  // SASL client for data transfer; the simple-auth fallback flag is read
  // from IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY.
  final boolean allowSimpleAuthFallback = conf.getBoolean(
      CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
  this.saslClient = new SaslDataTransferClient(
      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), allowSimpleAuthFallback);
}
+
+ StorageGroupMap getStorageGroupMap() {
+ return storageGroupMap;
+ }
+
+ NetworkTopology getCluster() {
+ return cluster;
+ }
+
+ long getBytesMoved() {
+ return bytesMoved.get();
+ }
+
+ long bytesToMove() {
+ Preconditions.checkState(
+ storageGroupMap.size() >= sources.size() + targets.size(),
+ "Mismatched number of storage groups (" + storageGroupMap.size()
+ + " < " + sources.size() + " sources + " + targets.size()
+ + " targets)");
+
+ long b = 0L;
+ for (Source src : sources) {
+ b += src.getScheduledSize();
+ }
+ return b;
+ }
+
+ void add(Source source, BalancerDatanode.StorageGroup target) {
+ sources.add(source);
+ targets.add(target);
+ }
+
+ private boolean shouldIgnore(DatanodeInfo dn) {
+ // ignore decommissioned nodes
+ final boolean decommissioned = dn.isDecommissioned();
+ // ignore decommissioning nodes
+ final boolean decommissioning = dn.isDecommissionInProgress();
+ // ignore nodes in exclude list
+ final boolean excluded = Util.isExcluded(excludedNodes, dn);
+ // ignore nodes not in the include list (if include list is not empty)
+ final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
+
+ if (decommissioned || decommissioning || excluded || notIncluded) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
+ + decommissioning + ", " + excluded + ", " + notIncluded);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /** Get live datanode storage reports and then build the network topology. */
+ List<DatanodeStorageReport> init() throws IOException {
+ final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
+ final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>();
+ // create network topology and classify utilization collections:
+ // over-utilized, above-average, below-average and under-utilized.
+ for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
+ final DatanodeInfo datanode = r.getDatanodeInfo();
+ if (shouldIgnore(datanode)) {
+ continue;
+ }
+ trimmed.add(r);
+ cluster.add(datanode);
+ }
+ return trimmed;
+ }
+
+ public BalancerDatanode newDatanode(DatanodeStorageReport r) {
+ return new BalancerDatanode(r, maxConcurrentMovesPerNode);
+ }
+
+ public boolean dispatchAndCheckContinue() throws InterruptedException {
+ return nnc.shouldContinue(dispatchBlockMoves());
+ }
+
+ /**
+ * Dispatch block moves for each source. The thread selects blocks to move &
+ * sends request to proxy source to initiate block move. The process is flow
+ * controlled. Block selection is blocked if there are too many un-confirmed
+ * block moves.
+ *
+ * @return the total number of bytes successfully moved in this iteration.
+ */
+ private long dispatchBlockMoves() throws InterruptedException {
+ final long bytesLastMoved = bytesMoved.get();
+ final Future<?>[] futures = new Future<?>[sources.size()];
+
+ final Iterator<Source> i = sources.iterator();
+ for (int j = 0; j < futures.length; j++) {
+ final Source s = i.next();
+ futures[j] = dispatchExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ s.dispatchBlocks();
+ }
+ });
+ }
+
+ // wait for all dispatcher threads to finish
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ LOG.warn("Dispatcher thread failed", e.getCause());
+ }
+ }
+
+ // wait for all block moving to be done
+ waitForMoveCompletion();
+
+ return bytesMoved.get() - bytesLastMoved;
+ }
+
+ /** The sleeping period before checking if block move is completed again */
+ static private long blockMoveWaitTime = 30000L;
+
+ /** set the sleeping period for block move completion check */
+ static void setBlockMoveWaitTime(long time) {
+ blockMoveWaitTime = time;
+ }
+
+ /** Wait for all block move confirmations. */
+ private void waitForMoveCompletion() {
+ for(;;) {
+ boolean empty = true;
+ for (BalancerDatanode.StorageGroup t : targets) {
+ if (!t.getBalancerDatanode().isPendingQEmpty()) {
+ empty = false;
+ break;
+ }
+ }
+ if (empty) {
+ return; //all pending queues are empty
+ }
+ try {
+ Thread.sleep(blockMoveWaitTime);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ /**
+ * Decide if the block is a good candidate to be moved from source to target.
+ * A block is a good candidate if
+ * 1. the block is not in the process of being moved/has not been moved;
+ * 2. the block does not have a replica on the target;
+ * 3. doing the move does not reduce the number of racks that the block has
+ */
+ private boolean isGoodBlockCandidate(Source source,
+ BalancerDatanode.StorageGroup target, DBlock block) {
+ if (source.storageType != target.storageType) {
+ return false;
+ }
+ // check if the block is moved or not
+ if (movedBlocks.contains(block.getBlock())) {
+ return false;
+ }
+ if (block.isLocatedOn(target)) {
+ return false;
+ }
+ if (cluster.isNodeGroupAware()
+ && isOnSameNodeGroupWithReplicas(target, block, source)) {
+ return false;
+ }
+ if (reduceNumOfRacks(source, target, block)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Determine whether moving the given block replica from source to target
+ * would reduce the number of racks of the block replicas.
+ */
+ private boolean reduceNumOfRacks(Source source,
+ BalancerDatanode.StorageGroup target, DBlock block) {
+ final DatanodeInfo sourceDn = source.getDatanode();
+ if (cluster.isOnSameRack(sourceDn, target.getDatanode())) {
+ // source and target are on the same rack
+ return false;
+ }
+ boolean notOnSameRack = true;
+ synchronized (block) {
+ for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
+ if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
+ notOnSameRack = false;
+ break;
+ }
+ }
+ }
+ if (notOnSameRack) {
+ // target is not on the same rack as any replica
+ return false;
+ }
+ for (BalancerDatanode.StorageGroup g : block.getLocations()) {
+ if (g != source && cluster.isOnSameRack(g.getDatanode(), sourceDn)) {
+ // source is on the same rack of another replica
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Check if there are any replica (other than source) on the same node group
+ * with target. If true, then target is not a good candidate for placing
+ * specific replica as we don't want 2 replicas under the same nodegroup.
+ *
+ * @return true if there are any replica (other than source) on the same node
+ * group with target
+ */
+ private boolean isOnSameNodeGroupWithReplicas(
+ BalancerDatanode.StorageGroup target, DBlock block, Source source) {
+ final DatanodeInfo targetDn = target.getDatanode();
+ for (BalancerDatanode.StorageGroup g : block.getLocations()) {
+ if (g != source && cluster.isOnSameNodeGroup(g.getDatanode(), targetDn)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** Reset all fields in order to prepare for the next iteration */
+ void reset(Configuration conf) {
+ cluster = NetworkTopology.getInstance(conf);
+ storageGroupMap.clear();
+ sources.clear();
+ targets.clear();
+ globalBlocks.removeAllButRetain(movedBlocks);
+ movedBlocks.cleanup();
+ }
+
+ /** shutdown thread pools */
+ void shutdownNow() {
+ dispatchExecutor.shutdownNow();
+ moveExecutor.shutdownNow();
+ }
+
+ static class Util {
+ /** @return true if data node is part of the excludedNodes. */
+ static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
+ return isIn(excludedNodes, dn);
+ }
+
+ /**
+ * @return true if includedNodes is empty or data node is part of the
+ * includedNodes.
+ */
+ static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
+ return (includedNodes.isEmpty() || isIn(includedNodes, dn));
+ }
+
+ /**
+ * Match is checked using host name , ip address with and without port
+ * number.
+ *
+ * @return true if the datanode's transfer address matches the set of nodes.
+ */
+ private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
+ return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())
+ || isIn(datanodes, dn.getIpAddr(), dn.getXferPort())
+ || isIn(datanodes, dn.getHostName(), dn.getXferPort());
+ }
+
+ /** @return true if nodes contains host or host:port */
+ private static boolean isIn(Set<String> nodes, String host, int port) {
+ if (host == null) {
+ return false;
+ }
+ return (nodes.contains(host) || nodes.contains(host + ":" + port));
+ }
+
+ /**
+ * Parse a comma separated string to obtain set of host names
+ *
+ * @return set of host names
+ */
+ static Set<String> parseHostList(String string) {
+ String[] addrs = StringUtils.getTrimmedStrings(string);
+ return new HashSet<String>(Arrays.asList(addrs));
+ }
+
+ /**
+ * Read set of host names from a file
+ *
+ * @return set of host names
+ */
+ static Set<String> getHostListFromFile(String fileName, String type) {
+ Set<String> nodes = new HashSet<String>();
+ try {
+ HostsFileReader.readFileToSet(type, fileName, nodes);
+ return StringUtils.getTrimmedStrings(nodes);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(
+ "Failed to read host list from file: " + fileName);
+ }
+ }
+ }
+}