You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/09/29 01:10:25 UTC
tez git commit: TEZ-2836. Avoid setting framework/system counters for
tasks running in threads. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 406721ab1 -> 8b412ee66
TEZ-2836. Avoid setting framework/system counters for tasks running in
threads. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8b412ee6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8b412ee6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8b412ee6
Branch: refs/heads/master
Commit: 8b412ee66fe042db60a567ff71639839af5fa854
Parents: 406721a
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 28 16:10:11 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 28 16:10:11 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../app/launcher/LocalContainerLauncher.java | 2 +-
.../tez/service/impl/ContainerRunnerImpl.java | 4 +-
.../tez/mapreduce/processor/MapUtils.java | 2 +-
.../processor/reduce/TestReduceProcessor.java | 2 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 5 +-
.../org/apache/tez/runtime/RuntimeTask.java | 12 ++-
.../org/apache/tez/runtime/task/TezChild.java | 14 ++--
.../apache/tez/runtime/task/TezTaskRunner2.java | 8 +-
.../TestLogicalIOProcessorRuntimeTask.java | 7 +-
.../tez/runtime/task/TestTaskExecution2.java | 77 +++++++++++++++++---
11 files changed, 99 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01fa23e..d219127 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2836. Avoid setting framework/system counters for tasks running in threads.
TEZ-2398. Flaky test: TestFaultTolerance
TEZ-2833. Dont create extra directory during ATS file download
TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 6cd6fce..9267f00 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -356,7 +356,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
TezChild tezChild =
TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials,
- memAvailable, context.getUser(), tezTaskUmbilicalProtocol);
+ memAvailable, context.getUser(), tezTaskUmbilicalProtocol, false);
return tezChild;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index fb4c08f..ad05af9 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -305,7 +305,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
request.getContainerIdString(),
request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs,
envMap, objectRegistry, pid,
- executionContext, credentials, memoryAvailable, request.getUser(), null);
+ executionContext, credentials, memoryAvailable, request.getUser(), null, false);
ContainerExecutionResult result = tezChild.run();
LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
sw.stop().elapsedMillis());
@@ -449,7 +449,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
request.getAppAttemptNumber(),
serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry,
pid,
- executionContext, memoryAvailable);
+ executionContext, memoryAvailable, false);
boolean shouldDie;
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8841882..71aa87c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -232,7 +232,7 @@ public class MapUtils {
serviceConsumerMetadata,
envMap,
HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory());
+ Runtime.getRuntime().maxMemory(), true);
return task;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index fcb42b3..db78b6e 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -224,7 +224,7 @@ public class TestReduceProcessor {
serviceConsumerMetadata,
serviceProviderEnvMap,
HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory());
+ Runtime.getRuntime().maxMemory(), true);
List<Event> destEvents = new LinkedList<Event>();
destEvents.add(dme);
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 5b0e62f..5db96c5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -156,10 +156,11 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
- String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException {
+ String pid, ExecutionContext ExecutionContext, long memAvailable,
+ boolean updateSysCounters) throws IOException {
// Note: If adding any fields here, make sure they're cleaned up in the cleanupContext method.
// TODO Remove jobToken from here post TEZ-421
- super(taskSpec, tezConf, tezUmbilical, pid);
+ super(taskSpec, tezConf, tezUmbilical, pid, updateSysCounters);
LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+ taskSpec);
int numInputs = taskSpec.getInputs().size();
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index 33c0113..c9c6ba1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -56,7 +56,7 @@ public abstract class RuntimeTask {
private final TaskStatistics statistics;
protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
- TezUmbilical tezUmbilical, String pid) {
+ TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
this.taskSpec = taskSpec;
this.tezConf = tezConf;
this.tezUmbilical = tezUmbilical;
@@ -67,7 +67,11 @@ public abstract class RuntimeTask {
this.progress = 0.0f;
this.taskDone = new AtomicBoolean(false);
this.statistics = new TaskStatistics();
- this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf, pid);
+ if (setupSysCounterUpdater) {
+ this.counterUpdater = new TaskCounterUpdater(tezCounters, tezConf, pid);
+ } else {
+ this.counterUpdater = null;
+ }
}
protected enum State {
@@ -160,7 +164,9 @@ public abstract class RuntimeTask {
}
public void setFrameworkCounters() {
- this.counterUpdater.updateCounters();
+ if (counterUpdater != null) {
+ this.counterUpdater.updateCounters();
+ }
}
protected void setTaskDone() {
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index edc8208..e9b48f4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -109,6 +109,7 @@ public class TezChild {
private final long memAvailable;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final String user;
+ private final boolean updateSysCounters;
private Multimap<String, String> startedInputsMap = HashMultimap.create();
private final boolean ownUmbilical;
@@ -123,8 +124,8 @@ public class TezChild {
Map<String, String> serviceProviderEnvMap,
ObjectRegistryImpl objectRegistry, String pid,
ExecutionContext executionContext,
- Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical)
- throws IOException, InterruptedException {
+ Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical,
+ boolean updateSysCounters) throws IOException, InterruptedException {
this.defaultConf = conf;
this.containerIdString = containerIdentifier;
this.appAttemptNumber = appAttemptNumber;
@@ -136,6 +137,7 @@ public class TezChild {
this.credentials = credentials;
this.memAvailable = memAvailable;
this.user = user;
+ this.updateSysCounters = updateSysCounters;
getTaskMaxSleepTime = defaultConf.getInt(
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -248,7 +250,7 @@ public class TezChild {
TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
localDirs, containerTask.getTaskSpec(), appAttemptNumber,
serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
- executor, objectRegistry, pid, executionContext, memAvailable);
+ executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters);
boolean shouldDie;
try {
TaskRunner2Result result = taskRunner.run();
@@ -433,7 +435,7 @@ public class TezChild {
String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
Map<String, String> serviceProviderEnvMap, @Nullable String pid,
ExecutionContext executionContext, Credentials credentials, long memAvailable, String user,
- TezTaskUmbilicalProtocol tezUmbilical)
+ TezTaskUmbilicalProtocol tezUmbilical, boolean updateSysCounters)
throws IOException, InterruptedException, TezException {
// Pull in configuration specified for the session.
@@ -446,7 +448,7 @@ public class TezChild {
return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
- executionContext, credentials, memAvailable, user, tezUmbilical);
+ executionContext, credentials, memAvailable, user, tezUmbilical, updateSysCounters);
}
public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -482,7 +484,7 @@ public class TezChild {
tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
credentials, Runtime.getRuntime().maxMemory(), System
- .getenv(ApplicationConstants.Environment.USER.toString()), null);
+ .getenv(ApplicationConstants.Environment.USER.toString()), null, true);
tezChild.run();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 7fd4c75..4fdc17d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -55,7 +55,8 @@ public class TezTaskRunner2 {
private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner2.class);
- private final LogicalIOProcessorRuntimeTask task;
+ @VisibleForTesting
+ final LogicalIOProcessorRuntimeTask task;
private final UserGroupInformation ugi;
private final TaskReporterInterface taskReporter;
@@ -100,7 +101,8 @@ public class TezTaskRunner2 {
Multimap<String, String> startedInputsMap,
TaskReporterInterface taskReporter, ListeningExecutorService executor,
ObjectRegistry objectRegistry, String pid,
- ExecutionContext executionContext, long memAvailable) throws
+ ExecutionContext executionContext, long memAvailable,
+ boolean updateSysCounters) throws
IOException {
this.ugi = ugi;
this.taskReporter = taskReporter;
@@ -108,7 +110,7 @@ public class TezTaskRunner2 {
this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler();
this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
- objectRegistry, pid, executionContext, memAvailable);
+ objectRegistry, pid, executionContext, memAvailable, updateSysCounters);
}
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 0acb7b8..0fc3919 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -35,7 +34,6 @@ import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -44,7 +42,6 @@ import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.Reader;
@@ -85,7 +82,7 @@ public class TestLogicalIOProcessorRuntimeTask {
LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
- "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
+ "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true);
try {
lio1.initialize();
@@ -113,7 +110,7 @@ public class TestLogicalIOProcessorRuntimeTask {
tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
- "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
+ "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true);
try {
lio2.initialize();
lio2.run();
http://git-wip-us.apache.org/repos/asf/tez/blob/8b412ee6/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index 2123757..989753b 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -49,6 +50,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -58,6 +64,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
@@ -150,7 +157,8 @@ public class TestTaskExecution2 {
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_EMPTY);
+ TestProcessor.CONF_EMPTY, true);
+ LogicalIOProcessorRuntimeTask runtimeTask = taskRunner.task;
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
new TaskRunnerCallable2ForTest(taskRunner));
@@ -162,9 +170,12 @@ public class TestTaskExecution2 {
umbilical.verifyTaskSuccessEvent();
assertFalse(TestProcessor.wasAborted());
umbilical.resetTrackedEvents();
+ TezCounters tezCounters = runtimeTask.getCounters();
+ verifySysCounters(tezCounters, 5, 5);
taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- TestProcessor.CONF_EMPTY);
+ TestProcessor.CONF_EMPTY, false);
+ runtimeTask = taskRunner.task;
// Setup the executor
taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
@@ -174,11 +185,14 @@ public class TestTaskExecution2 {
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
assertFalse(TestProcessor.wasAborted());
+ tezCounters = runtimeTask.getCounters();
+ verifySysCounters(tezCounters, -1, -1);
} finally {
executor.shutdownNow();
}
}
+
// test task failed due to exception in Processor
@Test(timeout = 5000)
public void testFailedTaskTezException() throws IOException, InterruptedException, TezException,
@@ -231,7 +245,7 @@ public class TestTaskExecution2 {
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
- "NotExitedProcessor", TestProcessor.CONF_EMPTY, false);
+ "NotExitedProcessor", TestProcessor.CONF_EMPTY, false, true);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
@@ -484,6 +498,35 @@ public class TestTaskExecution2 {
}
}
+ private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) {
+
+ Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) ||
+ (minTaskCounterCount <= 0 && minFsCounterCount <= 0),
+ "Both targetCounter counts should be postitive or negative. A mix is not expected");
+
+ int numTaskCounters = 0;
+ int numFsCounters = 0;
+ for (CounterGroup counterGroup : tezCounters) {
+ if (counterGroup.getName().equals(TaskCounter.class.getName())) {
+ for (TezCounter ignored : counterGroup) {
+ numTaskCounters++;
+ }
+ } else if (counterGroup.getName().equals(FileSystemCounter.class.getName())) {
+ for (TezCounter ignored : counterGroup) {
+ numFsCounters++;
+ }
+ }
+ }
+
+ // If Target <=0, assert counter count is exactly 0
+ if (minTaskCounterCount <= 0) {
+ assertEquals(0, numTaskCounters);
+ assertEquals(0, numFsCounters);
+ } else {
+ assertTrue(numTaskCounters >= minTaskCounterCount);
+ assertTrue(numFsCounters >= minFsCounterCount);
+ }
+ }
private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
EndReason expectedEndReason, Throwable expectedThrowable,
@@ -530,10 +573,20 @@ public class TestTaskExecution2 {
private TezTaskRunner2 createTaskRunner(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
- ListeningExecutorService executor, byte[] processorConf)
+ ListeningExecutorService executor, byte[] processorConf) throws
+ IOException {
+ return createTaskRunner(appId, umbilical, taskReporter, executor, processorConf, true);
+
+ }
+
+ private TezTaskRunner2 createTaskRunner(ApplicationId appId,
+ TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
+ TaskReporter taskReporter,
+ ListeningExecutorService executor, byte[] processorConf,
+ boolean updateSysCounters)
throws IOException {
return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
- processorConf, false);
+ processorConf, false, updateSysCounters);
}
private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId,
@@ -544,14 +597,15 @@ public class TestTaskExecution2 {
throws IOException {
return (TezTaskRunner2ForTest) createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.class.getName(),
- processorConf, true);
+ processorConf, true, true);
}
private TezTaskRunner2 createTaskRunner(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
ListeningExecutorService executor, String processorClass,
- byte[] processorConf, boolean testRunner) throws
+ byte[] processorConf, boolean testRunner,
+ boolean updateSysCounters) throws
IOException {
TezConfiguration tezConf = new TezConfiguration(defaultConf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -574,13 +628,13 @@ public class TestTaskExecution2 {
new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
HashMultimap.<String, String>create(), taskReporter,
executor, null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory());
+ Runtime.getRuntime().maxMemory(), updateSysCounters);
} else {
taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1,
new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
HashMultimap.<String, String>create(), taskReporter,
executor, null, "", new ExecutionContextImpl("localhost"),
- Runtime.getRuntime().maxMemory());
+ Runtime.getRuntime().maxMemory(), updateSysCounters);
}
return taskRunner;
@@ -604,10 +658,11 @@ public class TestTaskExecution2 {
ObjectRegistry objectRegistry,
String pid,
ExecutionContext executionContext,
- long memAvailable) throws IOException {
+ long memAvailable,
+ boolean updateSysCounters) throws IOException {
super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
- executionContext, memAvailable);
+ executionContext, memAvailable, updateSysCounters);
}