Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/28 20:13:25 UTC

git commit: TEZ-1501. Add a test dag to generate load on the getTask RPC. (sseth)

Repository: tez
Updated Branches:
  refs/heads/master eb503c696 -> 7731e0044


TEZ-1501. Add a test dag to generate load on the getTask RPC. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7731e004
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7731e004
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7731e004

Branch: refs/heads/master
Commit: 7731e0044532eaea9b2e4356ceeedf32017c763c
Parents: eb503c6
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 28 11:13:08 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 28 11:13:08 2014 -0700

----------------------------------------------------------------------
 .../tez/mapreduce/examples/ExampleDriver.java   |   1 +
 .../tez/mapreduce/examples/RPCLoadGen.java      | 216 +++++++++++++++++++
 .../tez/mapreduce/examples/TezExampleBase.java  | 169 +++++++++++++++
 3 files changed, 386 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7731e004/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index fbe7b4f..977b767 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -47,6 +47,7 @@ public class ExampleDriver {
     int exitCode = -1;
     ProgramDriver pgd = new ProgramDriver();
     try {
+      pgd.addClass("rpcloadgen", RPCLoadGen.class, "Run a DAG to generate load for the task to AM RPC");
       pgd.addClass("wordcount", MapredWordCount.class,
           "A map/reduce program that counts the words in the input files.");
       pgd.addClass("mapredwordcount", MapredWordCount.class,

http://git-wip-us.apache.org/repos/asf/tez/blob/7731e004/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
new file mode 100644
index 0000000..6776a72
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.base.Stopwatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import sun.misc.IOUtils;
+
+public class RPCLoadGen extends TezExampleBase {
+
+  private static final Log LOG = LogFactory.getLog(RPCLoadGen.class);
+
+  private static final String VIA_RPC = "viaRpc";
+  private static final byte VIA_RPC_BYTE = (byte) 0x00;
+  private static final String VIA_HDFS_DIST_CACHE = "viaHdfsDistCache";
+  private static final byte VIA_HDFS_DIST_CACHE_BYTE = (byte) 0x01;
+  private static final String VIA_HDFS_DIRECT_READ = "viaHdfsDirectRead";
+  private static final byte VIA_HDFS_DIRECT_READ_BYTE = (byte) 0x02;
+
+  private static final String DISK_PAYLOAD_NAME = RPCLoadGen.class.getSimpleName() + "_payload";
+
+  private FileSystem fs;
+  private Path resourcePath;
+
+  @Override
+  protected final int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws
+      TezException, InterruptedException, IOException {
+    LOG.info("Running: " +
+        this.getClass().getSimpleName());
+    String mode = VIA_RPC;
+    if (args.length == 4) {
+      if (args[3].equals(VIA_RPC) || args[3].equals(VIA_HDFS_DIRECT_READ) ||
+          args[3].equals(VIA_HDFS_DIST_CACHE)) {
+        mode = args[3];
+      } else {
+        printUsage();
+        return 2;
+      }
+    }
+
+    int numTasks = Integer.parseInt(args[0]);
+    int maxSleepTimeMillis = Integer.parseInt(args[1]);
+    int payloadSizeBytes = Integer.parseInt(args[2]);
+    LOG.info("Parameters: numTasks=" + numTasks + ", maxSleepTime(ms)=" + maxSleepTimeMillis +
+        ", payloadSize(bytes)=" + payloadSizeBytes + ", mode=" + mode);
+
+    DAG dag = createDAG(tezConf, numTasks, maxSleepTimeMillis, payloadSizeBytes, mode);
+    try {
+      return runDag(dag, false, LOG);
+    } finally {
+      if (fs != null) {
+        if (resourcePath != null) {
+          fs.delete(resourcePath, false);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void printUsage() {
+    System.err.println(
+        "Usage: " + "RPCLoadGen <numTasks> <max_sleep_time_millis> <get_task_payload_size> [" +
+            "<" + VIA_RPC + ">|" + VIA_HDFS_DIST_CACHE + "|" + VIA_HDFS_DIRECT_READ + "]");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  protected final int validateArgs(String[] otherArgs) {
+    return (otherArgs.length >= 3 && otherArgs.length <= 4) ? 0 : 2;
+  }
+
+  private DAG createDAG(TezConfiguration conf, int numTasks, int maxSleepTimeMillis,
+                        int payloadSize, String mode) throws IOException {
+
+    Map<String, LocalResource> localResourceMap = new HashMap<String, LocalResource>();
+    UserPayload payload =
+        createUserPayload(conf, maxSleepTimeMillis, payloadSize, mode, localResourceMap);
+
+    Vertex vertex = Vertex.create("RPCLoadVertex",
+        ProcessorDescriptor.create(RPCSleepProcessor.class.getName()).setUserPayload(
+            payload), numTasks).addTaskLocalFiles(localResourceMap);
+
+    return DAG.create("RPCLoadGen").addVertex(vertex);
+  }
+
+  private UserPayload createUserPayload(TezConfiguration conf, int maxSleepTimeMillis,
+                                        int payloadSize, String mode,
+                                        Map<String, LocalResource> localResources) throws
+      IOException {
+    ByteBuffer payload;
+    if (mode.equals(VIA_RPC)) {
+      if (payloadSize < 5) {
+        payloadSize = 5; // Minimum needed to configure the processor: 4-byte sleep time + 1-byte mode
+      }
+      byte[] payloadBytes = new byte[payloadSize];
+      new Random().nextBytes(payloadBytes);
+      payload = ByteBuffer.wrap(payloadBytes);
+      payload.put(4, VIA_RPC_BYTE); // ViaRPC
+    } else {
+      // RPC payload carries only the 5-byte header; the bulk of the data goes to HDFS
+      byte[] payloadBytes = new byte[5];
+      payload = ByteBuffer.wrap(payloadBytes);
+
+      // Disk payload
+      byte[] diskPayload = new byte[payloadSize];
+      new Random().nextBytes(diskPayload);
+      fs = FileSystem.get(conf);
+      resourcePath = new Path(Path.SEPARATOR + "tmp", DISK_PAYLOAD_NAME);
+      LOG.info("Writing disk payload to unqualified path: " + resourcePath);
+      resourcePath = fs.makeQualified(resourcePath);
+      LOG.info("Disk payload path resolved to: " + resourcePath);
+      FSDataOutputStream dataOut = fs.create(resourcePath, true);
+      dataOut.write(diskPayload);
+      dataOut.close();
+      fs.setReplication(resourcePath, (short)10);
+      FileStatus fileStatus = fs.getFileStatus(resourcePath);
+
+      if (mode.equals(VIA_HDFS_DIST_CACHE)) {
+        LocalResource lr = LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(resourcePath),
+            LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, fileStatus.getLen(),
+            fileStatus.getModificationTime());
+        localResources.put(DISK_PAYLOAD_NAME, lr);
+        payload.put(4, VIA_HDFS_DIST_CACHE_BYTE); // ViaHdfsDistCache
+      } else if (mode.equals(VIA_HDFS_DIRECT_READ)) {
+        payload.put(4, VIA_HDFS_DIRECT_READ_BYTE); // ViaHdfsDirectRead
+      }
+    }
+
+    payload.putInt(0, maxSleepTimeMillis);
+    return UserPayload.create(payload);
+  }
+
+  public static class RPCSleepProcessor extends SimpleProcessor {
+
+    private final int sleepTimeMax;
+    private final byte modeByte;
+
+    public RPCSleepProcessor(ProcessorContext context) {
+      super(context);
+      sleepTimeMax = getContext().getUserPayload().getPayload().getInt(0);
+      modeByte = getContext().getUserPayload().getPayload().get(4);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Stopwatch sw = new Stopwatch().start();
+      long sleepTime = new Random().nextInt(sleepTimeMax);
+      if (modeByte == VIA_RPC_BYTE) {
+        LOG.info("Received via RPC.");
+      } else if (modeByte == VIA_HDFS_DIST_CACHE_BYTE) {
+        LOG.info("Reading from local filesystem");
+        FileSystem localFs = FileSystem.getLocal(new Configuration());
+        FSDataInputStream is = localFs.open(new Path(DISK_PAYLOAD_NAME));
+        IOUtils.readFully(is, -1, false);
+      } else if (modeByte == VIA_HDFS_DIRECT_READ_BYTE) {
+        LOG.info("Reading from HDFS");
+        FileSystem fs = FileSystem.get(new Configuration());
+        FSDataInputStream is = fs.open(new Path(Path.SEPARATOR + "tmp", DISK_PAYLOAD_NAME));
+        IOUtils.readFully(is, -1, false);
+      } else {
+        throw new IllegalArgumentException("Unknown execution mode: [" + modeByte + "]");
+      }
+      LOG.info("TimeTakenToAccessPayload=" + sw.stop().elapsedMillis());
+      LOG.info("Sleeping for: " + sleepTime);
+      Thread.sleep(sleepTime);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new RPCLoadGen(), args);
+    System.exit(res);
+  }
+}
\ No newline at end of file
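
As a reading aid (not part of the committed file), the UserPayload convention shared by
createUserPayload() and RPCSleepProcessor above is: bytes 0-3 hold maxSleepTimeMillis as an
int, byte 4 holds the mode byte (0x00 viaRpc, 0x01 viaHdfsDistCache, 0x02 viaHdfsDirectRead),
and in viaRpc mode any remaining bytes are random filler that inflates the getTask response.
A small sketch restating that layout; the helper class and method names are hypothetical,
only the byte offsets mirror the committed code:

    import java.nio.ByteBuffer;

    // Hypothetical restatement of RPCLoadGen's 5-byte payload header convention.
    class RpcLoadGenPayloadLayout {
      static ByteBuffer encode(int maxSleepTimeMillis, byte modeByte, int totalSize) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[Math.max(totalSize, 5)]);
        buf.putInt(0, maxSleepTimeMillis); // bytes 0-3: max sleep time in millis
        buf.put(4, modeByte);              // byte 4: execution mode
        return buf;                        // bytes 5..n: filler (left zeroed here)
      }

      static int decodeMaxSleepMillis(ByteBuffer buf) {
        return buf.getInt(0);
      }

      static byte decodeMode(ByteBuffer buf) {
        return buf.get(4);
      }
    }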

http://git-wip-us.apache.org/repos/asf/tez/blob/7731e004/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
new file mode 100644
index 0000000..84a450c
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+public abstract class TezExampleBase extends Configured implements Tool {
+
+  private TezClient tezClientInternal;
+
+  @Override
+  public final int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    return _execute(otherArgs, null, null);
+  }
+
+  /**
+   * Utility method to use the example from within code or a test.
+   *
+   * @param conf      the tez configuration instance which will be used to create the DAG and
+   *                  possibly the Tez Client.
+   * @param args      arguments to the example
+   * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one
+   *                  exists. If no TezClient is specified (null), one will be created based on the
+   *                  provided configuration
+   * @return the exit code of the example; 0 indicates success
+   * @throws IOException
+   * @throws TezException
+   */
+  public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws
+      IOException,
+      TezException, InterruptedException {
+    setConf(conf);
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    return _execute(otherArgs, conf, tezClient);
+  }
+
+  /**
+   * @param dag           the dag to execute
+   * @param printCounters whether to print counters or not
+   * @param logger        the logger to use while printing diagnostics
+   * @return 0 if the DAG succeeded, -1 otherwise
+   * @throws TezException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public int runDag(DAG dag, boolean printCounters, Log logger) throws TezException,
+      InterruptedException, IOException {
+    tezClientInternal.waitTillReady();
+    DAGClient dagClient = tezClientInternal.submitDAG(dag);
+    Set<StatusGetOpts> getOpts = Sets.newHashSet();
+    if (printCounters) {
+      getOpts.add(StatusGetOpts.GET_COUNTERS);
+    }
+
+    DAGStatus dagStatus;
+    dagStatus = dagClient.waitForCompletionWithStatusUpdates(getOpts);
+
+    if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+      logger.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+      return -1;
+    }
+    return 0;
+  }
+
+  private int _validateArgs(String[] args) {
+    int res = validateArgs(args);
+    if (res != 0) {
+      printUsage();
+      return res;
+    }
+    return 0;
+  }
+
+  private int _execute(String[] otherArgs, TezConfiguration tezConf, TezClient tezClient) throws
+      IOException, TezException, InterruptedException {
+
+    int result = _validateArgs(otherArgs);
+    if (result != 0) {
+      return result;
+    }
+
+    if (tezConf == null) {
+      tezConf = new TezConfiguration(getConf());
+    }
+    UserGroupInformation.setConfiguration(tezConf);
+    boolean ownTezClient = (tezClient == null);
+    if (ownTezClient) {
+      tezClient = createTezClient(tezConf);
+    }
+    tezClientInternal = tezClient; // reuse the caller's client if one was provided
+    try {
+      return runJob(otherArgs, tezConf, tezClientInternal);
+    } finally {
+      if (ownTezClient && tezClientInternal != null) {
+        tezClientInternal.stop();
+      }
+    }
+  }
+
+  private TezClient createTezClient(TezConfiguration tezConf) throws IOException, TezException {
+    TezClient tezClient = TezClient.create(getClass().getSimpleName(), tezConf);
+    tezClient.start();
+    return tezClient;
+  }
+
+  /**
+   * Print usage instructions for this example
+   */
+  protected abstract void printUsage();
+
+  /**
+   * Validate the arguments
+   *
+   * @param otherArgs arguments, if any
+   * @return 0 if the arguments are valid, non-zero otherwise
+   */
+  protected abstract int validateArgs(String[] otherArgs);
+
+  /**
+   * Create and execute the actual DAG for the example
+   *
+   * @param args      arguments for execution
+   * @param tezConf   the tez configuration instance to be used while processing the DAG
+   * @param tezClient the tez client instance to use to run the DAG if any custom monitoring is
+   *                  required. Otherwise the utility method {@link #runDag(org.apache.tez.dag.api.DAG,
+   *                  boolean, org.apache.commons.logging.Log)} should be used
+   * @return the exit code of the DAG execution; 0 indicates success
+   * @throws IOException
+   * @throws TezException
+   */
+  protected abstract int runJob(String[] args, TezConfiguration tezConf,
+                                TezClient tezClient) throws IOException, TezException,
+      InterruptedException;
+}
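
The run(TezConfiguration, String[], TezClient) entry point above is intended for driving an
example from code or a test, optionally with a pre-started TezClient. A minimal sketch of
embedding RPCLoadGen that way; the session name and argument values are illustrative
assumptions, not part of this commit:

    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.mapreduce.examples.RPCLoadGen;

    // Hypothetical embedding of RPCLoadGen via TezExampleBase.run(conf, args, tezClient).
    public class RPCLoadGenEmbeddedExample {
      public static void main(String[] args) throws Exception {
        TezConfiguration tezConf = new TezConfiguration();
        TezClient tezClient = TezClient.create("RPCLoadGenSession", tezConf);
        tezClient.start();
        try {
          // numTasks=10, maxSleepTimeMillis=500, payloadSizeBytes=4096, mode=viaRpc
          int rc = new RPCLoadGen().run(tezConf,
              new String[] { "10", "500", "4096", "viaRpc" }, tezClient);
          System.out.println("RPCLoadGen exited with " + rc);
        } finally {
          tezClient.stop();
        }
      }
    }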