You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/13 23:58:05 UTC
git commit: TEZ-833. Have the Tez task framework set framework counters instead of MR Processors setting them. (sseth)
Updated Branches:
refs/heads/master 787921e80 -> 199ea91cd
TEZ-833. Have the Tez task framework set Framework counters instead of
MR Processors setting them. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/199ea91c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/199ea91c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/199ea91c
Branch: refs/heads/master
Commit: 199ea91cda92a99620cc337bd806d61521a950f3
Parents: 787921e
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Feb 13 14:57:26 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Feb 13 14:57:26 2014 -0800
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 3 +
.../apache/hadoop/mapred/YarnTezDagChild.java | 37 ++++-
.../mapreduce/examples/FilterLinesByWord.java | 44 +++++-
.../processor/FileSystemStatisticsUpdater.java | 84 ----------
.../tez/mapreduce/processor/GcTimeUpdater.java | 71 ---------
.../apache/tez/mapreduce/processor/MRTask.java | 103 ------------
.../runtime/LogicalIOProcessorRuntimeTask.java | 3 +-
.../org/apache/tez/runtime/RuntimeTask.java | 7 +
.../metrics/FileSystemStatisticUpdater.java | 79 ++++++++++
.../tez/runtime/metrics/GcTimeUpdater.java | 70 +++++++++
.../tez/runtime/metrics/TaskCounterUpdater.java | 156 +++++++++++++++++++
.../java/org/apache/tez/test/TestTezJobs.java | 146 +++++++++++++++++
12 files changed, 532 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 8d347d7..444a830 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -321,4 +321,7 @@ public class TezJobConfig {
public static final String TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE = "tez.runtime.broadcast.data-via-events.max-size";
public static final int TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 200 << 10;// 200KB
+
+ /** Defines the ProcessTree implementation which will be used to collect resource utilization. */
+ public static final String TEZ_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS = "tez.resource.calculator.process-tree.class";
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 9f7455a..0e0d036 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -31,7 +31,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -353,6 +352,13 @@ public class YarnTezDagChild {
public void signalFatalError(TezTaskAttemptID taskAttemptID,
String diagnostics,
EventMetaData sourceInfo) {
+ currentTask.setFrameworkCounters();
+ TezEvent statusUpdateEvent =
+ new TezEvent(new TaskStatusUpdateEvent(
+ currentTask.getCounters(), currentTask.getProgress()),
+ new EventMetaData(EventProducerConsumerType.SYSTEM,
+ currentTask.getVertexName(), "",
+ currentTask.getTaskAttemptID()));
TezEvent taskAttemptFailedEvent =
new TezEvent(new TaskAttemptFailedEvent(diagnostics),
sourceInfo);
@@ -360,7 +366,7 @@ public class YarnTezDagChild {
// Not setting taskComplete - since the main loop responsible for cleanup doesn't have
// control yet. Getting control depends on whether the I/P/O returns correctly after
// reporting an error.
- heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+ heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
} catch (Throwable t) {
LOG.fatal("Failed to communicate task attempt failure to AM via"
+ " umbilical", t);
@@ -435,6 +441,9 @@ public class YarnTezDagChild {
}
taskCount++;
+ // Reset file system statistics for the new task.
+ FileSystem.clearStatistics();
+
// Re-use the UGI only if the Credentials have not changed.
if (containerTask.haveCredentialsChanged()) {
LOG.info("Refreshing UGI since Credentials have changed");
@@ -516,6 +525,8 @@ public class YarnTezDagChild {
currentTaskComplete.set(true);
// TODONEWTEZ Should the container continue to run if the running task reported a fatal error ?
if (!currentTask.hadFatalError()) {
+ // Set counters in case of a successful task.
+ currentTask.setFrameworkCounters();
TezEvent statusUpdateEvent =
new TezEvent(new TaskStatusUpdateEvent(
currentTask.getCounters(), currentTask.getProgress()),
@@ -525,7 +536,7 @@ public class YarnTezDagChild {
TezEvent taskCompletedEvent =
new TezEvent(new TaskAttemptCompletedEvent(), sourceInfo);
heartbeat(Arrays.asList(statusUpdateEvent, taskCompletedEvent));
- }
+ } // Should the fatalError be reported ?
} finally {
currentTask.cleanup();
}
@@ -556,14 +567,22 @@ public class YarnTezDagChild {
try {
taskLock.readLock().lock();
if (currentTask != null && !currentTask.hadFatalError()) {
+ // TODO Is this of any use if the heartbeat thread is being interrupted first ?
// Prevent dup failure events
+ currentTask.setFrameworkCounters();
+ TezEvent statusUpdateEvent =
+ new TezEvent(new TaskStatusUpdateEvent(
+ currentTask.getCounters(), currentTask.getProgress()),
+ new EventMetaData(EventProducerConsumerType.SYSTEM,
+ currentTask.getVertexName(), "",
+ currentTask.getTaskAttemptID()));
currentTask.setFatalError(e, "FS Error in Child JVM");
TezEvent taskAttemptFailedEvent =
new TezEvent(new TaskAttemptFailedEvent(
StringUtils.stringifyException(e)),
currentSourceInfo);
currentTaskComplete.set(true);
- heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+ heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
}
} finally {
taskLock.readLock().unlock();
@@ -581,13 +600,21 @@ public class YarnTezDagChild {
taskLock.readLock().lock();
try {
if (currentTask != null && !currentTask.hadFatalError()) {
+ // TODO Is this of any use if the heartbeat thread is being interrupted first ?
// Prevent dup failure events
currentTask.setFatalError(throwable, "Error in Child JVM");
+ currentTask.setFrameworkCounters();
+ TezEvent statusUpdateEvent =
+ new TezEvent(new TaskStatusUpdateEvent(
+ currentTask.getCounters(), currentTask.getProgress()),
+ new EventMetaData(EventProducerConsumerType.SYSTEM,
+ currentTask.getVertexName(), "",
+ currentTask.getTaskAttemptID()));
TezEvent taskAttemptFailedEvent =
new TezEvent(new TaskAttemptFailedEvent(cause),
currentSourceInfo);
currentTaskComplete.set(true);
- heartbeat(Collections.singletonList(taskAttemptFailedEvent));
+ heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
}
} finally {
taskLock.readLock().unlock();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index f90cc58..d976d17 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -52,6 +52,7 @@ import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -67,6 +68,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
@@ -81,17 +83,27 @@ import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+import com.google.common.collect.Sets;
+
public class FilterLinesByWord {
private static Log LOG = LogFactory.getLog(FilterLinesByWord.class);
public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
+
+ private TezCounters counters = null;
+ private int errorCode = 0;
+ private boolean exitOnCompletion = false;
+ public FilterLinesByWord(boolean exitOnCompletion) {
+ this.exitOnCompletion = exitOnCompletion;
+ }
+
private static void printUsage() {
System.err.println("Usage filtelinesrbyword <in> <out> <filter_word> [-generateSplitsInClient true/<false>]");
}
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
+ public void run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
Configuration conf = new Configuration();
String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Credentials credentials = new Credentials();
@@ -105,12 +117,14 @@ public class FilterLinesByWord {
} catch (ParseException e1) {
System.err.println("Invalid options");
printUsage();
- System.exit(2);
+ errorCode = 2;
+ return;
}
if (otherArgs.length != 3) {
printUsage();
- System.exit(2);
+ errorCode = 2;
+ return;
}
String inputPath = otherArgs[0];
@@ -120,7 +134,8 @@ public class FilterLinesByWord {
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(outputPath))) {
System.err.println("Output directory : " + outputPath + " already exists");
- System.exit(2);
+ errorCode = 2;
+ return;
}
TezConfiguration tezConf = new TezConfiguration(conf);
@@ -270,17 +285,32 @@ public class FilterLinesByWord {
dagStatus = dagClient.getDAGStatus(null);
} catch (TezException e) {
LOG.fatal("Failed to get application progress. Exiting");
- System.exit(-1);
+ errorCode = -1;
+ return;
}
}
+
+ dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+ counters = dagStatus.getDAGCounters();
+
} finally {
fs.delete(stagingDir, true);
tezSession.stop();
}
- ExampleDriver.printDAGStatus(dagClient, vNames);
+ ExampleDriver.printDAGStatus(dagClient, vNames, true, true);
LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
- System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+ errorCode = (dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+ return;
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException,
+ ClassNotFoundException, TezException {
+ FilterLinesByWord fl = new FilterLinesByWord(true);
+ fl.run(args);
+ if (fl.exitOnCompletion) {
+ System.exit(fl.errorCode);
+ }
}
public static class TextLongPair implements Writable {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
deleted file mode 100644
index ff17230..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/FileSystemStatisticsUpdater.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.mapreduce.processor;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-
- /**
- * An updater that tracks the last number reported for a given file
- * system and only creates the counters when they are needed.
- */
- class FileSystemStatisticUpdater {
- private List<FileSystem.Statistics> stats;
- private TezCounter readBytesCounter, writeBytesCounter,
- readOpsCounter, largeReadOpsCounter, writeOpsCounter;
- private String scheme;
- private TezCounters counters;
-
- FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
- this.stats = stats;
- this.scheme = scheme;
- this.counters = counters;
- }
-
- void updateCounters() {
- if (readBytesCounter == null) {
- readBytesCounter = counters.findCounter(scheme,
- FileSystemCounter.BYTES_READ);
- }
- if (writeBytesCounter == null) {
- writeBytesCounter = counters.findCounter(scheme,
- FileSystemCounter.BYTES_WRITTEN);
- }
- if (readOpsCounter == null) {
- readOpsCounter = counters.findCounter(scheme,
- FileSystemCounter.READ_OPS);
- }
- if (largeReadOpsCounter == null) {
- largeReadOpsCounter = counters.findCounter(scheme,
- FileSystemCounter.LARGE_READ_OPS);
- }
- if (writeOpsCounter == null) {
- writeOpsCounter = counters.findCounter(scheme,
- FileSystemCounter.WRITE_OPS);
- }
- long readBytes = 0;
- long writeBytes = 0;
- long readOps = 0;
- long largeReadOps = 0;
- long writeOps = 0;
- for (FileSystem.Statistics stat: stats) {
- readBytes = readBytes + stat.getBytesRead();
- writeBytes = writeBytes + stat.getBytesWritten();
- readOps = readOps + stat.getReadOps();
- largeReadOps = largeReadOps + stat.getLargeReadOps();
- writeOps = writeOps + stat.getWriteOps();
- }
- readBytesCounter.setValue(readBytes);
- writeBytesCounter.setValue(writeBytes);
- readOpsCounter.setValue(readOps);
- largeReadOpsCounter.setValue(largeReadOps);
- writeOpsCounter.setValue(writeOps);
- }
- }
-
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
deleted file mode 100644
index 79ff419..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/GcTimeUpdater.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.mapreduce.processor;
-
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.counters.TaskCounter;
-
-/**
- * An updater that tracks the amount of time this task has spent in GC.
- */
- class GcTimeUpdater {
- private long lastGcMillis = 0;
- private List<GarbageCollectorMXBean> gcBeans = null;
- TezCounters counters;
-
- public GcTimeUpdater(TezCounters counters) {
- this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
- getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent.
- this.counters = counters;
- }
-
- /**
- * @return the number of milliseconds that the gc has used for CPU
- * since the last time this method was called.
- */
- protected long getElapsedGc() {
- long thisGcMillis = 0;
- for (GarbageCollectorMXBean gcBean : gcBeans) {
- thisGcMillis += gcBean.getCollectionTime();
- }
-
- long delta = thisGcMillis - lastGcMillis;
- this.lastGcMillis = thisGcMillis;
- return delta;
- }
-
- /**
- * Increment the gc-elapsed-time counter.
- */
- public void incrementGcCounter() {
- if (null == counters) {
- return; // nothing to do.
- }
-
- TezCounter gcCounter =
- counters.findCounter(TaskCounter.GC_TIME_MILLIS);
- if (null != gcCounter) {
- gcCounter.increment(getElapsedGc());
- }
- }
- }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 242b798..a5fda8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -22,10 +22,8 @@ import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,11 +61,9 @@ import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskStatus.State;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -98,9 +94,6 @@ public abstract class MRTask {
// Current counters
transient TezCounters counters;
- protected GcTimeUpdater gcUpdater;
- private ResourceCalculatorProcessTree pTree;
- private long initCpuCumulativeTime = 0;
protected TezProcessorContext processorContext;
protected TaskAttemptID taskAttemptId;
protected Progress progress = new Progress();
@@ -125,12 +118,6 @@ public abstract class MRTask {
protected MRTaskReporter mrReporter;
protected boolean useNewApi;
- /**
- * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
- */
- private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
- new HashMap<String, FileSystemStatisticUpdater>();
-
public MRTask(boolean isMap) {
this.isMap = isMap;
}
@@ -150,9 +137,6 @@ public abstract class MRTask {
(isMap ? TaskType.MAP : TaskType.REDUCE),
context.getTaskIndex()),
context.getTaskAttemptNumber());
- // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
- // Output. Phase is MR specific.
- gcUpdater = new GcTimeUpdater(counters);
byte[] userPayload = context.getUserPayload();
Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
@@ -168,8 +152,6 @@ public abstract class MRTask {
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
context.getDAGAttemptNumber());
- initResourceCalculatorPlugin();
-
LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
// TODO Post MRR
@@ -322,20 +304,6 @@ public abstract class MRTask {
}
}
-
- private void initResourceCalculatorPlugin() {
- Class<? extends ResourceCalculatorProcessTree> clazz =
- this.jobConf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
- null, ResourceCalculatorProcessTree.class);
- pTree = ResourceCalculatorProcessTree
- .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, this.jobConf);
- LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
- if (pTree != null) {
- pTree.updateProcessTree();
- initCpuCumulativeTime = pTree.getCumulativeCpuTime();
- }
- }
-
public TezProcessorContext getUmbilical() {
return this.processorContext;
}
@@ -419,7 +387,6 @@ public abstract class MRTask {
}
public void done() throws IOException, InterruptedException {
- updateCounters();
LOG.info("Task:" + taskAttemptId + " is done."
+ " And is in the process of committing");
@@ -435,12 +402,7 @@ public abstract class MRTask {
}
}
taskDone.set(true);
- // Make sure we send at least one set of counter increments. It's
- // ok to call updateCounters() in this thread after comm thread stopped.
- updateCounters();
sendLastUpdate();
- //signal the tasktracker that we are done
- //sendDone(umbilical);
}
/**
@@ -505,71 +467,6 @@ public abstract class MRTask {
}
}
-
- public void updateCounters() {
- // TODO Auto-generated method stub
- // TODO TEZAM Implement.
- Map<String, List<FileSystem.Statistics>> map = new
- HashMap<String, List<FileSystem.Statistics>>();
- for(Statistics stat: FileSystem.getAllStatistics()) {
- String uriScheme = stat.getScheme();
- if (map.containsKey(uriScheme)) {
- List<FileSystem.Statistics> list = map.get(uriScheme);
- list.add(stat);
- } else {
- List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
- list.add(stat);
- map.put(uriScheme, list);
- }
- }
- for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
- FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
- if(updater==null) {//new FileSystem has been found in the cache
- updater =
- new FileSystemStatisticUpdater(counters, entry.getValue(),
- entry.getKey());
- statisticUpdaters.put(entry.getKey(), updater);
- }
- updater.updateCounters();
- }
-
- gcUpdater.incrementGcCounter();
- updateResourceCounters();
- }
-
- /**
- * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
- * current total committed heap space usage of this JVM.
- */
- private void updateHeapUsageCounter() {
- long currentHeapUsage = Runtime.getRuntime().totalMemory();
- counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
- .setValue(currentHeapUsage);
- }
-
- /**
- * Update resource information counters
- */
- void updateResourceCounters() {
- // Update generic resource counters
- updateHeapUsageCounter();
-
- // Updating resources specified in ResourceCalculatorPlugin
- if (pTree == null) {
- return;
- }
- pTree.updateProcessTree();
- long cpuTime = pTree.getCumulativeCpuTime();
- long pMem = pTree.getCumulativeRssmem();
- long vMem = pTree.getCumulativeVmem();
- // Remove the CPU time consumed previously by JVM reuse
- cpuTime -= initCpuCumulativeTime;
- counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
- counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
- counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
- }
-
-
public static String normalizeStatus(String status, Configuration conf) {
// Check to see if the status string is too long
// and truncate it if needed.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index a8bb1d4..21cc810 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -140,6 +140,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.state = State.NEW;
this.appAttemptNumber = appAttemptNumber;
int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread.
+ numInitializers = (numInitializers == 0 ? 1 : numInitializers);
this.initializerExecutor = Executors.newFixedThreadPool(
numInitializers,
new ThreadFactoryBuilder().setDaemon(true)
@@ -684,5 +685,5 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
public LogicalIOProcessor getProcessor() {
return this.processor;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index f018333..30fbe76 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -28,6 +28,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.metrics.TaskCounterUpdater;
public abstract class RuntimeTask {
@@ -41,6 +42,7 @@ public abstract class RuntimeTask {
protected final TezUmbilical tezUmbilical;
protected final AtomicInteger eventCounter;
private final AtomicBoolean taskDone;
+ private final TaskCounterUpdater counterUpdater;
protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
TezUmbilical tezUmbilical) {
@@ -51,6 +53,7 @@ public abstract class RuntimeTask {
this.eventCounter = new AtomicInteger(0);
this.progress = 0.0f;
this.taskDone = new AtomicBoolean(false);
+ this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf);
}
protected enum State {
@@ -98,6 +101,10 @@ public abstract class RuntimeTask {
public boolean isTaskDone() {
return taskDone.get();
}
+
+ public void setFrameworkCounters() {
+ this.counterUpdater.updateCounters();
+ }
protected void setTaskDone() {
taskDone.set(true);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
new file mode 100644
index 0000000..bb15ef1
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * An updater that tracks the last number reported for a given file system and
+ * only creates the counters when they are needed.
+ */
+public class FileSystemStatisticUpdater {
+
+ private List<FileSystem.Statistics> stats;
+ private TezCounter readBytesCounter, writeBytesCounter, readOpsCounter, largeReadOpsCounter,
+ writeOpsCounter;
+ private String scheme;
+ private TezCounters counters;
+
+ FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
+ this.stats = stats;
+ this.scheme = scheme;
+ this.counters = counters;
+ }
+
+ void updateCounters() {
+ if (readBytesCounter == null) {
+ readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
+ }
+ if (writeBytesCounter == null) {
+ writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
+ }
+ if (readOpsCounter == null) {
+ readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
+ }
+ if (largeReadOpsCounter == null) {
+ largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
+ }
+ if (writeOpsCounter == null) {
+ writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
+ }
+ long readBytes = 0;
+ long writeBytes = 0;
+ long readOps = 0;
+ long largeReadOps = 0;
+ long writeOps = 0;
+ for (FileSystem.Statistics stat : stats) {
+ readBytes = readBytes + stat.getBytesRead();
+ writeBytes = writeBytes + stat.getBytesWritten();
+ readOps = readOps + stat.getReadOps();
+ largeReadOps = largeReadOps + stat.getLargeReadOps();
+ writeOps = writeOps + stat.getWriteOps();
+ }
+ readBytesCounter.setValue(readBytes);
+ writeBytesCounter.setValue(writeBytes);
+ readOpsCounter.setValue(readOps);
+ largeReadOpsCounter.setValue(largeReadOps);
+ writeOpsCounter.setValue(writeOps);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java
new file mode 100644
index 0000000..5e551c5
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/GcTimeUpdater.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.counters.TaskCounter;
+
+/**
+ * An updater that tracks the amount of time this task has spent in GC.
+ */
+class GcTimeUpdater {
+ private long lastGcMillis = 0;
+ private List<GarbageCollectorMXBean> gcBeans = null;
+ TezCounters counters;
+
+ public GcTimeUpdater(TezCounters counters) {
+ this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent.
+ this.counters = counters;
+ }
+
+ /**
+ * @return the number of milliseconds that the gc has used for CPU since the
+ * last time this method was called.
+ */
+ protected long getElapsedGc() {
+ long thisGcMillis = 0;
+ for (GarbageCollectorMXBean gcBean : gcBeans) {
+ thisGcMillis += gcBean.getCollectionTime();
+ }
+
+ long delta = thisGcMillis - lastGcMillis;
+ this.lastGcMillis = thisGcMillis;
+ return delta;
+ }
+
+ /**
+ * Increment the gc-elapsed-time counter.
+ */
+ void incrementGcCounter() {
+ if (null == counters) {
+ return; // nothing to do.
+ }
+
+ TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+ if (null != gcCounter) {
+ gcCounter.increment(getElapsedGc());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
new file mode 100644
index 0000000..07e1869
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounters;
+
+
+/**
+ * Updates counters with various task specific statistics. Currently, this
+ * should be invoked only once per task. TODO Eventually - change this so that
+ * counters can be updated incrementally during task execution.
+ */
+public class TaskCounterUpdater {
+
+ private static final Log LOG = LogFactory.getLog(TaskCounterUpdater.class);
+
+ private final TezCounters tezCounters;
+ private final Configuration conf;
+
+ private long initialCpuCumulativeTime;
+
+ /**
+ * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
+ */
+ private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
+ new HashMap<String, FileSystemStatisticUpdater>();
+ protected GcTimeUpdater gcUpdater;
+ private ResourceCalculatorProcessTree pTree;
+ private long initCpuCumulativeTime = 0;
+
+ public TaskCounterUpdater(TezCounters counters, Configuration conf) {
+ this.tezCounters = counters;
+ this.conf = conf;
+ this.gcUpdater = new GcTimeUpdater(tezCounters);
+ initResourceCalculatorPlugin();
+ recordInitialCpuStats();
+ }
+
+
+ public void updateCounters() {
+ // FileSystemStatistics are reset each time a new task is seen by the
+ // container.
+ // This doesn't remove the fileSystem, and does not clear all statistics -
+ // so there is a potential of an unused FileSystem showing up for a
+ // Container, and strange values for READ_OPS etc.
+ Map<String, List<FileSystem.Statistics>> map = new
+ HashMap<String, List<FileSystem.Statistics>>();
+ for(Statistics stat: FileSystem.getAllStatistics()) {
+ String uriScheme = stat.getScheme();
+ if (map.containsKey(uriScheme)) {
+ List<FileSystem.Statistics> list = map.get(uriScheme);
+ list.add(stat);
+ } else {
+ List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
+ list.add(stat);
+ map.put(uriScheme, list);
+ }
+ }
+ for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
+ FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
+ if(updater==null) {//new FileSystem has been found in the cache
+ updater =
+ new FileSystemStatisticUpdater(tezCounters, entry.getValue(),
+ entry.getKey());
+ statisticUpdaters.put(entry.getKey(), updater);
+ }
+ updater.updateCounters();
+ }
+
+ gcUpdater.incrementGcCounter();
+ updateResourceCounters();
+ }
+
+ private void recordInitialCpuStats() {
+ if (pTree != null) {
+ pTree.updateProcessTree();
+ initCpuCumulativeTime = pTree.getCumulativeCpuTime();
+ }
+ }
+
+ /**
+ * Update resource information counters
+ */
+ void updateResourceCounters() {
+ // Update generic resource counters
+ updateHeapUsageCounter();
+
+ // Updating resources specified in ResourceCalculatorPlugin
+ if (pTree == null) {
+ return;
+ }
+ pTree.updateProcessTree();
+ long cpuTime = pTree.getCumulativeCpuTime();
+ long pMem = pTree.getCumulativeRssmem();
+ long vMem = pTree.getCumulativeVmem();
+ // Remove the CPU time consumed previously by JVM reuse
+ cpuTime -= initCpuCumulativeTime;
+ tezCounters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime - initCpuCumulativeTime);
+ tezCounters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+ tezCounters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+ }
+
+ /**
+ * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+ * current total committed heap space usage of this JVM.
+ */
+ private void updateHeapUsageCounter() {
+ long currentHeapUsage = Runtime.getRuntime().totalMemory();
+ tezCounters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
+ .setValue(currentHeapUsage);
+ }
+
+ private void initResourceCalculatorPlugin() {
+ Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass(
+ TezJobConfig.TEZ_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS, null,
+ ResourceCalculatorProcessTree.class);
+
+ pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
+ System.getenv().get("JVM_PID"), clazz, conf);
+
+ LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
+ if (pTree != null) {
+ pTree.updateProcessTree();
+ initCpuCumulativeTime = pTree.getCumulativeCpuTime();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/199ea91c/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
new file mode 100644
index 0000000..ebbc3c1
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.examples.ExampleDriver;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Tests that run Tez jobs without relying on the Map/Reduce processors.
+ */
+public class TestTezJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestTezJobs.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static final Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestTezJobs.class.getName() + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestTezJobs.class.getName(), 1, 1, 1);
+      // Separate configuration for the Tez cluster (distinct from the static
+      // DFS 'conf'), pointing its default file system at the mini HDFS.
+      Configuration tezClusterConf = new Configuration();
+      tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      mrrTezCluster.init(tezClusterConf);
+      mrrTezCluster.start();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    // TODO Add cleanup code.
+  }
+
+  /**
+   * Runs a single-vertex DAG with a {@link SleepProcessor} and checks that the
+   * DAG succeeds and that framework-level counters (the file-system counter
+   * group and {@link TaskCounter#GC_TIME_MILLIS}) are present in the DAG counters.
+   */
+  @Test(timeout = 60000)
+  public void testSleepJob() throws TezException, IOException, InterruptedException {
+    SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+    DAG dag = new DAG("TezSleepProcessor");
+    Vertex vertex = new Vertex("SleepVertex", new ProcessorDescriptor(
+        SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+        Resource.newInstance(1024, 1));
+    dag.addVertex(vertex);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random()
+        .nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    AMConfiguration amConf = new AMConfiguration(new HashMap<String, String>(),
+        new HashMap<String, LocalResource>(), tezConf, null);
+
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, amConf);
+
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+          + dagStatus.getState());
+      Thread.sleep(500L);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+    dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+    assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    assertNotNull(dagStatus.getDAGCounters());
+    assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
+    assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
+    ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
+
+  }
+}