You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/28 20:13:25 UTC
git commit: TEZ-1501. Add a test dag to generate load on the getTask
RPC. (sseth)
Repository: tez
Updated Branches:
refs/heads/master eb503c696 -> 7731e0044
TEZ-1501. Add a test dag to generate load on the getTask RPC. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7731e004
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7731e004
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7731e004
Branch: refs/heads/master
Commit: 7731e0044532eaea9b2e4356ceeedf32017c763c
Parents: eb503c6
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 28 11:13:08 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 28 11:13:08 2014 -0700
----------------------------------------------------------------------
.../tez/mapreduce/examples/ExampleDriver.java | 1 +
.../tez/mapreduce/examples/RPCLoadGen.java | 216 +++++++++++++++++++
.../tez/mapreduce/examples/TezExampleBase.java | 169 +++++++++++++++
3 files changed, 386 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/7731e004/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index fbe7b4f..977b767 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -47,6 +47,7 @@ public class ExampleDriver {
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
+ pgd.addClass("rpcloadgen", RPCLoadGen.class, "Run a DAG to generate load for the task to AM RPC");
pgd.addClass("wordcount", MapredWordCount.class,
"A map/reduce program that counts the words in the input files.");
pgd.addClass("mapredwordcount", MapredWordCount.class,
http://git-wip-us.apache.org/repos/asf/tez/blob/7731e004/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
new file mode 100644
index 0000000..6776a72
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.base.Stopwatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import sun.misc.IOUtils;
+
+public class RPCLoadGen extends TezExampleBase {
+
+ private static final Log LOG = LogFactory.getLog(RPCLoadGen.class);
+
+ private static final String VIA_RPC = "viaRpc";
+ private static final byte VIA_RPC_BYTE = (byte) 0x00;
+ private static final String VIA_HDFS_DIST_CACHE = "viaHdfsDistCache";
+ private static final byte VIA_HDFS_DIST_CACHE_BYTE = (byte) 0x01;
+ private static final String VIA_HDFS_DIRECT_READ = "viaHdfsDirectRead";
+ private static final byte VIA_HDFS_DIRECT_READ_BYTE = (byte) 0x02;
+
+ private static final String DISK_PAYLOAD_NAME = RPCLoadGen.class.getSimpleName() + "_payload";
+
+ private FileSystem fs;
+ private Path resourcePath;
+
+ /**
+ * Parses the command line, builds the load-generation DAG and runs it.
+ *
+ * @param args {@code <numTasks> <max_sleep_time_millis> <get_task_payload_size> [mode]}
+ * @param tezConf configuration used to build the DAG
+ * @param tezClient not used directly; the DAG is submitted through runDag
+ * @return 2 on a usage error, otherwise the value returned by runDag
+ */
+ @Override
+ protected final int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws
+ TezException, InterruptedException, IOException {
+ LOG.info("Running: " +
+ this.getClass().getSimpleName());
+ String mode = VIA_RPC;
+ if (args.length == 4) {
+ if (args[3].equals(VIA_RPC) || args[3].equals(VIA_HDFS_DIRECT_READ) ||
+ args[3].equals(VIA_HDFS_DIST_CACHE)) {
+ mode = args[3];
+ } else {
+ printUsage();
+ return 2;
+ }
+ }
+
+ int numTasks;
+ int maxSleepTimeMillis;
+ int payloadSizeBytes;
+ try {
+ numTasks = Integer.parseInt(args[0]);
+ maxSleepTimeMillis = Integer.parseInt(args[1]);
+ payloadSizeBytes = Integer.parseInt(args[2]);
+ } catch (NumberFormatException e) {
+ // A non-numeric argument is a usage error, not a crash.
+ System.err.println("Invalid numeric argument: " + e.getMessage());
+ printUsage();
+ return 2;
+ }
+ LOG.info("Parameters: numTasks=" + numTasks + ", maxSleepTime(ms)=" + maxSleepTimeMillis +
+ ", payloadSize(bytes)=" + payloadSizeBytes + ", mode=" + mode);
+
+ try {
+ // The DAG is built inside the try block: createDAG may already have written the payload
+ // file when it fails, and that file still has to be removed.
+ DAG dag = createDAG(tezConf, numTasks, maxSleepTimeMillis, payloadSizeBytes, mode);
+ return runDag(dag, false, LOG);
+ } finally {
+ if (fs != null && resourcePath != null) {
+ fs.delete(resourcePath, false);
+ }
+ }
+ }
+
+ /** Prints the command synopsis and the generic command usage to stderr. */
+ @Override
+ protected void printUsage() {
+ String modes = "<" + VIA_RPC + ">|" + VIA_HDFS_DIST_CACHE + "|" + VIA_HDFS_DIRECT_READ;
+ String synopsis =
+ "Usage: RPCLoadGen <numTasks> <max_sleep_time_millis> <get_task_payload_size> [" +
+ modes + "]";
+ System.err.println(synopsis);
+ ToolRunner.printGenericCommandUsage(System.err);
+ }
+
+ /**
+ * Accepts exactly three or four arguments.
+ *
+ * @param otherArgs the arguments left after generic option parsing
+ * @return 0 when the argument count is acceptable, 2 otherwise
+ */
+ @Override
+ protected final int validateArgs(String[] otherArgs) {
+ int argCount = otherArgs.length;
+ if (argCount < 3 || argCount > 4) {
+ return 2;
+ }
+ return 0;
+ }
+
+ /**
+ * Builds a DAG named "RPCLoadGen" with a single vertex of numTasks tasks, each running
+ * {@link RPCSleepProcessor}.
+ */
+ private DAG createDAG(TezConfiguration conf, int numTasks, int maxSleepTimeMillis,
+ int payloadSize, String mode) throws IOException {
+
+ // Populated by createUserPayload when the payload is registered as a local resource.
+ Map<String, LocalResource> taskLocalFiles = new HashMap<String, LocalResource>();
+ UserPayload processorPayload =
+ createUserPayload(conf, maxSleepTimeMillis, payloadSize, mode, taskLocalFiles);
+
+ ProcessorDescriptor sleepProcessor =
+ ProcessorDescriptor.create(RPCSleepProcessor.class.getName()).setUserPayload(
+ processorPayload);
+ Vertex loadVertex =
+ Vertex.create("RPCLoadVertex", sleepProcessor, numTasks).addTaskLocalFiles(taskLocalFiles);
+
+ return DAG.create("RPCLoadGen").addVertex(loadVertex);
+ }
+
+ /**
+ * Builds the processor payload and, for the two HDFS modes, writes the bulk payload to a file.
+ *
+ * Payload layout: bytes 0-3 hold maxSleepTimeMillis as an int, byte 4 holds the mode marker.
+ * In viaRpc mode the payload is padded with random bytes up to payloadSize (minimum 5). In
+ * the HDFS modes the payload is exactly 5 bytes and payloadSize random bytes are written to
+ * the file instead.
+ *
+ * @param conf configuration used to obtain the file system
+ * @param maxSleepTimeMillis value stored at offset 0 of the payload
+ * @param payloadSize number of bytes to ship via RPC or via the file
+ * @param mode one of viaRpc, viaHdfsDistCache or viaHdfsDirectRead
+ * @param localResources receives the file as a local resource in viaHdfsDistCache mode
+ * @return the payload to attach to the processor descriptor
+ * @throws IOException if the payload file cannot be written
+ */
+ private UserPayload createUserPayload(TezConfiguration conf, int maxSleepTimeMillis,
+ int payloadSize, String mode,
+ Map<String, LocalResource> localResources) throws
+ IOException {
+ ByteBuffer payload;
+ if (mode.equals(VIA_RPC)) {
+ if (payloadSize < 5) {
+ payloadSize = 5; // Room for the sleep time (4 bytes) and the mode byte
+ }
+ byte[] payloadBytes = new byte[payloadSize];
+ new Random().nextBytes(payloadBytes);
+ payload = ByteBuffer.wrap(payloadBytes);
+ payload.put(4, VIA_RPC_BYTE);
+ } else {
+ // Actual payload: only the processor configuration
+ byte[] payloadBytes = new byte[5];
+ payload = ByteBuffer.wrap(payloadBytes);
+
+ // Disk payload; a negative size is treated as empty instead of failing the allocation
+ byte[] diskPayload = new byte[Math.max(payloadSize, 0)];
+ new Random().nextBytes(diskPayload);
+ fs = FileSystem.get(conf);
+ resourcePath = fs.makeQualified(new Path(Path.SEPARATOR + "tmp", DISK_PAYLOAD_NAME));
+ LOG.info("Writing disk payload to: " + resourcePath);
+ FSDataOutputStream dataOut = fs.create(resourcePath, true);
+ try {
+ dataOut.write(diskPayload);
+ } finally {
+ // Close even when the write fails so the stream is not leaked.
+ dataOut.close();
+ }
+ fs.setReplication(resourcePath, (short)10);
+ FileStatus fileStatus = fs.getFileStatus(resourcePath);
+
+ if (mode.equals(VIA_HDFS_DIST_CACHE)) {
+ LocalResource lr = LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(resourcePath),
+ LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, fileStatus.getLen(),
+ fileStatus.getModificationTime());
+ localResources.put(DISK_PAYLOAD_NAME, lr);
+ payload.put(4, VIA_HDFS_DIST_CACHE_BYTE);
+ } else if (mode.equals(VIA_HDFS_DIRECT_READ)) {
+ payload.put(4, VIA_HDFS_DIRECT_READ_BYTE);
+ }
+ }
+
+ payload.putInt(0, maxSleepTimeMillis);
+ return UserPayload.create(payload);
+ }
+
+ /**
+ * Processor that obtains the payload through the channel named by the mode byte, logs how
+ * long that took and then sleeps for a random time below the configured maximum.
+ */
+ public static class RPCSleepProcessor extends SimpleProcessor {
+
+ private final int sleepTimeMax;
+ private final byte modeByte;
+
+ public RPCSleepProcessor(ProcessorContext context) {
+ super(context);
+ // Payload layout: int sleep time at offset 0, mode byte at offset 4.
+ sleepTimeMax = getContext().getUserPayload().getPayload().getInt(0);
+ modeByte = getContext().getUserPayload().getPayload().get(4);
+ }
+
+ /**
+ * Reads the payload according to the mode byte, logs the access time and sleeps.
+ *
+ * @throws IllegalArgumentException if the mode byte is not one of the known modes
+ */
+ @Override
+ public void run() throws Exception {
+ Stopwatch sw = new Stopwatch().start();
+ // Random.nextInt rejects a non-positive bound; such a maximum means "do not sleep".
+ long sleepTime = sleepTimeMax > 0 ? new Random().nextInt(sleepTimeMax) : 0;
+ if (modeByte == VIA_RPC_BYTE) {
+ LOG.info("Received via RPC.");
+ } else if (modeByte == VIA_HDFS_DIST_CACHE_BYTE) {
+ LOG.info("Reading from local filesystem");
+ FileSystem localFs = FileSystem.getLocal(new Configuration());
+ drain(localFs.open(new Path(DISK_PAYLOAD_NAME)));
+ } else if (modeByte == VIA_HDFS_DIRECT_READ_BYTE) {
+ LOG.info("Reading from HDFS");
+ FileSystem fs = FileSystem.get(new Configuration());
+ drain(fs.open(new Path(Path.SEPARATOR + "tmp", DISK_PAYLOAD_NAME)));
+ } else {
+ throw new IllegalArgumentException("Unknown execution mode: [" + modeByte + "]");
+ }
+ LOG.info("TimeTakenToAccessPayload=" + sw.stop().elapsedMillis());
+ LOG.info("Sleeping for: " + sleepTime);
+ Thread.sleep(sleepTime);
+ }
+
+ /** Reads the stream to EOF, discarding the data, and always closes it. */
+ private static void drain(FSDataInputStream in) throws IOException {
+ try {
+ byte[] buffer = new byte[64 * 1024];
+ while (in.read(buffer) != -1) {
+ // Only the read matters for the measurement; the bytes are thrown away.
+ }
+ } finally {
+ in.close();
+ }
+ }
+ }
+
+ /** Command-line entry point; exits with the status returned by ToolRunner. */
+ public static void main(String[] args) throws Exception {
+ Configuration baseConf = new Configuration();
+ RPCLoadGen loadGen = new RPCLoadGen();
+ int exitCode = ToolRunner.run(baseConf, loadGen, args);
+ System.exit(exitCode);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/7731e004/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
new file mode 100644
index 0000000..84a450c
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TezExampleBase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+public abstract class TezExampleBase extends Configured implements Tool {
+
+ private TezClient tezClientInternal;
+
+ /**
+ * {@link Tool} entry point: strips the generic options from the arguments and executes the
+ * example without a pre-existing configuration or TezClient.
+ */
+ @Override
+ public final int run(String[] args) throws Exception {
+ GenericOptionsParser optionsParser = new GenericOptionsParser(getConf(), args);
+ String[] remainingArgs = optionsParser.getRemainingArgs();
+ return _execute(remainingArgs, null, null);
+ }
+
+ /**
+ * Utility method to use the example from within code or a test.
+ *
+ * @param conf the tez configuration instance which will be used to create the DAG and
+ * possibly the Tez Client.
+ * @param args arguments to the example
+ * @param tezClient an existing running {@link org.apache.tez.client.TezClient} instance if one
+ * exists. If no TezClient is specified (null), one will be created based on the
+ * provided configuration
+ * @return the non-zero result of argument validation if it fails, otherwise the value
+ * returned by {@code runJob}
+ * @throws IOException
+ * @throws TezException
+ * @throws InterruptedException
+ */
+ public int run(TezConfiguration conf, String[] args, @Nullable TezClient tezClient) throws
+ IOException,
+ TezException, InterruptedException {
+ setConf(conf);
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ return _execute(otherArgs, conf, tezClient);
+ }
+
+ /**
+ * Waits for the client to be ready, submits the DAG and blocks until it completes.
+ *
+ * @param dag the dag to execute
+ * @param printCounters whether counters are requested with the status updates
+ * @param logger the logger that receives the diagnostics of an unsuccessful DAG
+ * @return 0 if the DAG finished in the SUCCEEDED state, -1 otherwise
+ * @throws TezException
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public int runDag(DAG dag, boolean printCounters, Log logger) throws TezException,
+ InterruptedException, IOException {
+ tezClientInternal.waitTillReady();
+ DAGClient client = tezClientInternal.submitDAG(dag);
+
+ Set<StatusGetOpts> statusOpts = Sets.newHashSet();
+ if (printCounters) {
+ statusOpts.add(StatusGetOpts.GET_COUNTERS);
+ }
+
+ DAGStatus finalStatus = client.waitForCompletionWithStatusUpdates(statusOpts);
+ if (finalStatus.getState() == DAGStatus.State.SUCCEEDED) {
+ return 0;
+ }
+ logger.info("DAG diagnostics: " + finalStatus.getDiagnostics());
+ return -1;
+ }
+
+ /**
+ * Runs the subclass validation and prints the usage text when it reports a problem.
+ *
+ * @return the value returned by validateArgs; 0 means the arguments are valid
+ */
+ private int _validateArgs(String[] args) {
+ int validationResult = validateArgs(args);
+ if (validationResult == 0) {
+ return 0;
+ }
+ printUsage();
+ return validationResult;
+ }
+
+ /**
+ * Validates the arguments, makes a TezClient available and runs the job.
+ *
+ * @param otherArgs arguments left after generic option parsing
+ * @param tezConf configuration to use, or null to derive one from getConf()
+ * @param tezClient caller-owned client to run against, or null to create one here; a client
+ * created here is stopped before returning, a caller-owned one is left running
+ * @return the non-zero validation result, otherwise the value returned by runJob
+ */
+ private int _execute(String[] otherArgs, TezConfiguration tezConf, TezClient tezClient) throws
+ IOException, TezException, InterruptedException {
+
+ int result = _validateArgs(otherArgs);
+ if (result != 0) {
+ return result;
+ }
+
+ if (tezConf == null) {
+ tezConf = new TezConfiguration(getConf());
+ }
+ UserGroupInformation.setConfiguration(tezConf);
+ boolean ownTezClient = false;
+ if (tezClient == null) {
+ ownTezClient = true;
+ tezClientInternal = createTezClient(tezConf);
+ } else {
+ // Use the caller's client. Without this assignment tezClientInternal stays null (or
+ // stale), so runJob receives the wrong client and runDag cannot submit the DAG.
+ tezClientInternal = tezClient;
+ }
+ try {
+ return runJob(otherArgs, tezConf, tezClientInternal);
+ } finally {
+ if (ownTezClient && tezClientInternal != null) {
+ tezClientInternal.stop();
+ }
+ }
+ }
+
+ /** Creates and starts a TezClient named after the simple name of the concrete class. */
+ private TezClient createTezClient(TezConfiguration tezConf) throws IOException, TezException {
+ String clientName = getClass().getSimpleName();
+ TezClient startedClient = TezClient.create(clientName, tezConf);
+ startedClient.start();
+ return startedClient;
+ }
+
+ /**
+ * Print usage instructions for this example. Called when {@link #validateArgs(String[])}
+ * returns a non-zero value.
+ */
+ protected abstract void printUsage();
+
+ /**
+ * Validate the arguments
+ *
+ * @param otherArgs arguments, if any
+ * @return 0 if the arguments are valid; any other value causes the usage text to be printed
+ * and is returned to the caller of {@code run} without running the job
+ */
+ protected abstract int validateArgs(String[] otherArgs);
+
+ /**
+ * Create and execute the actual DAG for the example
+ *
+ * @param args arguments for execution
+ * @param tezConf the tez configuration instance to be used while processing the DAG
+ * @param tezClient the tez client instance to use to run the DAG if any custom monitoring is
+ * required. Otherwise the utility method {@link #runDag(org.apache.tez.dag.api.DAG,
+ * boolean, org.apache.commons.logging.Log)} should be used
+ * @return the result of the job; it is passed back unchanged to the caller of {@code run}
+ * @throws IOException
+ * @throws TezException
+ * @throws InterruptedException
+ */
+ protected abstract int runJob(String[] args, TezConfiguration tezConf,
+ TezClient tezClient) throws IOException, TezException,
+ InterruptedException;
+}