You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/12/05 20:49:16 UTC
svn commit: r601491 - in /lucene/hadoop/trunk: ./ bin/ conf/
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Author: dhruba
Date: Wed Dec 5 11:49:16 2007
New Revision: 601491
URL: http://svn.apache.org/viewvc?rev=601491&view=rev
Log:
HADOOP-1652. A utility to balance data among datanodes in a HDFS cluster.
(Hairong Kuang via dhruba)
Added:
lucene/hadoop/trunk/bin/start-balancer.sh (with props)
lucene/hadoop/trunk/bin/stop-balancer.sh (with props)
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (with props)
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java (with props)
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/bin/hadoop
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec 5 11:49:16 2007
@@ -37,6 +37,9 @@
HADOOP-2299. Defination of a login interface. A simple implementation for
Unix users and groups. (Hairong Kuang via dhruba)
+
+ HADOOP-1652. A utility to balance data among datanodes in a HDFS cluster.
+ (Hairong Kuang via dhruba)
IMPROVEMENTS
Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/hadoop?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Wed Dec 5 11:49:16 2007
@@ -40,6 +40,7 @@
echo " dfsadmin run a DFS admin client"
echo " fsck run a DFS filesystem checking utility"
echo " fs run a generic filesystem user client"
+ echo " balancer run a cluster balancing utility"
echo " jobtracker run the MapReduce job Tracker node"
echo " pipes run a Pipes job"
echo " tasktracker run a MapReduce task Tracker node"
@@ -180,6 +181,8 @@
CLASS=org.apache.hadoop.dfs.DFSAdmin
elif [ "$COMMAND" = "fsck" ] ; then
CLASS=org.apache.hadoop.dfs.DFSck
+elif [ "$COMMAND" = "balancer" ] ; then
+ CLASS=org.apache.hadoop.dfs.Balancer
elif [ "$COMMAND" = "jobtracker" ] ; then
CLASS=org.apache.hadoop.mapred.JobTracker
elif [ "$COMMAND" = "tasktracker" ] ; then
Added: lucene/hadoop/trunk/bin/start-balancer.sh
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/start-balancer.sh?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/bin/start-balancer.sh (added)
+++ lucene/hadoop/trunk/bin/start-balancer.sh Wed Dec 5 11:49:16 2007
@@ -0,0 +1,5 @@
+#!/usr/bin/env bash
+
+# Start balancer daemon.
+# All command-line arguments are passed on to hadoop-daemon.sh after
+# "start balancer". "$@" is quoted so that each argument is forwarded
+# as-is instead of being re-split or glob-expanded by the shell.
+
+hadoop-daemon.sh start balancer "$@"
Propchange: lucene/hadoop/trunk/bin/start-balancer.sh
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/hadoop/trunk/bin/start-balancer.sh
------------------------------------------------------------------------------
svn:executable = *
Propchange: lucene/hadoop/trunk/bin/start-balancer.sh
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Added: lucene/hadoop/trunk/bin/stop-balancer.sh
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/stop-balancer.sh?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/bin/stop-balancer.sh (added)
+++ lucene/hadoop/trunk/bin/stop-balancer.sh Wed Dec 5 11:49:16 2007
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+
+# Stop balancer daemon.
+# Run this on the machine where the balancer is running
+
+hadoop-daemon.sh stop balancer
Propchange: lucene/hadoop/trunk/bin/stop-balancer.sh
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/hadoop/trunk/bin/stop-balancer.sh
------------------------------------------------------------------------------
svn:executable = *
Propchange: lucene/hadoop/trunk/bin/stop-balancer.sh
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Dec 5 11:49:16 2007
@@ -389,6 +389,16 @@
</property>
<property>
+ <name>dfs.balance.bandwidthPerSec</name>
+ <value>1048576</value>
+ <description>
+ Specifies the maximum amount of bandwidth that each datanode
+ can utilize for the balancing purpose in term of
+ the number of bytes per second.
+ </description>
+</property>
+
+<property>
<name>dfs.hosts</name>
<value></value>
<description>Names a file that contains a list of hosts that are
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java Wed Dec 5 11:49:16 2007
@@ -0,0 +1,1511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Formatter;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
+ * when some datanodes become full or when new empty nodes join the cluster.
+ * The tool is deployed as an application program that can be run by the
+ * cluster administrator on a live HDFS cluster while applications are
+ * adding and deleting files.
+ *
+ * <p>SYNOPSIS
+ * <pre>
+ * To start:
+ * bin/start-balancer.sh [-threshold <threshold>]
+ * Example: bin/start-balancer.sh
+ * start the balancer with a default threshold of 10%
+ * bin/start-balancer.sh -threshold 5
+ * start the balancer with a threshold of 5%
+ * To stop:
+ * bin/stop-balancer.sh
+ * </pre>
+ *
+ * <p>DESCRIPTION
+ * <p>The threshold parameter is a fraction in the range of (0%, 100%) with a
+ * default value of 10%. The threshold sets a target for whether the cluster
+ * is balanced. A cluster is balanced if for each datanode, the utilization
+ * of the node (ratio of used space at the node to total capacity of the node)
+ * differs from the utilization of the cluster (ratio of used space in the
+ * cluster to total capacity of the cluster) by no more than the threshold value.
+ * The smaller the threshold, the more balanced a cluster will become.
+ * It takes more time to run the balancer for small threshold values.
+ * Also for a very small threshold the cluster may not be able to reach the
+ * balanced state when applications write and delete files concurrently.
+ *
+ * <p>The tool moves blocks from highly utilized datanodes to poorly
+ * utilized datanodes iteratively. In each iteration a datanode moves or
+ * receives no more than the lesser of 10G bytes or the threshold fraction
+ * of its capacity. Each iteration runs no more than 20 minutes.
+ * At the end of each iteration, the balancer obtains updated datanodes
+ * information from the namenode.
+ *
+ * <p>A system property that limits the balancer's use of bandwidth is
+ * defined in the default configuration file:
+ * <pre>
+ * <property>
+ * <name>dfs.balance.bandwidthPerSec</name>
+ * <value>1048576</value>
+ * <description> Specifies the maximum bandwidth that each datanode
+ * can utilize for the balancing purpose in term of the number of bytes
+ * per second. </description>
+ * </property>
+ * </pre>
+ *
+ * <p>This property determines the maximum speed at which a block will be
+ * moved from one datanode to another. The default value is 1MB/s. The higher
+ * the bandwidth, the faster a cluster can reach the balanced state,
+ * but with greater competition with application processes. If an
+ * administrator changes the value of this property in the configuration
+ * file, the change is observed when HDFS is next restarted.
+ *
+ * <p>MONITORING BALANCER PROGRESS
+ * <p>After the balancer is started, an output file name where the balancer
+ * progress will be recorded is printed on the screen. The administrator
+ * can monitor the running of the balancer by reading the output file.
+ * The output shows the balancer's status iteration by iteration. In each
+ * iteration it prints the starting time, the iteration number, the total
+ * number of bytes that have been moved in the previous iterations,
+ * the total number of bytes that are left to move in order for the cluster
+ * to be balanced, and the number of bytes that are being moved in this
+ * iteration. Normally "Bytes Already Moved" is increasing while "Bytes Left
+ * To Move" is decreasing.
+ *
+ * <p>Running multiple instances of the balancer in an HDFS cluster is
+ * prohibited by the tool.
+ *
+ * <p>The balancer automatically exits when any of the following five
+ * conditions is satisfied:
+ * <ol>
+ * <li>The cluster is balanced;
+ * <li>No block can be moved;
+ * <li>No block has been moved for five consecutive iterations;
+ * <li>An IOException occurs while communicating with the namenode;
+ * <li>Another balancer is running.
+ * </ol>
+ *
+ * <p>Upon exit, a balancer returns an exit code and prints one of the
+ * following messages to the output file, corresponding to the above exit
+ * reasons:
+ * <ol>
+ * <li>The cluster is balanced. Exiting
+ * <li>No block can be moved. Exiting...
+ * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>Received an IO exception: failure reason. Exiting...
+ * <li>Another balancer is running. Exiting...
+ * </ol>
+ *
+ * <p>The administrator can interrupt the execution of the balancer at any
+ * time by running the command "stop-balancer.sh" on the machine where the
+ * balancer is running.
+ */
+
+public class Balancer implements Tool {
+ private static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.dfs.Balancer");
+ final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
+
+ private Configuration conf;
+
+ private double threshold = 10D;
+ private NamenodeProtocol namenode;
+ private ClientProtocol client;
+ private FileSystem fs;
+ private final static Random rnd = new Random();
+
+ // all data node lists
+ private Collection<Source> overUtilizedDatanodes
+ = new LinkedList<Source>();
+ private Collection<Source> aboveAvgUtilizedDatanodes
+ = new LinkedList<Source>();
+ private Collection<BalancerDatanode> belowAvgUtilizedDatanodes
+ = new LinkedList<BalancerDatanode>();
+ private Collection<BalancerDatanode> underUtilizedDatanodes
+ = new LinkedList<BalancerDatanode>();
+
+ private Collection<Source> sources
+ = new HashSet<Source>();
+ private Collection<BalancerDatanode> targets
+ = new HashSet<BalancerDatanode>();
+
+ private Map<Block, BalancerBlock> globalBlockList
+ = new HashMap<Block, BalancerBlock>();
+ private Map<Block, BalancerBlock> movedBlocks
+ = new HashMap<Block, BalancerBlock>();
+ private Map<String, BalancerDatanode> datanodes
+ = new HashMap<String, BalancerDatanode>();
+
+ private NetworkTopology cluster = new NetworkTopology();
+
+ private double avgUtilization = 0.0D;
+
+ /* This class keeps track of a scheduled block move */
+ private class PendingBlockMove {
+ private BalancerBlock block;
+ private Source source;
+ private BalancerDatanode proxySource;
+ private BalancerDatanode target;
+
+ /** Constructor: creates an empty pending move; no field is set here. */
+ private PendingBlockMove() {
+ }
+
+ /* Pick a block and a proxy source for this pending move; the source
+ * and target must already be set.
+ *
+ * Walks the source's block list and stops at the first block accepted by
+ * markMovedIfGoodBlock, removing that block from the source's list.
+ *
+ * Return true if a block and its proxy are chosen; false otherwise
+ */
+ private boolean chooseBlockAndProxy() {
+ Iterator<BalancerBlock> candidates = source.getBlockIterator();
+ while (candidates.hasNext()) {
+ BalancerBlock candidate = candidates.next();
+ if (!markMovedIfGoodBlock(candidate)) {
+ continue; // not usable for this move, try the next block
+ }
+ // accepted: take it out of the source's list
+ candidates.remove();
+ return true;
+ }
+ return false;
+ }
+
+ /* Decide whether the given block can be used for the tentative move and,
+ * if so, record it in the moved list so that it is marked as "Moved".
+ * A block is good if
+ * 1. it is a good candidate; see isGoodBlockCandidate
+ * 2. a proxy source that is not busy can be found for this move
+ *
+ * The block's monitor is taken first, then the movedBlocks monitor.
+ * Note that this.block is assigned as soon as the candidate check passes,
+ * even when no proxy source is found afterwards.
+ */
+ private boolean markMovedIfGoodBlock(BalancerBlock block) {
+ synchronized(block) {
+ synchronized(movedBlocks) {
+ if (!isGoodBlockCandidate(source, target, block)) {
+ return false;
+ }
+ this.block = block;
+ if (!chooseProxySource()) {
+ return false;
+ }
+ addToMoved(block);
+ LOG.info("Decided to move block "+ block.getBlockId()
+ +" with a length of "+FsShell.byteDesc(block.getNumBytes())
+ + " bytes from " + source.getName()
+ + " to " + target.getName()
+ + " using proxy source " + proxySource.getName() );
+ return true;
+ }
+ }
+ }
+
+ /* Source, target and block are known at this point; pick the datanode
+ * that will act as the proxy source for the copy.
+ * A replica on the same rack as the target is tried first; failing that,
+ * any replica whose pending queue accepts this move is used.
+ *
+ * @return true if a proxy is found; otherwise false
+ */
+ private boolean chooseProxySource() {
+ // first pass: replicas that share a rack with the target
+ for (BalancerDatanode replica : block.getLocations()) {
+ if (cluster.isOnSameRack(replica.getDatanode(), target.getDatanode())
+ && replica.addPendingBlock(this)) {
+ proxySource = replica;
+ return true;
+ }
+ }
+ // second pass: any replica that can take one more pending move
+ for (BalancerDatanode replica : block.getLocations()) {
+ if (replica.addPendingBlock(this)) {
+ proxySource = replica;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /* Dispatch the block move task to the proxy source & wait for the response.
+ *
+ * Connects to the proxy source, sends the copy request and reads the
+ * status reply; on success the block length is added to bytesMoved.
+ * Timeouts and other IO errors are logged and not rethrown.
+ * In every case the streams and socket are closed, this move is removed
+ * from the pending queues of the proxy source and the target, this object
+ * is reset, and threads waiting on the Balancer monitor are notified.
+ */
+ private void dispatch() {
+ Socket sock = new Socket();
+ DataOutputStream out = null;
+ DataInputStream in = null;
+ try {
+ sock.connect(DataNode.createSocketAddr(
+ proxySource.datanode.getName()), FSConstants.READ_TIMEOUT);
+ long bandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+ // read timeout grows with the block length relative to the configured
+ // bandwidth; NOTE(review): the factor 1500 presumably means 1.5x the
+ // expected transfer time in milliseconds -- confirm
+ sock.setSoTimeout(2*FSConstants.READ_TIMEOUT+
+ (int)(block.getNumBytes()*1500/bandwidth));
+ out = new DataOutputStream( new BufferedOutputStream(
+ sock.getOutputStream(), FSConstants.BUFFER_SIZE));
+ sendRequest(out);
+ in = new DataInputStream( new BufferedInputStream(
+ sock.getInputStream(), FSConstants.BUFFER_SIZE));
+ // throws IOException unless the reply status is success
+ receiveResponse(in);
+ bytesMoved.inc(block.getNumBytes());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug( "Moving block " + block.getBlock().getBlockId() +
+ " from "+ source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName() +
+ " succeeded." );
+ }
+ } catch (SocketTimeoutException te) {
+ LOG.warn("Timeout moving block "+block.getBlockId()+
+ " from " + source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName());
+ } catch (IOException e) {
+ LOG.warn("Error moving block "+block.getBlockId()+
+ " from " + source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName() +
+ ": "+e.getMessage()+ "\n" +
+ StringUtils.stringifyException(e) );
+ } finally {
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+ IOUtils.closeSocket(sock);
+
+ // release the slots this move occupied on both datanodes
+ proxySource.removePendingBlock(this);
+ synchronized(target) {
+ target.removePendingBlock(this);
+ }
+
+ synchronized (this ) {
+ reset();
+ }
+ // wake up threads waiting on the Balancer monitor
+ synchronized (Balancer.this) {
+ Balancer.this.notifyAll();
+ }
+ }
+ }
+
+ /* Send a block copy request to the outputstream.
+ * Written in this order: the data transfer version (short), the
+ * OP_COPY_BLOCK opcode (byte), the block id (long), the source's storage
+ * id (string) and the serialized target datanode; the stream is then
+ * flushed.
+ */
+ private void sendRequest(DataOutputStream out) throws IOException {
+ out.writeShort(FSConstants.DATA_TRANFER_VERSION);
+ out.writeByte(FSConstants.OP_COPY_BLOCK);
+ out.writeLong(block.getBlock().getBlockId());
+ Text.writeString(out, source.getStorageID());
+ target.write(out);
+ out.flush();
+ }
+
+ /* Receive a block copy response from the input stream.
+ * Reads one short status code; any value other than OP_STATUS_SUCCESS is
+ * reported as an IOException naming the block, source, target and proxy.
+ */
+ private void receiveResponse(DataInputStream in) throws IOException {
+ short status = in.readShort();
+ if (status != FSConstants.OP_STATUS_SUCCESS) {
+ // leading space keeps "failed" from running into the proxy name
+ throw new IOException("Moving block "+block.getBlockId()+
+ " from "+source.getName() + " to " +
+ target.getName() + " through " +
+ proxySource.getName() +
+ " failed");
+ }
+ }
+
+ /* Clear every field so this object no longer refers to a block
+ * or to any datanode */
+ private void reset() {
+ target = null;
+ proxySource = null;
+ source = null;
+ block = null;
+ }
+
+ /* Start a daemon thread, named after the block, proxy source and target,
+ * that carries out the block move */
+ private void scheduleBlockMove() {
+ String moverName = "Block mover for "+ block.getBlockId() +
+ " from " + proxySource.getName() + " to " + target.getName();
+ BlockMover mover = new BlockMover();
+ mover.setDaemon(true);
+ mover.setName(moverName);
+ LOG.info("Starting " + mover.getName());
+ mover.start();
+ }
+
+ /* Thread whose only job is to run dispatch() for the enclosing move */
+ private class BlockMover extends Thread {
+ BlockMover() {
+ }
+
+ @Override
+ public void run() {
+ dispatch();
+ }
+ }
+ }
+
+ /* Balancer-side record of a block: the block itself plus the list of
+ * datanodes recorded as its locations */
+ static private class BalancerBlock {
+ private Block block; // the block
+ private List<BalancerDatanode> locations
+ = new ArrayList<BalancerDatanode>(3); // its locations
+
+ /* Constructor */
+ private BalancerBlock(Block block) {
+ this.block = block;
+ }
+
+ /* Forget all recorded locations */
+ private synchronized void clearLocations() {
+ locations.clear();
+ }
+
+ /* Record a location; a datanode already in the list is not added twice */
+ private synchronized void addLocation(BalancerDatanode datanode) {
+ if (locations.contains(datanode)) {
+ return;
+ }
+ locations.add(datanode);
+ }
+
+ /* Return if the block is located on <code>datanode</code> */
+ private synchronized boolean isLocatedOnDatanode(
+ BalancerDatanode datanode) {
+ boolean present = locations.contains(datanode);
+ return present;
+ }
+
+ /* Return the location list itself (not a copy) */
+ private synchronized List<BalancerDatanode> getLocations() {
+ return locations;
+ }
+
+ /* Return the block */
+ private Block getBlock() {
+ return block;
+ }
+
+ /* Return the block id */
+ private long getBlockId() {
+ return getBlock().getBlockId();
+ }
+
+ /* Return the length of the block */
+ private long getNumBytes() {
+ return getBlock().getNumBytes();
+ }
+ }
+
+ /* The class represents a desired move of a number of bytes to one
+ * target node.
+ * An object of this class is stored in a source node.
+ */
+ static private class NodeTask {
+ private BalancerDatanode datanode; //target node
+ private long size; //bytes scheduled to move
+
+ /* constructor: remembers the target node and the number of bytes */
+ private NodeTask(BalancerDatanode datanode, long size) {
+ this.datanode = datanode;
+ this.size = size;
+ }
+
+ /* Get the node */
+ private BalancerDatanode getDatanode() {
+ return datanode;
+ }
+
+ /* Get the number of bytes that need to be moved */
+ private long getSize() {
+ return size;
+ }
+ }
+
+ /* Return the utilization of a datanode: DFS-used space as a
+ * percentage of the node's capacity */
+ static private double getUtilization(DatanodeInfo datanode) {
+ final double used = datanode.getDfsUsed();
+ return used/datanode.getCapacity()*100;
+ }
+
+ /* Balancer-side view of a datanode: its utilization, how many bytes it
+ * may move in this iteration, how many are already scheduled, and the
+ * block moves currently pending on it */
+ private static class BalancerDatanode implements Writable {
+ final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
+ final protected static short MAX_NUM_CONCURRENT_MOVES =
+ DataNode.MAX_BALANCING_THREADS;
+ protected DatanodeInfo datanode;
+ private double utilization;
+ protected long maxSizeToMove;
+ protected long scheduledSize = 0L;
+ // blocks being moved but not confirmed yet
+ private List<PendingBlockMove> pendingBlocks =
+ new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
+
+ /* Constructor
+ * Depending on avgutil & threshold, calculate maximum bytes to move:
+ * the threshold fraction of the capacity when the node is at least
+ * threshold away from the average, otherwise its distance from the
+ * average. Nodes below the average are also capped by their remaining
+ * space, and every node by MAX_SIZE_TO_MOVE.
+ */
+ private BalancerDatanode(
+ DatanodeInfo node, double avgUtil, double threshold) {
+ datanode = node;
+ utilization = Balancer.getUtilization(node);
+
+ final boolean outsideBand = utilization >= avgUtil+threshold
+ || utilization <= avgUtil-threshold;
+ final double percent =
+ outsideBand ? threshold : Math.abs(avgUtil-utilization);
+ long quota = (long)(percent*datanode.getCapacity()/100);
+ if (utilization < avgUtil ) {
+ quota = Math.min(datanode.getRemaining(), quota);
+ }
+ maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, quota);
+ }
+
+ /** Get the datanode */
+ protected DatanodeInfo getDatanode() {
+ return datanode;
+ }
+
+ /** Get the name of the datanode */
+ protected String getName() {
+ return datanode.getName();
+ }
+
+ /* Get the storage id of the datanode */
+ protected String getStorageID() {
+ return datanode.getStorageID();
+ }
+
+ /** Decide if still need to move more bytes.
+ * NOTE(review): despite the name, this returns true while the scheduled
+ * size is still below the maximum, i.e. while the quota is NOT full. */
+ protected boolean isMoveQuotaFull() {
+ return maxSizeToMove>scheduledSize;
+ }
+
+ /** Return the number of bytes that can still be scheduled */
+ protected long availableSizeToMove() {
+ return maxSizeToMove-scheduledSize;
+ }
+
+ /* increment scheduled size */
+ protected void incScheduledSize(long size) {
+ scheduledSize = scheduledSize + size;
+ }
+
+ /* Check if the node can schedule more blocks to move */
+ synchronized private boolean isPendingQNotFull() {
+ return pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES;
+ }
+
+ /* Check if all the dispatched moves are done */
+ synchronized private boolean isPendingQEmpty() {
+ return pendingBlocks.isEmpty();
+ }
+
+ /* Add a scheduled block move to the node; refused (false) when the
+ * pending queue is already full */
+ private synchronized boolean addPendingBlock(
+ PendingBlockMove pendingBlock) {
+ return isPendingQNotFull() && pendingBlocks.add(pendingBlock);
+ }
+
+ /* Remove a scheduled block move from the node */
+ private synchronized boolean removePendingBlock(
+ PendingBlockMove pendingBlock) {
+ return pendingBlocks.remove(pendingBlock);
+ }
+
+ /** The following two methods support the Writable interface;
+ * both delegate to the wrapped DatanodeInfo */
+ /** Deserialize */
+ public void readFields(DataInput in) throws IOException {
+ datanode.readFields(in);
+ }
+
+ /** Serialize */
+ public void write(DataOutput out) throws IOException {
+ datanode.write(out);
+ }
+ }
+
+ /** A node that can be the sources of a block move */
+ private class Source extends BalancerDatanode {
+
+ /* Thread that runs this source's dispatchBlocks() loop, which
+ * schedules block moves for the source */
+ private class BlockMoveDispatcher extends Thread {
+ @Override
+ public void run() {
+ dispatchBlocks();
+ }
+ }
+
+ private ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
+ private long blocksToReceive = 0L;
+ /* source blocks point to balancerBlocks in the global list because
+ * we want to keep one copy of a block in balancer and be aware that
+ * the locations are changing over time.
+ */
+ private List<BalancerBlock> srcBlockList
+ = new ArrayList<BalancerBlock>();
+
+ /* constructor: passes the node, average utilization and threshold
+ * straight through to BalancerDatanode */
+ private Source(DatanodeInfo node, double avgUtil, double threshold) {
+ super(node, avgUtil, threshold);
+ }
+
+ /** Add a node task: its size is added to this source's scheduled size
+ * and the task is appended to the task list */
+ private void addNodeTask(NodeTask task) {
+ assert (task.getDatanode() != this) :
+ "Source and target are the same " + getName();
+ incScheduledSize(task.getSize());
+ nodeTasks.add(task);
+ }
+
+ /* Return an iterator to this source's blocks.
+ * The iterator comes directly from srcBlockList, so remove() on it
+ * takes the block out of this source's list. */
+ private Iterator<BalancerBlock> getBlockIterator() {
+ return srcBlockList.iterator();
+ }
+
+ /* fetch new blocks of this source from namenode and
+ * update this source's block list & the global block list
+ * Return the total size of the received blocks in the number of bytes.
+ *
+ * At most min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive) bytes worth of
+ * blocks are requested per call. Every returned block counts towards the
+ * result, including blocks that are not added to the source's list.
+ */
+ private long getBlockList() throws IOException {
+ BlockWithLocations[] newBlocks = namenode.getBlocks(datanode,
+ (long)Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
+ long bytesReceived = 0;
+ for (BlockWithLocations blk : newBlocks) {
+ bytesReceived += blk.getBlock().getNumBytes();
+ BalancerBlock block;
+ synchronized(globalBlockList) {
+ // reuse the existing entry for a known block so that only one
+ // BalancerBlock per block exists; its locations are rebuilt below
+ block = globalBlockList.get(blk.getBlock());
+ if (block==null) {
+ block = new BalancerBlock(blk.getBlock());
+ globalBlockList.put(blk.getBlock(), block);
+ } else {
+ block.clearLocations();
+ }
+
+ synchronized (block) {
+ // update locations
+ for ( String location : blk.getDatanodes() ) {
+ // note: this local shadows the datanode field of the source
+ BalancerDatanode datanode = datanodes.get(location);
+ if (datanode != null) { // not an unknown datanode
+ block.addLocation(datanode);
+ }
+ }
+ }
+ if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) {
+ // filter bad candidates
+ srcBlockList.add(block);
+ }
+ }
+ }
+ return bytesReceived;
+ }
+
+ /* Decide if the given block is a good candidate to move or not:
+ * it is one as soon as the balancer-level check accepts it for this
+ * source and the target of at least one node task */
+ private boolean isGoodBlockCandidate(BalancerBlock block) {
+ boolean movable = false;
+ for (Iterator<NodeTask> tasks = nodeTasks.iterator();
+ !movable && tasks.hasNext();) {
+ BalancerDatanode candidateTarget = tasks.next().getDatanode();
+ movable = Balancer.this.isGoodBlockCandidate(this, candidateTarget, block);
+ }
+ return movable;
+ }
+
+ /* Return a block that's good for the source thread to dispatch immediately
+ * The block's source, target, and proxy source are determined too.
+ * When choosing proxy and target, source & target throttling
+ * has been considered. They are chosen only when they have the capacity
+ * to support this block move.
+ * The block should be dispatched immediately after this method is returned.
+ *
+ * Returns null when no node task currently yields a movable block.
+ * On success the block length is subtracted from this source's scheduled
+ * size and from the task; a task whose remaining size has dropped to zero
+ * or below is removed from the task list.
+ */
+ private PendingBlockMove chooseNextBlockToMove() {
+ for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
+ NodeTask task = tasks.next();
+ BalancerDatanode target = task.getDatanode();
+ PendingBlockMove pendingBlock = new PendingBlockMove();
+ if ( target.addPendingBlock(pendingBlock) ) {
+ // target is not busy, so do a tentative block allocation
+ pendingBlock.source = this;
+ pendingBlock.target = target;
+ if ( pendingBlock.chooseBlockAndProxy() ) {
+ long blockSize = pendingBlock.block.getNumBytes();
+ scheduledSize -= blockSize;
+ task.size -= blockSize;
+ // block lengths rarely add up to the task size exactly, so the
+ // remainder usually goes negative rather than hitting zero; the
+ // task is done in both cases
+ if (task.size <= 0) {
+ tasks.remove();
+ }
+ return pendingBlock;
+ } else {
+ // cancel the tentative move
+ target.removePendingBlock(pendingBlock);
+ }
+ }
+ }
+ return null;
+ }
+
+ /* Walk this source's block list and drop every block that isMoved
+ * reports as already moved */
+ private void filterMovedBlocks() {
+ Iterator<BalancerBlock> remaining = getBlockIterator();
+ while (remaining.hasNext()) {
+ BalancerBlock candidate = remaining.next();
+ if (isMoved(candidate)) {
+ remaining.remove();
+ }
+ }
+ }
+
+ private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
+ /* Return if should fetch more blocks from namenode: only while there
+ * are still bytes left to receive and the source's block list holds
+ * fewer than SOURCE_BLOCK_LIST_MIN_SIZE blocks */
+ private boolean shouldFetchMoreBlocks() {
+ if (blocksToReceive <= 0) {
+ return false;
+ }
+ return srcBlockList.size() < SOURCE_BLOCK_LIST_MIN_SIZE;
+ }
+
+ /* This method iteratively does the following:
+ * it first selects a block to move,
+ * then sends a request to the proxy source to start the block move
+ * when the source's block list falls below a threshold, it asks
+ * the namenode for more blocks.
+ * It terminates when it has dispatch enough block move tasks or
+ * it has received enough blocks from the namenode, or
+ * the elapsed time of the iteration has exceeded the max time limit.
+ * It also returns early if fetching blocks from the namenode fails
+ * with an IOException (which is logged).
+ */
+ private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
+ private void dispatchBlocks() {
+ long startTime = FSNamesystem.now();
+ // budget for block listings requested from the namenode:
+ // twice the number of bytes scheduled to move
+ this.blocksToReceive = 2*scheduledSize;
+ boolean isTimeUp = false;
+ while(!isTimeUp && scheduledSize>0 &&
+ (!srcBlockList.isEmpty() || blocksToReceive>0)) {
+ PendingBlockMove pendingBlock = chooseNextBlockToMove();
+ if (pendingBlock != null) {
+ // move the block
+ pendingBlock.scheduleBlockMove();
+ continue;
+ }
+
+ /* Since we can not schedule any block to move,
+ * filter any moved blocks from the source block list and
+ * check if we should fetch more blocks from the namenode
+ */
+ filterMovedBlocks(); // filter already moved blocks
+ if (shouldFetchMoreBlocks()) {
+ // fetch new blocks
+ try {
+ blocksToReceive -= getBlockList();
+ continue;
+ } catch (IOException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ return;
+ }
+ }
+
+ // check if time is up or not
+ if (FSNamesystem.now()-startTime > MAX_ITERATION_TIME) {
+ isTimeUp = true;
+ continue;
+ }
+
+ /* Now we can not schedule any block to move and there are
+ * no new blocks added to the source block list, so we wait.
+ */
+ try {
+ synchronized(Balancer.this) {
+ Balancer.this.wait(1000); // wait for targets/sources to be idle
+ }
+ } catch (InterruptedException ignored) {
+ // NOTE(review): the interrupt is swallowed here; the loop simply
+ // continues and the thread's interrupt status is not restored
+ }
+ }
+ }
+ }
+
+ /** Default constructor; no configuration is set */
+ Balancer() {
+ }
+
+ /** Construct a balancer that uses the given configuration */
+ Balancer(Configuration conf) {
+ setConf(conf);
+ }
+
+ /** Construct a balancer that uses the given configuration and threshold */
+ Balancer(Configuration conf, double threshold) {
+ this(conf);
+ this.threshold = threshold;
+ }
+
+ /**
+ * Run a balancer through ToolRunner and exit the JVM with its return
+ * code. Any Throwable is logged and turned into exit code -1.
+ * @param args command line arguments, handed to ToolRunner unchanged
+ */
+ public static void main(String[] args) {
+ try {
+ int exitCode = ToolRunner.run(null, new Balancer(), args);
+ System.exit(exitCode);
+ } catch (Throwable e) {
+ LOG.error(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+
+ }
+
+ /* Print the command line usage to standard output */
+ private static void printUsage() {
+ String usageLine = "Usage: java Balancer";
+ String optionLine = " [-threshold <threshold>]\t"
+ +"percentage of disk capacity";
+ System.out.println(usageLine);
+ System.out.println(optionLine);
+ }
+
+ /* parse argument to get the threshold
+ *
+ * With no arguments the default of 10 is returned. Otherwise the
+ * arguments must be exactly "-threshold <value>" (option name is
+ * case-insensitive) with a value in the range [0, 100].
+ *
+ * Throws IllegalArgumentException for an unexpected argument list and
+ * NumberFormatException for a value that is not a number, is NaN, or is
+ * out of range; the usage is printed in both cases.
+ */
+ private double parseArgs(String[] args) {
+ int argsLen = (args == null) ? 0 : args.length;
+ if (argsLen==0) {
+ return 10; // no arguments: default threshold
+ }
+ if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
+ printUsage();
+ throw new IllegalArgumentException(Arrays.toString(args));
+ }
+ try {
+ double threshold = Double.parseDouble(args[1]);
+ // "NaN" parses successfully and every comparison with NaN is false,
+ // so the range test is written in the positive form and negated;
+ // that way NaN is rejected together with out-of-range values
+ if (!(threshold >= 0 && threshold <= 100)) {
+ throw new NumberFormatException();
+ }
+ LOG.info( "Using a threshold of " + threshold );
+ return threshold;
+ } catch(NumberFormatException e) {
+ System.err.println(
+ "Expect a double parameter in the range of [0, 100]: "+ args[1]);
+ printUsage();
+ throw e;
+ }
+ }
+
+ /* Initialize the balancer: store the threshold and open the connections
+ * used later on.
+ * - namenode: a NamenodeProtocol proxy built by createNamenode() below
+ * - client: a ClientProtocol proxy built by DFSClient.createNamenode()
+ * - fs: the FileSystem obtained from the configuration
+ * The name node address is read from "fs.default.name" (default "local").
+ */
+ private void init(double threshold) throws IOException {
+ this.threshold = threshold;
+ // get name node address
+ InetSocketAddress nameNodeAddr = DataNode.createSocketAddr(
+ conf.get("fs.default.name", "local"));
+ // connect to name node
+ this.namenode = createNamenode(nameNodeAddr, conf);
+ this.client = DFSClient.createNamenode(nameNodeAddr, conf);
+ this.fs = FileSystem.get(conf);
+ }
+
+ /* Build a NamenodeProtocol connection to the namenode and
+ * set up the retry policy.
+ * Only the "getBlocks" method is given a retry policy: an exponential
+ * backoff policy created with arguments (5, 200, MILLISECONDS), wrapped by
+ * retryByException() with an empty exception-to-policy map.
+ * NOTE(review): presumably the first argument of retryByException() is the
+ * fallback for unlisted exceptions, so every failure backs off - confirm.
+ */
+ private static NamenodeProtocol createNamenode(
+ InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
+ RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+ 5, 200, TimeUnit.MILLISECONDS);
+ // no exception-specific policies are registered
+ Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
+ timeoutPolicy, exceptionToPolicyMap);
+ Map<String,RetryPolicy> methodNameToPolicyMap =
+ new HashMap<String, RetryPolicy>();
+ methodNameToPolicyMap.put("getBlocks", methodPolicy);
+
+ // wrap the raw RPC proxy so that the per-method policies apply
+ return (NamenodeProtocol) RetryProxy.create(
+ NamenodeProtocol.class,
+ RPC.getProxy(NamenodeProtocol.class,
+ NamenodeProtocol.versionID,
+ nameNodeAddr,
+ conf),
+ methodNameToPolicyMap);
+ }
+
+ /* Shuffle the datanode array in place: walking from the end of the array
+  * towards the front, each slot is swapped with a randomly picked slot at
+  * or before it.
+  */
+ static private void shuffleArray(DatanodeInfo[] datanodes) {
+   int remaining = datanodes.length;
+   while (remaining > 1) {
+     final int pick = rnd.nextInt(remaining);
+     final int last = remaining - 1;
+     DatanodeInfo chosen = datanodes[pick];
+     datanodes[pick] = datanodes[last];
+     datanodes[last] = chosen;
+     remaining = last;
+   }
+ }
+
+ /* Get the report of all live datanodes from the namenode and classify
+ * them with initNodes(DatanodeInfo[]).
+ * Returns the number of bytes that need to be moved, as computed there.
+ */
+ private long initNodes() throws IOException {
+ return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
+ }
+
+ /* Given a data node set, build a network topology and decide
+ * over-utilized datanodes, above average utilized datanodes,
+ * below average utilized datanodes, and underutilized datanodes.
+ * The input data node set is shuffled (in place) before the datanodes
+ * are put into the over-utilized datanodes, above average utilized
+ * datanodes, below average utilized datanodes, and
+ * underutilized datanodes lists. This will add some randomness
+ * to the node matching later on.
+ *
+ * Every node is also added to the cluster topology and recorded in the
+ * datanodes map under its storage ID.
+ *
+ * @return the total number of bytes that are
+ * needed to move to make the cluster balanced: the larger of the bytes
+ * above (avg+threshold) on over-utilized nodes and the bytes missing
+ * below (avg-threshold) on under-utilized nodes.
+ * @param datanodes a set of datanodes
+ */
+ private long initNodes(DatanodeInfo[] datanodes) {
+ // compute average utilization
+ long totalCapacity=0L, totalUsedSpace=0L;
+ for (DatanodeInfo datanode : datanodes) {
+ totalCapacity += datanode.getCapacity();
+ totalUsedSpace += datanode.getDfsUsed();
+ }
+ // a percentage; NOTE(review): NaN or infinite if totalCapacity is 0
+ this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+
+ /* Create the network topology and all data node lists:
+ * overloaded, above-average, below-average, and underloaded.
+ * The given datanodes array is shuffled first, so the order in which
+ * nodes enter the lists is random.
+ */
+ long overLoadedBytes = 0L, underLoadedBytes = 0L;
+ shuffleArray(datanodes);
+ for (DatanodeInfo datanode : datanodes) {
+ cluster.add(datanode);
+ BalancerDatanode datanodeS;
+ if (getUtilization(datanode) > avgUtilization) {
+ // above the average: this node can only act as a source
+ datanodeS = new Source(datanode, avgUtilization, threshold);
+ if (isAboveAvgUtilized(datanodeS)) {
+ this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
+ } else {
+ assert(isOverUtilized(datanodeS)) :
+ datanodeS.getName()+ "is not an overUtilized node";
+ this.overUtilizedDatanodes.add((Source)datanodeS);
+ // bytes held beyond the (avg+threshold) percentage of capacity
+ overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
+ -threshold)*datanodeS.datanode.getCapacity()/100.0);
+ }
+ } else {
+ // at or below the average: this node can only act as a target
+ datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
+ if ( isBelowAvgUtilized(datanodeS)) {
+ this.belowAvgUtilizedDatanodes.add(datanodeS);
+ } else {
+ assert (isUnderUtilized(datanodeS)) :
+ datanodeS.getName()+ "is not an underUtilized node";
+ this.underUtilizedDatanodes.add(datanodeS);
+ // bytes missing to reach the (avg-threshold) percentage of capacity
+ underLoadedBytes += (long)((avgUtilization-threshold-
+ datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
+ }
+ }
+ this.datanodes.put(datanode.getStorageID(), datanodeS);
+ }
+
+ //logging
+ logImbalancedNodes();
+
+ // every datanode must have landed in exactly one of the four lists
+ assert (this.datanodes.size() ==
+ overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
+ aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
+ : "Mismatched number of datanodes";
+
+ // return number of bytes to be moved in order to make the cluster balanced
+ return Math.max(overLoadedBytes, underLoadedBytes);
+ }
+
+ /* Log two lines at INFO level: the count and names of the over utilized
+  * nodes, then the count and names of the under utilized nodes.
+  */
+ private void logImbalancedNodes() {
+   StringBuilder overReport = new StringBuilder()
+       .append(overUtilizedDatanodes.size())
+       .append(" over utilized nodes:");
+   for (Source overloaded : overUtilizedDatanodes) {
+     overReport.append( " " ).append( overloaded.getName() );
+   }
+   LOG.info(overReport);
+
+   StringBuilder underReport = new StringBuilder()
+       .append(underUtilizedDatanodes.size())
+       .append(" under utilized nodes: ");
+   for (BalancerDatanode underloaded : underUtilizedDatanodes) {
+     underReport.append( " " ).append( underloaded.getName() );
+   }
+   LOG.info(underReport);
+ }
+
+ /* Decide all <source, target> pairs and
+  * the number of bytes to move from a source to a target.
+  * Pairs on the same rack are decided first, pairs across racks second.
+  * Maximum bytes to be moved per node is
+  * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
+  * Return total number of bytes to move in this iteration, i.e. the sum
+  * of the scheduled sizes of all chosen sources.
+  */
+ private long chooseNodes() {
+   final boolean sameRack = true;
+   chooseNodes(sameRack);
+   chooseNodes(!sameRack);
+
+   assert (datanodes.size() ==
+       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
+       aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()+
+       sources.size()+targets.size())
+       : "Mismatched number of datanodes";
+
+   long scheduledTotal = 0L;
+   Iterator<Source> chosen = sources.iterator();
+   while (chosen.hasNext()) {
+     scheduledTotal += chosen.next().scheduledSize;
+   }
+   return scheduledTotal;
+ }
+
+ /* If onRack is true, decide all <source, target> pairs
+ * where source and target are on the same rack; otherwise
+ * decide all <source, target> pairs where source and target are
+ * on different racks.
+ * The matching runs in three steps, in this order:
+ * over-utilized -> under-utilized, over-utilized -> below-average,
+ * above-average -> under-utilized.
+ */
+ private void chooseNodes(boolean onRack) {
+ /* first step: match each overUtilized datanode (source) to
+ * one or more underUtilized datanodes (targets).
+ */
+ chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+
+ /* match each remaining overutilized datanode (source) to
+ * below average utilized datanodes (targets).
+ * Note only overutilized datanodes that haven't had that max bytes to move
+ * satisfied in step 1 are selected
+ */
+ chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+
+ /* match each remaining underutilized datanode to
+ * above average utilized datanodes.
+ * Note only underutilized datanodes that have not had that max bytes to
+ * move satisfied in step 1 are selected.
+ */
+ chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+ }
+
+ /* Choose targets from the target candidate list for each over utilized
+  * source datanode. onRackTarget determines if the chosen target
+  * should be on the same rack as the source.
+  * For every source, chooseTarget() is repeated until it reports that no
+  * further target was paired; a source for which isMoveQuotaFull() then
+  * returns false is dropped from the over utilized list.
+  */
+ private void chooseTargets(
+     Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
+   Iterator<Source> overloaded = overUtilizedDatanodes.iterator();
+   while (overloaded.hasNext()) {
+     Source source = overloaded.next();
+     boolean paired;
+     do {
+       paired = chooseTarget(source, targetCandidates, onRackTarget);
+     } while (paired);
+     if (!source.isMoveQuotaFull()) {
+       overloaded.remove();
+     }
+   }
+ }
+
+ /* Choose sources from the source candidate list for each under utilized
+  * target datanode. onRackSource determines if the chosen source
+  * should be on the same rack as the target.
+  * For every target, chooseSource() is repeated until it reports that no
+  * further source was paired; a target for which isMoveQuotaFull() then
+  * returns false is dropped from the under utilized list.
+  */
+ private void chooseSources(
+     Iterator<Source> sourceCandidates, boolean onRackSource) {
+   Iterator<BalancerDatanode> underloaded = underUtilizedDatanodes.iterator();
+   while (underloaded.hasNext()) {
+     BalancerDatanode target = underloaded.next();
+     boolean paired;
+     do {
+       paired = chooseSource(target, sourceCandidates, onRackSource);
+     } while (paired);
+     if (!target.isMoveQuotaFull()) {
+       underloaded.remove();
+     }
+   }
+ }
+
+ /* For the given source, choose one target from the target candidate list.
+ * OnRackTarget determines if the chosen target
+ * should be on the same rack as the source.
+ * Returns true if a target was paired with the source (a NodeTask is then
+ * added to the source and both nodes are recorded in sources/targets),
+ * false if the source takes no more work or no candidate matched.
+ * NOTE(review): isMoveQuotaFull() is used throughout as if "true" meant
+ * the node can still be scheduled more bytes - confirm with its definition.
+ */
+ private boolean chooseTarget(Source source,
+ Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
+ if (!source.isMoveQuotaFull()) {
+ return false;
+ }
+ boolean foundTarget = false;
+ BalancerDatanode target = null;
+ // scan forward; the iterator is shared with the caller, so the scan
+ // resumes where the previous call stopped
+ while (!foundTarget && targetCandidates.hasNext()) {
+ target = targetCandidates.next();
+ if (!target.isMoveQuotaFull()) {
+ // drop this candidate from the underlying list for good
+ targetCandidates.remove();
+ continue;
+ }
+ if (onRackTarget) {
+ // choose from on-rack nodes
+ if (cluster.isOnSameRack(source.datanode, target.datanode)) {
+ foundTarget = true;
+ }
+ } else {
+ // choose from off-rack nodes
+ if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
+ foundTarget = true;
+ }
+ }
+ }
+ if (foundTarget) {
+ assert(target != null):"Choose a null target";
+ // schedule as much as both ends can still take
+ long size = Math.min(source.availableSizeToMove(),
+ target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ // re-check the target now that its scheduled size has grown
+ if (!target.isMoveQuotaFull()) {
+ targetCandidates.remove();
+ }
+ LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ return true;
+ }
+ return false;
+ }
+
+ /* For the given target, choose one source from the source candidate list.
+ * OnRackSource determines if the chosen source
+ * should be on the same rack as the target.
+ * Returns true if a source was paired with the target (a NodeTask is then
+ * added to the source and both nodes are recorded in sources/targets),
+ * false if the target takes no more work or no candidate matched.
+ * NOTE(review): isMoveQuotaFull() is used throughout as if "true" meant
+ * the node can still be scheduled more bytes - confirm with its definition.
+ */
+ private boolean chooseSource(BalancerDatanode target,
+ Iterator<Source> sourceCandidates, boolean onRackSource) {
+ if (!target.isMoveQuotaFull()) {
+ return false;
+ }
+ boolean foundSource = false;
+ Source source = null;
+ // scan forward; the iterator is shared with the caller, so the scan
+ // resumes where the previous call stopped
+ while (!foundSource && sourceCandidates.hasNext()) {
+ source = sourceCandidates.next();
+ if (!source.isMoveQuotaFull()) {
+ // drop this candidate from the underlying list for good
+ sourceCandidates.remove();
+ continue;
+ }
+ if (onRackSource) {
+ // choose from on-rack nodes
+ if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
+ foundSource = true;
+ }
+ } else {
+ // choose from off-rack nodes
+ if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
+ foundSource = true;
+ }
+ }
+ }
+ if (foundSource) {
+ assert(source != null):"Choose a null source";
+ // schedule as much as both ends can still take
+ long size = Math.min(source.availableSizeToMove(),
+ target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ // re-check the source now that a task has been added to it
+ if ( !source.isMoveQuotaFull()) {
+ sourceCandidates.remove();
+ }
+ LOG.info("Decided to move "+FsShell.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ return true;
+ }
+ return false;
+ }
+
+ /* Counter of the bytes moved so far.
+  * inc() is synchronized, so get() takes the same lock: without it a
+  * reader is not guaranteed to see the latest total, and a plain read of
+  * a 64-bit long is not guaranteed to be atomic.
+  */
+ private static class BytesMoved {
+   private long bytesMoved = 0L;
+
+   /* add the given number of bytes to the total */
+   private synchronized void inc( long bytes ) {
+     bytesMoved += bytes;
+   }
+
+   /* return the total number of bytes moved so far */
+   private synchronized long get() {
+     return bytesMoved;
+   }
+ }
+ // total bytes moved since this balancer started
+ private BytesMoved bytesMoved = new BytesMoved();
+ // number of consecutive iterations in which no byte was moved
+ private int notChangedIterations = 0;
+
+ /* Start a thread to dispatch block moves for each source.
+ * The thread selects blocks to move & sends request to proxy source to
+ * initiate block move. The process is flow controlled. Block selection is
+ * blocked if there are too many un-confirmed block moves.
+ * This method blocks until every dispatcher thread has ended and
+ * waitForMoveCompletion() has returned.
+ * Return the total number of bytes successfully moved in this iteration.
+ */
+ private long dispatchBlockMoves() {
+ // remember the running total so that only this iteration is reported
+ long bytesLastMoved = bytesMoved.get();
+ Source.BlockMoveDispatcher dispatchers[] =
+ new Source.BlockMoveDispatcher[sources.size()];
+ int i=0;
+ for (Source source : sources) {
+ dispatchers[i] = source.new BlockMoveDispatcher();
+ dispatchers[i].setName("Dispatcher for source " + source.getName());
+ LOG.info("Starting " + dispatchers[i].getName());
+ dispatchers[i++].start();
+ }
+ for (Source.BlockMoveDispatcher dispatcher : dispatchers) {
+ try {
+ dispatcher.join();
+ } catch (InterruptedException e) {
+ // logged only; the remaining dispatchers are still joined
+ LOG.info(StringUtils.stringifyException(e));
+ }
+ }
+ waitForMoveCompletion();
+ return bytesMoved.get()-bytesLastMoved;
+ }
+
+ // The sleeping period, in milliseconds (it is passed to Thread.sleep),
+ // before checking again if all block moves have completed
+ static private long blockMoveWaitTime = 30000L;
+
+ /** set the sleeping period for block move completion check
+ * @param time the new period in milliseconds
+ */
+ static void setBlockMoveWaitTime(long time) {
+ blockMoveWaitTime = time;
+ }
+
+ /* Wait for all block move confirmations by polling each target's
+  * pendingMove queue. While any queue is non-empty, sleep for
+  * blockMoveWaitTime and check again; an interrupt only ends the sleep
+  * early.
+  */
+ private void waitForMoveCompletion() {
+   for (;;) {
+     boolean anyPending = false;
+     for (BalancerDatanode target : targets) {
+       // non-short-circuit on purpose: every target is queried each round
+       anyPending |= !target.isPendingQEmpty();
+     }
+     if (!anyPending) {
+       return;
+     }
+     try {
+       Thread.sleep(blockMoveWaitTime);
+     } catch (InterruptedException ignored) {
+     }
+   }
+ }
+
+ /* Mark a block as moved by recording it in movedBlocks, keyed by its
+ * Block. Access to movedBlocks is guarded by its own monitor.
+ */
+ private void addToMoved(BalancerBlock block) {
+ synchronized(movedBlocks) {
+ movedBlocks.put(block.getBlock(), block);
+ }
+ }
+
+ /* Check if a block is marked as moved, i.e. its Block is a key of
+ * movedBlocks. Access to movedBlocks is guarded by its own monitor.
+ */
+ private boolean isMoved(BalancerBlock block) {
+ synchronized(movedBlocks) {
+ return movedBlocks.containsKey(block.getBlock());
+ }
+ }
+
+ /* Decide if it is OK to move the given block from source to target.
+  * A block is a good candidate if
+  * 1. the block is not in the process of being moved/has not been moved;
+  * 2. the block does not have a replica on the target;
+  * 3. doing the move does not reduce the number of racks that the block has.
+  *
+  * Condition 3 holds when source and target share a rack, when no replica
+  * is on the target's rack yet, or when another replica (not the source)
+  * stays on the source's rack.
+  *
+  * Both scans of block.locations run while holding the block's monitor, so
+  * the second scan sees the same replica list the first one did.
+  */
+ private boolean isGoodBlockCandidate(Source source,
+     BalancerDatanode target, BalancerBlock block) {
+   // check if the block is moved or not
+   if (isMoved(block)) {
+     return false;
+   }
+   if (block.isLocatedOnDatanode(target)) {
+     return false;
+   }
+
+   if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
+     // good if source and target are on the same rack
+     return true;
+   }
+
+   synchronized (block) {
+     boolean targetRackHasReplica = false;
+     for (BalancerDatanode loc : block.locations) {
+       if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
+         targetRackHasReplica = true;
+         break;
+       }
+     }
+     if (!targetRackHasReplica) {
+       // good if target is not on the same rack as any replica
+       return true;
+     }
+     // good if source is on the same rack as one of the other replicas
+     for (BalancerDatanode loc : block.locations) {
+       if (loc != source &&
+           cluster.isOnSameRack(loc.datanode, source.datanode)) {
+         return true;
+       }
+     }
+   }
+   return false;
+ }
+
+ /* Reset all per-iteration fields of a balancer preparing for the next
+ * iteration: a fresh network topology, empty node lists and maps, a zero
+ * average utilization, and a global block list trimmed by
+ * cleanGlobalBlockList().
+ */
+ private void resetData() {
+ this.cluster = new NetworkTopology();
+ this.overUtilizedDatanodes.clear();
+ this.aboveAvgUtilizedDatanodes.clear();
+ this.belowAvgUtilizedDatanodes.clear();
+ this.underUtilizedDatanodes.clear();
+ this.datanodes.clear();
+ this.sources.clear();
+ this.targets.clear();
+ this.avgUtilization = 0.0D;
+ cleanGlobalBlockList();
+ }
+
+ /* Remove all blocks from the global block list except for the ones that
+  * are also keys of the moved list.
+  */
+ private void cleanGlobalBlockList() {
+   Iterator<Block> remaining = globalBlockList.keySet().iterator();
+   while (remaining.hasNext()) {
+     final boolean alreadyMoved = movedBlocks.containsKey(remaining.next());
+     if (!alreadyMoved) {
+       remaining.remove();
+     }
+   }
+ }
+
+ /* Return true if the given datanode is overUtilized, i.e. its
+  * utilization is strictly above avgUtilization+threshold */
+ private boolean isOverUtilized(BalancerDatanode datanode) {
+   final double upperBound = avgUtilization+threshold;
+   return datanode.utilization > upperBound;
+ }
+
+ /* Return true if the given datanode is above average utilized
+  * but not overUtilized: avgUtilization < utilization and
+  * utilization <= avgUtilization+threshold */
+ private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
+   final double used = datanode.utilization;
+   final boolean withinUpperBound = used <= (avgUtilization+threshold);
+   final boolean aboveAverage = used > avgUtilization;
+   return withinUpperBound && aboveAverage;
+ }
+
+ /* Return true if the given datanode is underUtilized, i.e. its
+  * utilization is strictly below avgUtilization-threshold */
+ private boolean isUnderUtilized(BalancerDatanode datanode) {
+   final double lowerBound = avgUtilization-threshold;
+   return datanode.utilization < lowerBound;
+ }
+
+ /* Return true if the given datanode is below or exactly at average
+  * utilization but not underUtilized:
+  * avgUtilization-threshold <= utilization <= avgUtilization.
+  *
+  * The upper bound is inclusive on purpose. initNodes() sends every node
+  * that is not strictly above the average down this path; a node sitting
+  * exactly on the average is not underUtilized either, so with an
+  * exclusive bound it would match neither category.
+  */
+ private boolean isBelowAvgUtilized(BalancerDatanode datanode) {
+   return (datanode.utilization >= (avgUtilization-threshold))
+       && (datanode.utilization <= avgUtilization);
+ }
+
+ // Exit status
+ /** returned when no bytes are left to move */
+ final public static int SUCCESS = 1;
+ /** returned when another balancer holds the marker file */
+ final public static int ALREADY_RUNNING = -1;
+ /** returned when an iteration could not schedule any byte to move */
+ final public static int NO_MOVE_BLOCK = -2;
+ /** returned when no byte was moved for 5 consecutive iterations */
+ final public static int NO_MOVE_PROGRESS = -3;
+ /** returned when an IOException ended the run */
+ final public static int IO_EXCEPTION = -4;
+ /** returned when the command line arguments were rejected */
+ final public static int ILLEGAL_ARGS = -5;
+
+ /** main method of Balancer.
+  * Repeats "classify nodes, pair sources with targets, dispatch block
+  * moves" until the cluster is balanced or no progress can be made.
+  * @param args arguments to a Balancer
+  * @return one of the exit status constants declared above
+  * @throws Exception any exception, other than IllegalArgumentException
+  *         and IOException, that occurs during datanode balancing
+  */
+ public int run(String[] args) throws Exception {
+   long startTime = FSNamesystem.now();
+   OutputStream out = null;
+   // set once we learn that the marker file belongs to another balancer
+   boolean otherBalancerRunning = false;
+   try {
+     // initialize a balancer
+     init(parseArgs(args));
+
+     /* Check if there is another balancer running.
+      * Exit if there is another one running.
+      */
+     out = checkAndMarkRunningBalancer();
+     if (out == null) {
+       otherBalancerRunning = true;
+       System.out.println("Another balancer is running. Exiting...");
+       return ALREADY_RUNNING;
+     }
+
+     Formatter formatter = new Formatter(System.out);
+     System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
+     int iterations = 0;
+     while (true ) {
+       /* get all live datanodes of a cluster and their disk usage
+        * decide the number of bytes need to be moved
+        */
+       long bytesLeftToMove = initNodes();
+       if (bytesLeftToMove == 0) {
+         System.out.println("The cluster is balanced. Exiting...");
+         return SUCCESS;
+       } else {
+         LOG.info( "Need to move "+ FsShell.byteDesc(bytesLeftToMove)
+             +" bytes to make the cluster balanced." );
+       }
+
+       /* Decide all the nodes that will participate in the block move and
+        * the number of bytes that need to be moved from one node to another
+        * in this iteration. Maximum bytes to be moved per node is
+        * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
+        */
+       long bytesToMove = chooseNodes();
+       if (bytesToMove == 0) {
+         System.out.println("No block can be moved. Exiting...");
+         return NO_MOVE_BLOCK;
+       } else {
+         LOG.info( "Will move " + FsShell.byteDesc(bytesToMove) +
+             " bytes in this iteration");
+       }
+
+       formatter.format("%-24s %10d %19s %18s %17s\n",
+           DateFormat.getDateTimeInstance().format(new Date()),
+           iterations,
+           FsShell.byteDesc(bytesMoved.get()),
+           FsShell.byteDesc(bytesLeftToMove),
+           FsShell.byteDesc(bytesToMove)
+           );
+
+       /* For each pair of <source, target>, start a thread that repeatedly
+        * decide a block to be moved and its proxy source,
+        * then initiates the move until all bytes are moved or no more block
+        * available to move.
+        * Exit if no byte has been moved for 5 consecutive iterations.
+        */
+       if (dispatchBlockMoves() > 0) {
+         notChangedIterations = 0;
+       } else {
+         notChangedIterations++;
+         if (notChangedIterations >= 5) {
+           System.out.println(
+               "No block has been moved for 5 iterations. Exiting...");
+           return NO_MOVE_PROGRESS;
+         }
+       }
+
+       // clean all lists
+       resetData();
+
+       /* Wait two heartbeat intervals before starting the next iteration.
+        * dfs.heartbeat.interval is configured in seconds while
+        * Thread.sleep() takes milliseconds, hence the factor of 1000.
+        */
+       try {
+         Thread.sleep(2 * 1000L * conf.getLong("dfs.heartbeat.interval", 3));
+       } catch (InterruptedException ignored) {
+         // best effort pause only; go on with the next iteration
+       }
+
+       iterations++;
+     }
+   } catch (IllegalArgumentException ae) {
+     return ILLEGAL_ARGS;
+   } catch (IOException e) {
+     System.out.println("Received an IO exception: " + e.getMessage() +
+         " . Exiting...");
+     return IO_EXCEPTION;
+   } finally {
+     IOUtils.closeStream(out);
+     /* Remove the marker file, except in two cases:
+      * - fs is still null because argument parsing or init() failed before
+      *   the file system was opened (dereferencing it would throw and hide
+      *   the real exit status);
+      * - the marker belongs to another, still running balancer.
+      */
+     if (fs != null && !otherBalancerRunning) {
+       try {
+         fs.delete(BALANCER_ID_PATH);
+       } catch(IOException ignored) {
+         // nothing more can be done about a leftover marker here
+       }
+     }
+     System.out.println("Balancing took " +
+         time2Str(FSNamesystem.now()-startTime));
+   }
+ }
+
+ // marker file whose existence signals a running balancer
+ private Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+ /* The idea for making sure that there is no more than one balancer
+ * running in an HDFS is to create a file in the HDFS, write the host name
+ * of the machine on which the balancer is running to the file, and not
+ * close the file until the balancer exits.
+ * This prevents a second balancer from running because it cannot
+ * create the file while the first one is running.
+ *
+ * This method checks whether a balancer is already running and, if none
+ * is, marks this one as running by creating the file.
+ * Note that this is an atomic operation.
+ *
+ * Return null if there is a running balancer (the create failed with an
+ * AlreadyBeingCreatedException); otherwise the still open output stream
+ * to the newly created file. Any other RemoteException is rethrown.
+ */
+ private OutputStream checkAndMarkRunningBalancer() throws IOException {
+ try {
+ DataOutputStream out = fs.create(BALANCER_ID_PATH);
+ out. writeBytes(InetAddress.getLocalHost().getHostName());
+ out.flush();
+ return out;
+ } catch(RemoteException e) {
+ // the remote exception carries the original class only by name
+ if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ /* Given elapsedTime in ms, return a printable string: the time as a
+  * double in the largest unit (milliseconds, seconds, minutes or hours)
+  * in which it is at least one, followed by the unit name.
+  */
+ private static String time2Str(long elapsedTime) {
+   final int msPerSecond = 1000;
+   final int msPerMinute = 60*1000;
+   final int msPerHour = 3600*1000;
+
+   final double ms = elapsedTime;
+   if (elapsedTime >= msPerHour) {
+     return (ms/msPerHour)+" "+"hours";
+   }
+   if (elapsedTime >= msPerMinute) {
+     return (ms/msPerMinute)+" "+"minutes";
+   }
+   if (elapsedTime >= msPerSecond) {
+     return (ms/msPerSecond)+" "+"seconds";
+   }
+   return ms+" "+"milliseconds";
+ }
+
+ /** return this balancer's configuration
+ * @return the configuration last passed to setConf(), possibly null
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /** set this balancer's configuration
+ * @param conf the configuration to use; it is stored as is, not copied
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+}
Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Dec 5 11:49:16 2007
@@ -75,7 +75,7 @@
private TreeMap<String, OutputStream> pendingCreates =
new TreeMap<String, OutputStream>();
- private static ClientProtocol createNamenode(
+ static ClientProtocol createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf)
throws IOException {
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Dec 5 11:49:16 2007
@@ -115,10 +115,10 @@
int defaultBytesPerChecksum = 512;
// The following three fields are to support balancing
- final private static long BALANCE_BANDWIDTH = 1024L*1024; // 1MB/s
- final private static short MAX_BALANCING_THREADS = 5;
+ final static short MAX_BALANCING_THREADS = 5;
private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
- private Throttler balancingThrottler = new Throttler(BALANCE_BANDWIDTH);
+ long balanceBandwidth;
+ private Throttler balancingThrottler;
private static class DataNodeMetrics implements Updater {
private final MetricsRecord metricsRecord;
@@ -288,6 +288,11 @@
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
DataNode.nameNodeAddr = nameNodeAddr;
+ //set up parameter for cluster balancing
+ this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
+ LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
+ this.balancingThrottler = new Throttler(balanceBandwidth);
+
//create a servlet to serve full-file content
String infoAddr = conf.get("dfs.datanode.http.bindAddress", "0.0.0.0:50075");
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
@@ -554,8 +559,10 @@
synchronized(receivedBlockList) {
synchronized(delHints) {
int numBlocks = receivedBlockList.size();
- if (receivedBlockList.size() > 0) {
- assert(numBlocks==delHints.size());
+ if (numBlocks > 0) {
+ if(numBlocks!=delHints.size()) {
+ LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
+ }
//
// Send newly-received blockids to namenode
//
@@ -565,6 +572,9 @@
}
}
if (blockArray != null) {
+ if(delHintArray == null || delHintArray.length != blockArray.length ) {
+ LOG.warn("Panic: block array & delHintArray are not the same" );
+ }
namenode.blockReceived(dnRegistration, blockArray, delHintArray);
synchronized (receivedBlockList) {
synchronized (delHints) {
@@ -753,6 +763,9 @@
* client? For now we don't.
*/
private void notifyNamenodeReceivedBlock(Block block, String delHint) {
+ if(block==null || delHint==null) {
+ throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
+ }
synchronized (receivedBlockList) {
synchronized (delHints) {
receivedBlockList.add(block);
@@ -1149,7 +1162,7 @@
// notify name node
notifyNamenodeReceivedBlock(block, sourceID);
- LOG.info("Received block " + block +
+ LOG.info("Moved block " + block +
" from " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
opStatus = OP_STATUS_ERROR;
@@ -1181,7 +1194,7 @@
/** Constructor */
Throttler(long bandwidthPerSec) {
- this(1000, bandwidthPerSec); // by default throttling period is 1s
+ this(500, bandwidthPerSec); // by default throttling period is 500ms
}
/** Constructor */
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=601491&r1=601490&r2=601491&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Wed Dec 5 11:49:16 2007
@@ -234,6 +234,13 @@
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
+ if (simulatedCapacities != null
+ && numDataNodes > simulatedCapacities.length) {
+ throw new IllegalArgumentException( "The length of simulatedCapacities ["
+ + simulatedCapacities.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+
// Set up the right ports for the datanodes
conf.set("dfs.datanode.bindAddress", "0.0.0.0:0");
conf.set("dfs.datanode.http.bindAddress", "0.0.0.0:0");
@@ -263,9 +270,8 @@
}
if (simulatedCapacities != null) {
dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
- }
- if (simulatedCapacities != null && i < simulatedCapacities.length) {
- dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, simulatedCapacities[i]);
+ dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
+ simulatedCapacities[i-curDatanodesNum]);
}
System.out.println("Starting DataNode " + i + " with dfs.data.dir: "
+ dnConf.get("dfs.data.dir"));
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java?rev=601491&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java Wed Dec 5 11:49:16 2007
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+/**
+ * This class tests if a balancer schedules tasks correctly.
+ */
+public class TestBalancer extends TestCase {
+ private static final Configuration CONF = new Configuration(); // shared by every cluster, namenode proxy and balancer created below
+ final private static long CAPACITY = 500L; // per-datanode capacity the test cases are built from
+ final private static String RACK0 = "/rack0"; // rack names handed to MiniDFSCluster
+ final private static String RACK1 = "/rack1";
+ final private static String RACK2 = "/rack2";
+ final static private String fileName = "/tmp.txt"; // the single file written by createFile
+ final static private Path filePath = new Path(fileName);
+ private MiniDFSCluster cluster; // reassigned by each helper that starts a cluster
+
+ ClientProtocol client; // namenode proxy; polled for stats and datanode reports
+
+ static final int DEFAULT_BLOCK_SIZE = 10; // used for both dfs.block.size and io.bytes.per.checksum
+ private Balancer balancer; // assigned in runBalancer
+ private Random r = new Random(); // supplies the file seed and the datanode picks in distributeBlocks
+
+ static { // one-time setup of the shared CONF and of Balancer
+ CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
+ CONF.setLong("dfs.heartbeat.interval", 1L); // presumably seconds -- TODO confirm
+ CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); // presumably enables simulated datanode storage -- confirm
+ Balancer.setBlockMoveWaitTime(1000L) ; // presumably milliseconds -- TODO confirm
+ }
+
+ /* create a file with a length of <code>fileLen</code> */
+ private void createFile(long fileLen, short replicationFactor)
+ throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, fileLen,
+ replicationFactor, r.nextLong());
+ DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
+ }
+
+
+ /* fill up a cluster with <code>numNodes</code> datanodes
+ * whose used space to be <code>size</code>
+ */
+ private Block[] generateBlocks(long size, short numNodes) throws IOException {
+ cluster = new MiniDFSCluster( CONF, numNodes, true, null);
+ try {
+ cluster.waitActive();
+ client = DFSClient.createNamenode(
+ DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF);
+
+ short replicationFactor = (short)(numNodes-1);
+ long fileLen = size/replicationFactor;
+ createFile(fileLen, replicationFactor);
+
+ List<LocatedBlock> locatedBlocks = cluster.getNameNode().
+ getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
+
+ int numOfBlocks = locatedBlocks.size();
+ Block[] blocks = new Block[numOfBlocks];
+ for(int i=0; i<numOfBlocks; i++) {
+ Block b = locatedBlocks.get(i).getBlock();
+ blocks[i] = new Block(b.getBlockId(), b.getNumBytes());
+ }
+
+ return blocks;
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /* Distribute all blocks according to the given distribution.
+  * distribution[i] is the number of bytes datanode i should be filled with.
+  * Every block is handed out <code>replicationFactor</code> times, each time
+  * to a randomly chosen datanode that still has bytes left to fill.
+  * Nothing prevents the same datanode from being chosen twice for one block.
+  * Returns one block array per datanode, indexed like <code>distribution</code>.
+  * Throws IllegalArgumentException when a replica remains to be placed but no
+  * datanode has space left, rather than retrying forever.
+  */
+ Block[][] distributeBlocks(Block[] blocks, short replicationFactor,
+     final long[] distribution ) {
+   // count down on a private copy so the caller's array is not modified
+   long[] remaining = new long[distribution.length];
+   System.arraycopy(distribution, 0, remaining, 0, distribution.length);
+
+   List<List<Block>> blockReports =
+       new ArrayList<List<Block>>(remaining.length);
+   for(int i=0; i<remaining.length; i++) {
+     blockReports.add(new ArrayList<Block>());
+   }
+   for(int i=0; i<blocks.length; i++) {
+     for(int j=0; j<replicationFactor; j++) {
+       // without this check the random search below would never terminate
+       if (!hasRemainingSpace(remaining)) {
+         throw new IllegalArgumentException(
+             "No datanode has space left for replica " + j + " of block " + i);
+       }
+       int chosenIndex;
+       do {
+         chosenIndex = r.nextInt(remaining.length);
+       } while (remaining[chosenIndex] <= 0);
+       blockReports.get(chosenIndex).add(blocks[i]);
+       // may drop below zero: a node is eligible while any space is left
+       remaining[chosenIndex] -= blocks[i].getNumBytes();
+     }
+   }
+   Block[][] results = new Block[remaining.length][];
+   for(int i=0; i<remaining.length; i++) {
+     List<Block> nodeBlockList = blockReports.get(i);
+     results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
+   }
+   return results;
+ }
+
+ /* true if at least one entry of <code>remaining</code> is positive */
+ private static boolean hasRemainingSpace(long[] remaining) {
+   for (long space : remaining) {
+     if (space > 0) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /* We first start a cluster and fill it up to the sum of
+  * <code>distribution</code>, then redistribute the blocks according to
+  * the required distribution and restart the cluster with them injected.
+  * Afterwards a balancer is run to balance the cluster.
+  * The three arrays hold one entry per datanode and must have equal length,
+  * otherwise IllegalArgumentException is thrown.
+  * The restarted cluster is shut down even if balancing fails.
+  */
+ private void testUnevenDistribution(
+     long distribution[], long capacities[], String[] racks) throws Exception {
+   int numDatanodes = distribution.length;
+   if (capacities.length != numDatanodes || racks.length != numDatanodes) {
+     throw new IllegalArgumentException("Array length is not the same");
+   }
+
+   // calculate total space that needs to be filled
+   long totalUsedSpace=0L;
+   for(int i=0; i<distribution.length; i++) {
+     totalUsedSpace += distribution[i];
+   }
+
+   // fill the cluster (generateBlocks shuts its own cluster down)
+   Block[] blocks = generateBlocks(totalUsedSpace, (short)numDatanodes);
+
+   // redistribute blocks
+   Block[][] blocksDN = distributeBlocks(
+       blocks, (short)(numDatanodes-1), distribution);
+
+   // restart the cluster: do NOT format the cluster
+   CONF.set("dfs.safemode.threshold.pct", "0.0f");
+   cluster = new MiniDFSCluster(0, CONF, numDatanodes,
+       false, true, null, racks, capacities);
+   try {
+     cluster.waitActive();
+     client = DFSClient.createNamenode(
+         DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF);
+
+     cluster.injectBlocks(blocksDN);
+
+     long totalCapacity = 0L;
+     for(long capacity:capacities) {
+       totalCapacity += capacity;
+     }
+     runBalancer(totalUsedSpace, totalCapacity);
+   } finally {
+     // same clean-up as test(): never leave the cluster running
+     cluster.shutdown();
+   }
+ }
+
+ /* Poll client.getStats() every 100ms until element 0 equals
+  * <code>expectedTotalSpace</code> and element 1 equals
+  * <code>expectedUsedSpace</code>. There is no timeout.
+  * An interrupt does not end the wait, but the thread's interrupt status
+  * is restored before returning so callers can still observe it.
+  */
+ private void waitForHeartBeat( long expectedUsedSpace, long expectedTotalSpace )
+     throws IOException {
+   boolean interrupted = false;
+   try {
+     long[] status = client.getStats();
+     while(status[0] != expectedTotalSpace || status[1] != expectedUsedSpace ) {
+       try {
+         Thread.sleep(100L);
+       } catch(InterruptedException e) {
+         // sleep() cleared the flag; remember it and keep polling
+         interrupted = true;
+       }
+       status = client.getStats();
+     }
+   } finally {
+     if (interrupted) {
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
+ /* This test starts a cluster with the given datanodes, fills it to about
+  * 30% of its total capacity with one file that has a replica on every
+  * datanode, then adds one empty datanode and starts balancing.
+  * @param capacities capacity of each initial datanode
+  * @param racks rack of each initial datanode; same length as capacities
+  * @param newCapacity new node's capacity
+  * @param newRack new node's rack
+  */
+ private void test(long[] capacities, String[] racks,
+     long newCapacity, String newRack) throws Exception {
+   int numOfDatanodes = capacities.length;
+   assertEquals(numOfDatanodes, racks.length);
+   cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null,
+       racks, capacities);
+   try {
+     cluster.waitActive();
+     client = DFSClient.createNamenode(
+         DataNode.createSocketAddr(CONF.get("fs.default.name")), CONF);
+
+     long totalCapacity=0L;
+     for(long capacity:capacities) {
+       totalCapacity += capacity;
+     }
+     // fill up the cluster to be about 30% full. The file gets one replica
+     // per datanode, so derive the expected used space from the length that
+     // is actually written; otherwise integer division could leave
+     // runBalancer waiting for a byte count that is never reached.
+     long fileLen = (totalCapacity*3/10)/numOfDatanodes;
+     long totalUsedSpace = fileLen*numOfDatanodes;
+     createFile(fileLen, (short)numOfDatanodes);
+     // start up an empty node with the requested capacity and rack
+     cluster.startDataNodes(CONF, 1, true, null,
+         new String[]{newRack}, new long[]{newCapacity});
+
+     totalCapacity += newCapacity;
+
+     // run balancer and validate results
+     runBalancer(totalUsedSpace, totalCapacity);
+   } finally {
+     cluster.shutdown();
+   }
+ }
+
+ /* Start balancer and check if the cluster is balanced after the run */
+ private void runBalancer( long totalUsedSpace, long totalCapacity )
+ throws Exception {
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+ // start rebalancing
+ balancer = new Balancer(CONF);
+ balancer.run(new String[0]);
+
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+ boolean balanced;
+ do {
+ DatanodeInfo[] datanodeReport =
+ client.getDatanodeReport(DatanodeReportType.ALL);
+ assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+ balanced = true;
+ double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
+ for(DatanodeInfo datanode:datanodeReport) {
+ if(Math.abs(avgUtilization-
+ ((double)datanode.getDfsUsed())/datanode.getCapacity()*100)>10) {
+ balanced = false;
+ try {
+ Thread.sleep(100);
+ } catch(InterruptedException ignored) {
+ }
+ break;
+ }
+ }
+ } while(!balanced);
+
+ }
+ /** Test evenly filled clusters to which one new empty node is added:
+  * first a one-node cluster, then a two-node cluster. */
+ public void testBalancer0() throws Exception {
+   // one-node cluster: the new node has half of CAPACITY and the same rack
+   final long[] oneNodeCapacities = {CAPACITY};
+   final String[] oneNodeRacks = {RACK0};
+   test(oneNodeCapacities, oneNodeRacks, CAPACITY/2, RACK0);
+
+   // two-node cluster: the new node has full CAPACITY and its own rack
+   final long[] twoNodeCapacities = {CAPACITY, CAPACITY};
+   final String[] twoNodeRacks = {RACK0, RACK1};
+   test(twoNodeCapacities, twoNodeRacks, CAPACITY, RACK2);
+ }
+
+ /** Test a two-node cluster filled unevenly: 50% and 10% of CAPACITY */
+ public void testBalancer1() throws Exception {
+   final long[] usedPerNode = {50*CAPACITY/100, 10*CAPACITY/100};
+   final long[] capacityPerNode = {CAPACITY, CAPACITY};
+   final String[] rackPerNode = {RACK0, RACK1};
+   testUnevenDistribution(usedPerNode, capacityPerNode, rackPerNode);
+ }
+
+ /**
+  * Runs testBalancer0 and then testBalancer1 on a fresh TestBalancer.
+  * @param args not used
+  */
+ public static void main(String[] args) throws Exception {
+   final TestBalancer tests = new TestBalancer();
+   tests.testBalancer0();
+   tests.testBalancer1();
+ }
+}
Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL