You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2011/07/30 02:10:41 UTC
svn commit: r1152401 - in /hadoop/common/trunk/hdfs: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apa...
Author: szetszwo
Date: Sat Jul 30 00:10:39 2011
New Revision: 1152401
URL: http://svn.apache.org/viewvc?rev=1152401&view=rev
Log:
HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of datanodes without restarting. Contributed by Eric Payne
Added:
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Sat Jul 30 00:10:39 2011
@@ -303,6 +303,9 @@ Trunk (unreleased changes)
HDFS-2156. Make hdfs and mapreduce rpm only depend on the same major
version for common and hdfs. (eyang via omalley)
+ HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of
+ datanodes without restarting. (Eric Payne via szetszwo)
+
IMPROVEMENTS
HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Sat Jul 30 00:10:39 2011
@@ -1269,6 +1269,18 @@ public class DFSClient implements FSCons
public void metaSave(String pathname) throws IOException {
namenode.metaSave(pathname);
}
+
+ /**
+ * Requests the namenode to tell all datanodes to use a new, non-persistent
+ * bandwidth value for dfs.balance.bandwidthPerSec.
+ * See {@link ClientProtocol#setBalancerBandwidth(long)}
+ * for more details.
+ *
+ * @see ClientProtocol#setBalancerBandwidth(long)
+ */
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+ namenode.setBalancerBandwidth(bandwidth);
+ }
/**
* @see ClientProtocol#finalizeUpgrade()
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Sat Jul 30 00:10:39 2011
@@ -867,4 +867,17 @@ public class DistributedFileSystem exten
throws IOException {
dfs.cancelDelegationToken(token);
}
+
+ /**
+ * Requests the namenode to tell all datanodes to use a new, non-persistent
+ * bandwidth value for dfs.balance.bandwidthPerSec.
+ * The bandwidth parameter is the max number of bytes per second of network
+ * bandwidth to be used by a datanode during balancing.
+ *
+ * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+ * @throws IOException
+ */
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+ dfs.setBalancerBandwidth(bandwidth);
+ }
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Sat Jul 30 00:10:39 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 67: Add block pool ID to Block
+ * 68: Add Balancer Bandwidth Command protocol
*/
- public static final long versionID = 67L;
+ public static final long versionID = 68L;
///////////////////////////////////////
// File contents
@@ -715,6 +715,15 @@ public interface ClientProtocol extends
* @throws IOException
*/
public void metaSave(String filename) throws IOException;
+
+ /**
+ * Tell all datanodes to use a new, non-persistent bandwidth value for
+ * dfs.balance.bandwidthPerSec.
+ *
+ * @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
+ * @throws IOException
+ */
+ public void setBalancerBandwidth(long bandwidth) throws IOException;
/**
* Get the file info for a specific file or directory.
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Sat Jul 30 00:10:39 2011
@@ -106,6 +106,14 @@ public class DatanodeDescriptor extends
public boolean isAlive = false;
public boolean needKeyUpdate = false;
+ // A system administrator can tune the balancer bandwidth parameter
+ // (dfs.balance.bandwidthPerSec) dynamically by calling
+ // "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
+ // following 'bandwidth' variable gets updated with the new value for each
+ // node. Once the heartbeat command is issued to update the value on the
+ // specified datanode, this value will be set back to 0.
+ private long bandwidth;
+
/** A queue of blocks to be replicated by this datanode */
private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
/** A queue of blocks to be recovered by this datanode */
@@ -569,4 +577,20 @@ public class DatanodeDescriptor extends
public void updateRegInfo(DatanodeID nodeReg) {
super.updateRegInfo(nodeReg);
}
+
+ /**
+ * @return Blanacer bandwidth in bytes per second for this datanode.
+ */
+ public long getBalancerBandwidth() {
+ return this.bandwidth;
+ }
+
+ /**
+ * @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
+ */
+ public void setBalancerBandwidth(long bandwidth) {
+ this.bandwidth = bandwidth;
+ }
+
+
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Sat Jul 30 00:10:39 2011
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -749,7 +750,7 @@ public class DatanodeManager {
return new DatanodeCommand[] { brCommand };
}
- final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
+ final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
@@ -765,6 +766,14 @@ public class DatanodeManager {
}
namesystem.addKeyUpdateCommand(cmds, nodeinfo);
+
+ // check for balancer bandwidth update
+ if (nodeinfo.getBalancerBandwidth() > 0) {
+ cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
+ // set back to 0 to indicate that datanode has been sent the new value
+ nodeinfo.setBalancerBandwidth(0);
+ }
+
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
@@ -773,4 +782,26 @@ public class DatanodeManager {
return null;
}
+
+ /**
+ * Tell all datanodes to use a new, non-persistent bandwidth value for
+ * dfs.balance.bandwidthPerSec.
+ *
+ * A system administrator can tune the balancer bandwidth parameter
+ * (dfs.datanode.balance.bandwidthPerSec) dynamically by calling
+ * "dfsadmin -setBalanacerBandwidth newbandwidth", at which point the
+ * following 'bandwidth' variable gets updated with the new value for each
+ * node. Once the heartbeat command is issued to update the value on the
+ * specified datanode, this value will be set back to 0.
+ *
+ * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+ * @throws IOException
+ */
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+ synchronized(datanodeMap) {
+ for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
+ nodeInfo.setBalancerBandwidth(bandwidth);
+ }
+ }
+ }
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Jul 30 00:10:39 2011
@@ -108,6 +108,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -1340,6 +1341,16 @@ public class DataNode extends Configured
((KeyUpdateCommand) cmd).getExportedKeys());
}
break;
+ case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+ LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
+ long bandwidth =
+ ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
+ if (bandwidth > 0) {
+ DataXceiverServer dxcs =
+ (DataXceiverServer) dataXceiverServer.getRunnable();
+ dxcs.balanceThrottler.setBandwidth(bandwidth);
+ }
+ break;
default:
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
}
@@ -2774,4 +2785,15 @@ public class DataNode extends Configured
return new DatanodeID(getMachineName(), getStorageId(),
infoServer.getPort(), getIpcPort());
}
+
+ /**
+ * Get current value of the max balancer bandwidth in bytes per second.
+ *
+ * @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
+ */
+ public Long getBalancerBandwidth() {
+ DataXceiverServer dxcs =
+ (DataXceiverServer) this.dataXceiverServer.getRunnable();
+ return dxcs.balanceThrottler.getBandwidth();
+ }
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Jul 30 00:10:39 2011
@@ -5066,5 +5066,13 @@ public class FSNamesystem implements FSC
getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
}
-
+ /**
+ * Tell all datanodes to use a new, non-persistent bandwidth value for
+ * dfs.datanode.balance.bandwidthPerSec.
+ * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+ * @throws IOException
+ */
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+ getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
+ }
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Sat Jul 30 00:10:39 2011
@@ -1089,6 +1089,16 @@ public class NameNode implements Namenod
}
return new CorruptFileBlocks(files, lastCookie);
}
+
+ /**
+ * Tell all datanodes to use a new, non-persistent bandwidth value for
+ * dfs.datanode.balance.bandwidthPerSec.
+ * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+ * @throws IOException
+ */
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+ namesystem.setBalancerBandwidth(bandwidth);
+ }
@Override // ClientProtocol
public ContentSummary getContentSummary(String path) throws IOException {
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java?rev=1152401&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java Sat Jul 30 00:10:39 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/*
+ * A system administrator can tune the balancer bandwidth parameter
+ * (dfs.balance.bandwidthPerSec) dynamically by calling
+ * "dfsadmin -setBalanacerBandwidth newbandwidth".
+ * This class is to define the command which sends the new bandwidth value to
+ * each datanode.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Balancer bandwidth command instructs each datanode to change its value for
+ * the max amount of network bandwidth it may use during the block balancing
+ * operation.
+ *
+ * The Balancer Bandwidth Command contains the new bandwidth value as its
+ * payload. The bandwidth value is in bytes per second.
+ */
+public class BalancerBandwidthCommand extends DatanodeCommand {
+ private final static long BBC_DEFAULTBANDWIDTH = 0L;
+
+ private long bandwidth;
+
+ /**
+ * Balancer Bandwidth Command constructor. Sets bandwidth to 0.
+ */
+ BalancerBandwidthCommand() {
+ this(BBC_DEFAULTBANDWIDTH);
+ }
+
+ /**
+ * Balancer Bandwidth Command constructor.
+ *
+ * @param bandwidth Blanacer bandwidth in bytes per second.
+ */
+ public BalancerBandwidthCommand(long bandwidth) {
+ super(DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE);
+ this.bandwidth = bandwidth;
+ }
+
+ /**
+ * Get current value of the max balancer bandwidth in bytes per second.
+ *
+ * @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
+ */
+ public long getBalancerBandwidthValue() {
+ return this.bandwidth;
+ }
+
+ // ///////////////////////////////////////////////
+ // Writable
+ // ///////////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory(BalancerBandwidthCommand.class, new WritableFactory() {
+ public Writable newInstance() {
+ return new BalancerBandwidthCommand();
+ }
+ });
+ }
+
+ /**
+ * Writes the bandwidth payload to the Balancer Bandwidth Command packet.
+ * @param out DataOutput stream used for writing commands to the datanode.
+ * @throws IOException
+ */
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(this.bandwidth);
+ }
+
+ /**
+ * Reads the bandwidth payload from the Balancer Bandwidth Command packet.
+ * @param in DataInput stream used for reading commands to the datanode.
+ * @throws IOException
+ */
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ this.bandwidth = in.readLong();
+ }
+}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Sat Jul 30 00:10:39 2011
@@ -45,9 +45,9 @@ import org.apache.avro.reflect.Nullable;
@InterfaceAudience.Private
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 27: Add block pool ID to Block
+ * 28: Add Balancer Bandwidth Command protocol.
*/
- public static final long versionID = 27L;
+ public static final long versionID = 28L;
// error code
final static int NOTIFY = 0;
@@ -67,6 +67,7 @@ public interface DatanodeProtocol extend
final static int DNA_FINALIZE = 5; // finalize previous upgrade
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
+ final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
/**
* Register Datanode.
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1152401&r1=1152400&r2=1152401&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Sat Jul 30 00:10:39 2011
@@ -452,6 +452,40 @@ public class DFSAdmin extends FsShell {
return exitCode;
}
+ /**
+ * Command to ask the namenode to set the balancer bandwidth for all of the
+ * datanodes.
+ * Usage: java DFSAdmin -setBalancerBandwidth bandwidth
+ * @param argv List of of command line parameters.
+ * @param idx The index of the command that is being processed.
+ * @exception IOException
+ */
+ public int setBalancerBandwidth(String[] argv, int idx) throws IOException {
+ long bandwidth;
+ int exitCode = -1;
+
+ try {
+ bandwidth = Long.parseLong(argv[idx]);
+ } catch (NumberFormatException nfe) {
+ System.err.println("NumberFormatException: " + nfe.getMessage());
+ System.err.println("Usage: java DFSAdmin"
+ + " [-setBalancerBandwidth <bandwidth in bytes per second>]");
+ return exitCode;
+ }
+
+ FileSystem fs = getFS();
+ if (!(fs instanceof DistributedFileSystem)) {
+ System.err.println("FileSystem is " + fs.getUri());
+ return exitCode;
+ }
+
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ dfs.setBalancerBandwidth(bandwidth);
+ exitCode = 0;
+
+ return exitCode;
+ }
+
private void printHelp(String cmd) {
String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" +
"The full syntax is: \n\n" +
@@ -469,6 +503,7 @@ public class DFSAdmin extends FsShell {
"\t[-printTopology]\n" +
"\t[-refreshNamenodes datanodehost:port]\n"+
"\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
+ "\t[-setBalancerBandwidth <bandwidth>]\n" +
"\t[-help [cmd]]\n";
String report ="-report: \tReports basic filesystem information and statistics.\n";
@@ -546,6 +581,14 @@ public class DFSAdmin extends FsShell {
"\t\t will fail if datanode is still serving the block pool.\n" +
"\t\t Refer to refreshNamenodes to shutdown a block pool\n" +
"\t\t service on a datanode.\n";
+
+ String setBalancerBandwidth = "-setBalancerBandwidth <bandwidth>:\n" +
+ "\tChanges the network bandwidth used by each datanode during\n" +
+ "\tHDFS block balancing.\n\n" +
+ "\t\t<bandwidth> is the maximum number of bytes per second\n" +
+ "\t\tthat will be used by each datanode. This value overrides\n" +
+ "\t\tthe dfs.balance.bandwidthPerSec parameter.\n\n" +
+ "\t\t--- NOTE: The new value is not persistent on the DataNode.---\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
@@ -586,6 +629,8 @@ public class DFSAdmin extends FsShell {
System.out.println(refreshNamenodes);
} else if ("deleteBlockPool".equals(cmd)) {
System.out.println(deleteBlockPool);
+ } else if ("setBalancerBandwidth".equals(cmd)) {
+ System.out.println(setBalancerBandwidth);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@@ -879,6 +924,9 @@ public class DFSAdmin extends FsShell {
} else if ("-deleteBlockPool".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-deleteBlockPool datanode-host:port blockpoolId [force]]");
+ } else if ("-setBalancerBandwidth".equals(cmd)) {
+ System.err.println("Usage: java DFSAdmin"
+ + " [-setBalancerBandwidth <bandwidth in bytes per second>]");
} else {
System.err.println("Usage: java DFSAdmin");
System.err.println(" [-report]");
@@ -899,6 +947,7 @@ public class DFSAdmin extends FsShell {
System.err.println(" ["+ClearQuotaCommand.USAGE+"]");
System.err.println(" ["+SetSpaceQuotaCommand.USAGE+"]");
System.err.println(" ["+ClearSpaceQuotaCommand.USAGE+"]");
+ System.err.println(" [-setBalancerBandwidth <bandwidth in bytes per second>]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@@ -990,6 +1039,11 @@ public class DFSAdmin extends FsShell {
printUsage(cmd);
return exitCode;
}
+ } else if ("-setBalancerBandwidth".equals(cmd)) {
+ if (argv.length != 2) {
+ printUsage(cmd);
+ return exitCode;
+ }
}
// initialize DFSAdmin
@@ -1042,6 +1096,8 @@ public class DFSAdmin extends FsShell {
exitCode = refreshNamenodes(argv, i);
} else if ("-deleteBlockPool".equals(cmd)) {
exitCode = deleteBlockPool(argv, i);
+ } else if ("-setBalancerBandwidth".equals(cmd)) {
+ exitCode = setBalancerBandwidth(argv, i);
} else if ("-help".equals(cmd)) {
if (i < argv.length) {
printHelp(argv[i]);
Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java?rev=1152401&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java (added)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestBalancerBandwidth.java Sat Jul 30 00:10:39 2011
@@ -0,0 +1,93 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hdfs;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+/**
+ * This test ensures that the balancer bandwidth is dynamically adjusted
+ * correctly.
+ */
+public class TestBalancerBandwidth extends TestCase {
+ final static private Configuration conf = new Configuration();
+ final static private int NUM_OF_DATANODES = 2;
+ final static private int DEFAULT_BANDWIDTH = 1024*1024;
+ public static final Log LOG = LogFactory.getLog(TestBalancerBandwidth.class);
+
+ public void testBalancerBandwidth() throws Exception {
+ /* Set bandwidthPerSec to a low value of 1M bps. */
+ conf.setLong(
+ DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+ DEFAULT_BANDWIDTH);
+
+ /* Create and start cluster */
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build();
+ try {
+ cluster.waitActive();
+
+ DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+
+ ArrayList<DataNode> datanodes = cluster.getDataNodes();
+ // Ensure value from the configuration is reflected in the datanodes.
+ assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(0).getBalancerBandwidth());
+ assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(1).getBalancerBandwidth());
+
+ // Dynamically change balancer bandwidth and ensure the updated value
+ // is reflected on the datanodes.
+ long newBandwidth = 12 * DEFAULT_BANDWIDTH; // 12M bps
+ fs.setBalancerBandwidth(newBandwidth);
+
+ // Give it a few seconds to propogate new the value to the datanodes.
+ try {
+ Thread.sleep(5000);
+ } catch (Exception e) {}
+
+ assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
+ assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
+
+ // Dynamically change balancer bandwidth to 0. Balancer bandwidth on the
+ // datanodes should remain as it was.
+ fs.setBalancerBandwidth(0);
+
+ // Give it a few seconds to propogate new the value to the datanodes.
+ try {
+ Thread.sleep(5000);
+ } catch (Exception e) {}
+
+ assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
+ assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
+ }finally {
+ cluster.shutdown();
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestBalancerBandwidth().testBalancerBandwidth();
+ }
+}