You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 01:36:22 UTC

[26/49] hadoop git commit: HDFS-9546: DiskBalancer: Add Execute command. Contributed by Anu Engineer.

HDFS-9546: DiskBalancer: Add Execute command. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b39b283
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b39b283
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b39b283

Branch: refs/heads/HDFS-1312
Commit: 1b39b283c70854bf3b77f5ba9fbcce064bfea5c3
Parents: 75882ec
Author: Anu Engineer <ae...@apache.org>
Authored: Fri May 13 10:52:58 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:21:08 2016 -0700

----------------------------------------------------------------------
 .../server/diskbalancer/command/Command.java    |  18 ++-
 .../diskbalancer/command/ExecuteCommand.java    | 119 +++++++++++++++++++
 .../diskbalancer/command/PlanCommand.java       |  22 ++--
 .../apache/hadoop/hdfs/tools/DiskBalancer.java  |  71 ++++++-----
 4 files changed, 187 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
index 6522434..919d549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -81,8 +82,6 @@ public abstract class Command extends Configured {
   public Command(Configuration conf) {
     super(conf);
     // These arguments are valid for all commands.
-    addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
-        "file URI for cluster");
     addValidCommandParameters(DiskBalancer.HELP, "Help for this command");
     addValidCommandParameters("arg", "");
   }
@@ -348,10 +347,25 @@ public abstract class Command extends Configured {
    * @return OutputStream.
    */
   protected FSDataOutputStream create(String fileName) throws IOException {
+    Preconditions.checkNotNull(fileName);
+    if(fs == null) { // first use: obtain the FileSystem from the conf
+      fs = FileSystem.get(getConf());
+    }
     return fs.create(new Path(this.diskBalancerLogs, fileName));
   }
 
   /**
+   * Opens fileName as given (no base directory applied) for reading.
+   */
+  protected FSDataInputStream open(String fileName) throws IOException {
+    Preconditions.checkNotNull(fileName);
+    if(fs == null) {
+      fs = FileSystem.get(getConf());
+    }
+    return  fs.open(new Path(fileName));
+  }
+
+  /**
    * Returns the output path where the plan and snapshot gets written.
    *
    * @return Path

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
new file mode 100644
index 0000000..1f7e81f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.command;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
+import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+
+
+/**
+ * Reads a plan file and submits it to the datanode named in that plan.
+ */
+public class ExecuteCommand extends Command {
+
+  /**
+   * Constructs ExecuteCommand.
+   *
+   * @param conf - Configuration.
+   */
+  public ExecuteCommand(Configuration conf) {
+    super(conf);
+    addValidCommandParameters(DiskBalancer.EXECUTE, "Executes a given plan.");
+    addValidCommandParameters(DiskBalancer.NODE, "Name of the target node.");
+  }
+
+  /**
+   * Reads the plan file given with -execute and submits it to the datanode.
+   *
+   * @param cmd - CommandLine; must carry a non-empty -execute value.
+   */
+  @Override
+  public void execute(CommandLine cmd) throws Exception {
+    LOG.info("Executing \"execute plan\" command");
+    Preconditions.checkState(cmd.hasOption(DiskBalancer.EXECUTE));
+    verifyCommandOptions(DiskBalancer.EXECUTE, cmd);
+
+    String planFile = cmd.getOptionValue(DiskBalancer.EXECUTE);
+    Preconditions.checkArgument(planFile != null && !planFile.isEmpty(),
+        "Invalid plan file specified.");
+
+    // PlanCommand writes plans as UTF-8, so decode with the same charset.
+    String planData;
+    try (FSDataInputStream plan = open(planFile)) {
+      planData = IOUtils.toString(plan, "UTF-8");
+    }
+    submitPlan(planData);
+  }
+
+  /**
+   * Submits plan to the data node recorded in the plan (nodeName:port).
+   *
+   * @param planData - PlanData Json String.
+   * @throws IOException if reading the plan or the datanode call fails.
+   */
+  private void submitPlan(String planData) throws IOException {
+    Preconditions.checkNotNull(planData);
+    NodePlan plan = readPlan(planData);
+    String dataNodeAddress = plan.getNodeName() + ":" + plan.getPort();
+    ClientDatanodeProtocol dataNode = getDataNodeProxy(dataNodeAddress);
+    String planHash = DigestUtils.sha512Hex(planData);
+    try {
+      dataNode.submitDiskBalancerPlan(planHash, DiskBalancer.PLAN_VERSION,
+          planData, false); // TODO : Support skipping date check.
+    } catch (DiskBalancerException ex) {
+      LOG.error("Submitting plan on  {} failed. Result: {}, Message: {}",
+          plan.getNodeName(), ex.getResult(), ex.getMessage());
+      throw ex;
+    }
+  }
+
+  /**
+   * Returns a plan from the Json Data.
+   *
+   * @param planData - Json String
+   * @return NodePlan
+   * @throws IOException if planData cannot be mapped to a NodePlan.
+   */
+  private NodePlan readPlan(String planData) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.readValue(planData, NodePlan.class);
+  }
+
+  /**
+   * Gets extended help for this command.
+   *
+   * @return Help Message
+   */
+  @Override
+  protected String getHelp() {
+    return "Execute command takes a plan and runs it against the node. e.g. " +
+        "hdfs diskbalancer -execute <nodename.plan.json> ";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
index 2422215..d346c84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.tools.DiskBalancer;
-import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
+    .DiskBalancerDataNode;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -54,6 +55,9 @@ public class PlanCommand extends Command {
     this.thresholdPercentage = 1;
     this.bandwidth = 0;
     this.maxError = 0;
+    addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
+        "file URI for cluster");
+
     addValidCommandParameters(DiskBalancer.OUTFILE, "Output file");
     addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " +
         "be used while copying.");
@@ -61,7 +65,6 @@ public class PlanCommand extends Command {
         "we tolerate before diskbalancer starts working.");
     addValidCommandParameters(DiskBalancer.MAXERROR, "Max errors to tolerate " +
         "between 2 disks");
-    addValidCommandParameters(DiskBalancer.NODE, "Name / Address of the node.");
     addValidCommandParameters(DiskBalancer.VERBOSE, "Run plan command in " +
         "verbose mode.");
   }
@@ -79,7 +82,7 @@ public class PlanCommand extends Command {
     Preconditions.checkState(cmd.hasOption(DiskBalancer.PLAN));
     verifyCommandOptions(DiskBalancer.PLAN, cmd);
 
-    if (!cmd.hasOption(DiskBalancer.NODE)) {
+    if (cmd.getOptionValue(DiskBalancer.PLAN) == null) {
       throw new IllegalArgumentException("A node name is required to create a" +
           " plan.");
     }
@@ -101,10 +104,11 @@ public class PlanCommand extends Command {
     }
     setOutputPath(output);
 
-    DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.NODE));
+    // -plan nodename is the command line argument.
+    DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.PLAN));
     if (node == null) {
       throw new IllegalArgumentException("Unable to find the specified node. " +
-          cmd.getOptionValue(DiskBalancer.NODE));
+          cmd.getOptionValue(DiskBalancer.PLAN));
     }
     this.thresholdPercentage = getThresholdPercentage(cmd);
     setNodesToProcess(node);
@@ -115,16 +119,16 @@ public class PlanCommand extends Command {
     LOG.info("Writing plan to : {}", getOutputPath());
     System.out.printf("Writing plan to : %s%n", getOutputPath());
 
-    try(FSDataOutputStream beforeStream = create(String.format(
+    try (FSDataOutputStream beforeStream = create(String.format(
         DiskBalancer.BEFORE_TEMPLATE,
-        cmd.getOptionValue(DiskBalancer.NODE)))) {
+        cmd.getOptionValue(DiskBalancer.PLAN)))) {
       beforeStream.write(getCluster().toJson()
           .getBytes(StandardCharsets.UTF_8));
     }
 
-    try(FSDataOutputStream planStream = create(String.format(
+    try (FSDataOutputStream planStream = create(String.format(
         DiskBalancer.PLAN_TEMPLATE,
-        cmd.getOptionValue(DiskBalancer.NODE)))) {
+        cmd.getOptionValue(DiskBalancer.PLAN)))) {
       planStream.write(getPlan(plans).getBytes(StandardCharsets.UTF_8));
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b39b283/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
index e44d3dc..87fbf4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.diskbalancer.command.Command;
+import org.apache.hadoop.hdfs.server.diskbalancer.command.ExecuteCommand;
 import org.apache.hadoop.hdfs.server.diskbalancer.command.PlanCommand;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -48,15 +49,6 @@ import java.net.URISyntaxException;
  */
 public class DiskBalancer extends Configured implements Tool {
   /**
-   * Construct a DiskBalancer.
-   *
-   * @param conf
-   */
-  public DiskBalancer(Configuration conf) {
-    super(conf);
-  }
-
-  /**
    * NameNodeURI can point to either a real namenode, or a json file that
    * contains the diskBalancer data in json form, that jsonNodeConnector knows
    * how to deserialize.
@@ -66,12 +58,10 @@ public class DiskBalancer extends Configured implements Tool {
    * hdfs://namenode.uri or file:///data/myCluster.json
    */
   public static final String NAMENODEURI = "uri";
-
   /**
    * Computes a plan for a given set of nodes.
    */
   public static final String PLAN = "plan";
-
   /**
    * Output file name, for commands like report, plan etc. This is an optional
    * argument, by default diskbalancer will write all its output to
@@ -79,53 +69,58 @@ public class DiskBalancer extends Configured implements Tool {
    * against.
    */
   public static final String OUTFILE = "out";
-
   /**
    * Help for the program.
    */
   public static final String HELP = "help";
-
   /**
    * Percentage of data unevenness that we are willing to live with. For example
    * - a value like 10 indicates that we are okay with 10 % +/- from
    * idealStorage Target.
    */
   public static final String THRESHOLD = "thresholdPercentage";
-
   /**
    * Specifies the maximum disk bandwidth to use per second.
    */
   public static final String BANDWIDTH = "bandwidth";
-
   /**
    * Specifies the maximum errors to tolerate.
    */
   public static final String MAXERROR = "maxerror";
-
   /**
-   * Node name or IP against which Disk Balancer is being run.
+   * Executes a given plan file on the target datanode.
+   */
+  public static final String EXECUTE = "execute";
+  /**
+   * Name or address of the node to execute against.
    */
   public static final String NODE = "node";
-
   /**
    * Runs the command in verbose mode.
    */
   public static final String VERBOSE = "v";
-
+  public static final int PLAN_VERSION = 1;
   /**
    * Template for the Before File. It is node.before.json.
    */
   public static final String BEFORE_TEMPLATE = "%s.before.json";
-
   /**
    * Template for the plan file. it is node.plan.json.
    */
   public static final String PLAN_TEMPLATE = "%s.plan.json";
-
   private static final Logger LOG =
       LoggerFactory.getLogger(DiskBalancer.class);
 
   /**
+   * Construct a DiskBalancer.
+   *
+   * @param conf - Configuration handed to the Configured base class.
+   */
+  public DiskBalancer(Configuration conf) {
+    super(conf);
+  }
+
+  /**
    * Main for the  DiskBalancer Command handling.
    *
    * @param argv - System Args Strings[]
@@ -164,16 +159,18 @@ public class DiskBalancer extends Configured implements Tool {
    */
   private Options getOpts() {
     Options opts = new Options();
-    addCommands(opts);
+    addPlanCommands(opts);
+    // -execute must be registered here too or it is never a known option.
+    addExecuteCommands(opts);
     return opts;
   }
 
   /**
-   * Adds commands that we handle to opts.
+   * Adds commands for plan command.
    *
-   * @param opt - Optins
+   * @param opt - Options
    */
-  private void addCommands(Options opt) {
+  private void addPlanCommands(Options opt) {
 
     Option nameNodeUri =
         new Option(NAMENODEURI, true, "NameNode URI. e.g http://namenode" +
@@ -187,7 +182,8 @@ public class DiskBalancer extends Configured implements Tool {
             "e.g -out outfile.txt");
     opt.addOption(outFile);
 
-    Option plan = new Option(PLAN, false, "write plan to the default file");
+    Option plan = new Option(PLAN, true , "create a plan for the given node. " +
+        "e.g -plan <nodename> | <nodeIP> | <nodeUUID>");
     opt.addOption(plan);
 
     Option bandwidth = new Option(BANDWIDTH, true, "Maximum disk bandwidth to" +
@@ -204,13 +200,19 @@ public class DiskBalancer extends Configured implements Tool {
         "can be tolerated while copying between a pair of disks.");
     opt.addOption(maxErrors);
 
-    Option node = new Option(NODE, true, "Node Name or IP");
-    opt.addOption(node);
-
     Option help =
         new Option(HELP, true, "Help about a command or this message");
     opt.addOption(help);
+  }
 
+  /**
+   * Adds the -execute option, which takes a plan file as its argument.
+   * @param opt Options
+   */
+  private void addExecuteCommands(Options opt) {
+    Option execute = new Option(EXECUTE, true , "Takes a plan file and " +
+        "submits it for execution to the datanode. e.g -execute <planfile>");
+    opt.addOption(execute);
   }
 
   /**
@@ -238,18 +240,23 @@ public class DiskBalancer extends Configured implements Tool {
     Command currentCommand = null;
 
     try {
+
       if (cmd.hasOption(DiskBalancer.PLAN)) {
         currentCommand = new PlanCommand(getConf());
-      } else {
+      }
+
+      if(cmd.hasOption(DiskBalancer.EXECUTE)) {
+        currentCommand = new ExecuteCommand(getConf());
+      }
+
+      if(currentCommand == null) {
         HelpFormatter helpFormatter = new HelpFormatter();
         helpFormatter.printHelp(80, "hdfs diskbalancer -uri [args]",
             "disk balancer commands", opts,
             "Please correct your command and try again.");
         return 1;
       }
-
       currentCommand.execute(cmd);
-
     } catch (Exception ex) {
       System.err.printf(ex.getMessage());
       return 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org