You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 01:36:22 UTC
[26/49] hadoop git commit: HDFS-9546: DiskBalancer: Add Execute
command. Contributed by Anu Engineer.
HDFS-9546: DiskBalancer: Add Execute command. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b39b283
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b39b283
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b39b283
Branch: refs/heads/HDFS-1312
Commit: 1b39b283c70854bf3b77f5ba9fbcce064bfea5c3
Parents: 75882ec
Author: Anu Engineer <ae...@apache.org>
Authored: Fri May 13 10:52:58 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:21:08 2016 -0700
----------------------------------------------------------------------
.../server/diskbalancer/command/Command.java | 18 ++-
.../diskbalancer/command/ExecuteCommand.java | 119 +++++++++++++++++++
.../diskbalancer/command/PlanCommand.java | 22 ++--
.../apache/hadoop/hdfs/tools/DiskBalancer.java | 71 ++++++-----
4 files changed, 187 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
index 6522434..919d549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -81,8 +82,6 @@ public abstract class Command extends Configured {
public Command(Configuration conf) {
super(conf);
// These arguments are valid for all commands.
- addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
- "file URI for cluster");
addValidCommandParameters(DiskBalancer.HELP, "Help for this command");
addValidCommandParameters("arg", "");
}
@@ -348,10 +347,25 @@ public abstract class Command extends Configured {
* @return OutputStream.
*/
protected FSDataOutputStream create(String fileName) throws IOException {
+ Preconditions.checkNotNull(fileName);
+ if(fs == null) {
+ fs = FileSystem.get(getConf());
+ }
return fs.create(new Path(this.diskBalancerLogs, fileName));
}
/**
+ * Returns an FSDataInputStream to read data from the given file.
+ */
+ protected FSDataInputStream open(String fileName) throws IOException {
+ Preconditions.checkNotNull(fileName);
+ if(fs == null) {
+ fs = FileSystem.get(getConf());
+ }
+ return fs.open(new Path(fileName));
+ }
+
+ /**
* Returns the output path where the plan and snapshot gets written.
*
* @return Path
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
new file mode 100644
index 0000000..1f7e81f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.command;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
+import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+
+
+/**
+ * Executes a given disk balancer plan file against the target datanode.
+ */
+public class ExecuteCommand extends Command {
+
+  /**
+   * Constructs ExecuteCommand.
+   *
+   * @param conf - Configuration.
+   */
+  public ExecuteCommand(Configuration conf) {
+    super(conf);
+    addValidCommandParameters(DiskBalancer.EXECUTE, "Executes a given plan.");
+    addValidCommandParameters(DiskBalancer.NODE, "Name of the target node.");
+  }
+
+  /**
+   * Executes the Client Calls.
+   *
+   * @param cmd - CommandLine
+   * @throws Exception if the plan file cannot be read or the datanode
+   *                   rejects the plan.
+   */
+  @Override
+  public void execute(CommandLine cmd) throws Exception {
+    LOG.info("Executing \"execute plan\" command");
+    Preconditions.checkState(cmd.hasOption(DiskBalancer.EXECUTE));
+    verifyCommandOptions(DiskBalancer.EXECUTE, cmd);
+
+    String planFile = cmd.getOptionValue(DiskBalancer.EXECUTE);
+    // BUG FIX: checkArgument throws when its condition is FALSE. The
+    // original condition (planFile == null || planFile.isEmpty()) was
+    // inverted -- it rejected every valid plan file and accepted a
+    // missing one. Assert that a plan file WAS supplied instead.
+    Preconditions.checkArgument(planFile != null && !planFile.isEmpty(),
+        "Invalid plan file specified.");
+
+    String planData = null;
+    // NOTE(review): IOUtils.toString(InputStream) uses the platform default
+    // charset; the plan is JSON written as UTF-8 -- consider the charset
+    // overload. Left unchanged here to avoid altering behavior further.
+    try (FSDataInputStream plan = open(planFile)) {
+      planData = IOUtils.toString(plan);
+    }
+    submitPlan(planData);
+  }
+
+  /**
+   * Submits plan to a given data node.
+   *
+   * @param planData - PlanData Json String.
+   * @throws IOException if the plan cannot be parsed or the datanode
+   *                     reports a failure.
+   */
+  private void submitPlan(String planData) throws IOException {
+    Preconditions.checkNotNull(planData);
+    NodePlan plan = readPlan(planData);
+    // The plan itself names the datanode it was computed for.
+    String dataNodeAddress = plan.getNodeName() + ":" + plan.getPort();
+    ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
+    // The hash lets the datanode verify it received the plan intact.
+    String planHash = DigestUtils.sha512Hex(planData);
+    try {
+      dataNode.submitDiskBalancerPlan(planHash, DiskBalancer.PLAN_VERSION,
+          planData, false); // TODO : Support skipping date check.
+    } catch (DiskBalancerException ex) {
+      LOG.error("Submitting plan on {} failed. Result: {}, Message: {}",
+          plan.getNodeName(), ex.getResult().toString(), ex.getMessage());
+      throw ex;
+    }
+  }
+
+  /**
+   * Returns a plan from the Json Data.
+   *
+   * @param planData - Json String
+   * @return NodePlan
+   * @throws IOException if the string is not valid NodePlan Json.
+   */
+  private NodePlan readPlan(String planData) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.readValue(planData, NodePlan.class);
+  }
+
+  /**
+   * Gets extended help for this command.
+   *
+   * @return Help Message
+   */
+  @Override
+  protected String getHelp() {
+    return "Execute command takes a plan and runs it against the node. e.g. " +
+        "hdfs diskbalancer -execute <nodename.plan.json> ";
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
index 2422215..d346c84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
+ .DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.codehaus.jackson.map.ObjectMapper;
@@ -54,6 +55,9 @@ public class PlanCommand extends Command {
this.thresholdPercentage = 1;
this.bandwidth = 0;
this.maxError = 0;
+ addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
+ "file URI for cluster");
+
addValidCommandParameters(DiskBalancer.OUTFILE, "Output file");
addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " +
"be used while copying.");
@@ -61,7 +65,6 @@ public class PlanCommand extends Command {
"we tolerate before diskbalancer starts working.");
addValidCommandParameters(DiskBalancer.MAXERROR, "Max errors to tolerate " +
"between 2 disks");
- addValidCommandParameters(DiskBalancer.NODE, "Name / Address of the node.");
addValidCommandParameters(DiskBalancer.VERBOSE, "Run plan command in " +
"verbose mode.");
}
@@ -79,7 +82,7 @@ public class PlanCommand extends Command {
Preconditions.checkState(cmd.hasOption(DiskBalancer.PLAN));
verifyCommandOptions(DiskBalancer.PLAN, cmd);
- if (!cmd.hasOption(DiskBalancer.NODE)) {
+ if (cmd.getOptionValue(DiskBalancer.PLAN) == null) {
throw new IllegalArgumentException("A node name is required to create a" +
" plan.");
}
@@ -101,10 +104,11 @@ public class PlanCommand extends Command {
}
setOutputPath(output);
- DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.NODE));
+ // -plan nodename is the command line argument.
+ DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.PLAN));
if (node == null) {
throw new IllegalArgumentException("Unable to find the specified node. " +
- cmd.getOptionValue(DiskBalancer.NODE));
+ cmd.getOptionValue(DiskBalancer.PLAN));
}
this.thresholdPercentage = getThresholdPercentage(cmd);
setNodesToProcess(node);
@@ -115,16 +119,16 @@ public class PlanCommand extends Command {
LOG.info("Writing plan to : {}", getOutputPath());
System.out.printf("Writing plan to : %s%n", getOutputPath());
- try(FSDataOutputStream beforeStream = create(String.format(
+ try (FSDataOutputStream beforeStream = create(String.format(
DiskBalancer.BEFORE_TEMPLATE,
- cmd.getOptionValue(DiskBalancer.NODE)))) {
+ cmd.getOptionValue(DiskBalancer.PLAN)))) {
beforeStream.write(getCluster().toJson()
.getBytes(StandardCharsets.UTF_8));
}
- try(FSDataOutputStream planStream = create(String.format(
+ try (FSDataOutputStream planStream = create(String.format(
DiskBalancer.PLAN_TEMPLATE,
- cmd.getOptionValue(DiskBalancer.NODE)))) {
+ cmd.getOptionValue(DiskBalancer.PLAN)))) {
planStream.write(getPlan(plans).getBytes(StandardCharsets.UTF_8));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
index e44d3dc..87fbf4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.diskbalancer.command.Command;
+import org.apache.hadoop.hdfs.server.diskbalancer.command.ExecuteCommand;
import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -48,15 +49,6 @@ import java.net.URISyntaxException;
*/
public class DiskBalancer extends Configured implements Tool {
/**
- * Construct a DiskBalancer.
- *
- * @param conf
- */
- public DiskBalancer(Configuration conf) {
- super(conf);
- }
-
- /**
* NameNodeURI can point to either a real namenode, or a json file that
* contains the diskBalancer data in json form, that jsonNodeConnector knows
* how to deserialize.
@@ -66,12 +58,10 @@ public class DiskBalancer extends Configured implements Tool {
* hdfs://namenode.uri or file:///data/myCluster.json
*/
public static final String NAMENODEURI = "uri";
-
/**
* Computes a plan for a given set of nodes.
*/
public static final String PLAN = "plan";
-
/**
* Output file name, for commands like report, plan etc. This is an optional
* argument, by default diskbalancer will write all its output to
@@ -79,53 +69,58 @@ public class DiskBalancer extends Configured implements Tool {
* against.
*/
public static final String OUTFILE = "out";
-
/**
* Help for the program.
*/
public static final String HELP = "help";
-
/**
* Percentage of data unevenness that we are willing to live with. For example
* - a value like 10 indicates that we are okay with 10 % +/- from
* idealStorage Target.
*/
public static final String THRESHOLD = "thresholdPercentage";
-
/**
* Specifies the maximum disk bandwidth to use per second.
*/
public static final String BANDWIDTH = "bandwidth";
-
/**
* Specifies the maximum errors to tolerate.
*/
public static final String MAXERROR = "maxerror";
-
/**
- * Node name or IP against which Disk Balancer is being run.
+ * Executes a given plan file on the target datanode.
+ */
+ public static final String EXECUTE = "execute";
+ /**
+ * Name or address of the node to execute against.
*/
public static final String NODE = "node";
-
/**
* Runs the command in verbose mode.
*/
public static final String VERBOSE = "v";
-
+ public static final int PLAN_VERSION = 1;
/**
* Template for the Before File. It is node.before.json.
*/
public static final String BEFORE_TEMPLATE = "%s.before.json";
-
/**
* Template for the plan file. it is node.plan.json.
*/
public static final String PLAN_TEMPLATE = "%s.plan.json";
-
private static final Logger LOG =
LoggerFactory.getLogger(DiskBalancer.class);
/**
+ * Construct a DiskBalancer.
+ *
+ * @param conf
+ */
+ public DiskBalancer(Configuration conf) {
+ super(conf);
+ }
+
+ /**
* Main for the DiskBalancer Command handling.
*
* @param argv - System Args Strings[]
@@ -164,16 +159,16 @@ public class DiskBalancer extends Configured implements Tool {
*/
private Options getOpts() {
Options opts = new Options();
- addCommands(opts);
+ addPlanCommands(opts);
return opts;
}
/**
- * Adds commands that we handle to opts.
+ * Adds commands for plan command.
*
- * @param opt - Optins
+ * @param opt - Options
*/
- private void addCommands(Options opt) {
+ private void addPlanCommands(Options opt) {
Option nameNodeUri =
new Option(NAMENODEURI, true, "NameNode URI. e.g http://namenode" +
@@ -187,7 +182,8 @@ public class DiskBalancer extends Configured implements Tool {
"e.g -out outfile.txt");
opt.addOption(outFile);
- Option plan = new Option(PLAN, false, "write plan to the default file");
+ Option plan = new Option(PLAN, true , "create a plan for the given node. " +
+ "e.g -plan <nodename> | <nodeIP> | <nodeUUID>");
opt.addOption(plan);
Option bandwidth = new Option(BANDWIDTH, true, "Maximum disk bandwidth to" +
@@ -204,13 +200,19 @@ public class DiskBalancer extends Configured implements Tool {
"can be tolerated while copying between a pair of disks.");
opt.addOption(maxErrors);
- Option node = new Option(NODE, true, "Node Name or IP");
- opt.addOption(node);
-
Option help =
new Option(HELP, true, "Help about a command or this message");
opt.addOption(help);
+ }
+ /**
+ * Adds execute command options.
+ * @param opt Options
+ */
+ private void addExecuteCommands(Options opt) {
+ Option execute = new Option(EXECUTE, true , "Takes a plan file and " +
+ "submits it for execution to the datanode. e.g -execute <planfile>");
+ opt.addOption(execute);
}
/**
@@ -238,18 +240,23 @@ public class DiskBalancer extends Configured implements Tool {
Command currentCommand = null;
try {
+
if (cmd.hasOption(DiskBalancer.PLAN)) {
currentCommand = new PlanCommand(getConf());
- } else {
+ }
+
+ if(cmd.hasOption(DiskBalancer.EXECUTE)) {
+ currentCommand = new ExecuteCommand(getConf());
+ }
+
+ if(currentCommand == null) {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]",
"disk balancer commands", opts,
"Please correct your command and try again.");
return 1;
}
-
currentCommand.execute(cmd);
-
} catch (Exception ex) {
System.err.printf(ex.getMessage());
return 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org