You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/01/16 21:24:37 UTC
tez git commit: TEZ-1941. Memory provided by
*Context.getAvailableMemory needs to be setup explicitly. (sseth)
Repository: tez
Updated Branches:
refs/heads/master ea46f459c -> 4974fb235
TEZ-1941. Memory provided by *Context.getAvailableMemory needs to be
setup explicitly. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4974fb23
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4974fb23
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4974fb23
Branch: refs/heads/master
Commit: 4974fb235413fc67fbe48dc29cb651c4b0fe7ce3
Parents: ea46f45
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jan 16 12:24:23 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jan 16 12:24:23 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/runtime/api/ExecutionContext.java | 2 +-
.../org/apache/tez/runtime/api/TaskContext.java | 2 +-
.../dag/app/launcher/LocalContainerLauncher.java | 8 +++++---
.../apache/tez/mapreduce/processor/MapUtils.java | 3 ++-
.../processor/reduce/TestReduceProcessor.java | 3 ++-
tez-runtime-internals/findbugs-exclude.xml | 7 +++++++
.../runtime/LogicalIOProcessorRuntimeTask.java | 11 +++++++----
.../tez/runtime/api/impl/TezInputContextImpl.java | 4 ++--
.../runtime/api/impl/TezOutputContextImpl.java | 4 ++--
.../runtime/api/impl/TezProcessorContextImpl.java | 4 ++--
.../tez/runtime/api/impl/TezTaskContextImpl.java | 6 ++++--
.../org/apache/tez/runtime/task/TaskReporter.java | 11 ++++++-----
.../org/apache/tez/runtime/task/TezChild.java | 18 ++++++++++--------
.../apache/tez/runtime/task/TezTaskRunner.java | 4 ++--
.../TestLogicalIOProcessorRuntimeTask.java | 4 ++--
.../tez/runtime/task/TestTaskExecution.java | 2 +-
.../output/TestOnFileUnorderedKVOutput.java | 2 +-
18 files changed, 58 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af4c60e..eec52dc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1941. Memory provided by *Context.getAvailableMemory needs to be setup explicitly.
TEZ-1879. Create local UGI instances for each task and the AM, when running in LocalMode.
TEZ-1661. LocalTaskScheduler hangs when shutdown.
TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
index b120ecc..fa9a47f 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java
@@ -17,7 +17,7 @@ package org.apache.tez.runtime.api;
import org.apache.hadoop.classification.InterfaceAudience;
/**
- * Execution context for a running task
+ * The context for the executor within which a task runs. May be shared between tasks
*
* This interface is not meant to be implemented by users
*/
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index 42918b9..8e22057 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -182,7 +182,7 @@ public interface TaskContext {
public int getVertexParallelism();
/**
- * Get the execution context for a running task
+ * Get the context for the executor. This may be shared across multiple tasks
* @return the execution context
*/
public ExecutionContext getExecutionContext();
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 2f29569..9a83d3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -89,7 +89,7 @@ public class LocalContainerLauncher extends AbstractService implements
private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
private final String workingDirectory;
private final Map<String, String> localEnv = new HashMap<String, String>();
- private final ExecutionContext ExecutionContext;
+ private final ExecutionContext executionContext;
private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>
runningContainers =
@@ -116,7 +116,7 @@ public class LocalContainerLauncher extends AbstractService implements
this.workingDirectory = workingDirectory;
AuxiliaryServiceHelper.setServiceDataIntoEnv(
ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
- ExecutionContext = new ExecutionContextImpl(InetAddress.getLocalHost().getHostName());
+ executionContext = new ExecutionContextImpl(InetAddress.getLocalHost().getHostName());
// User cannot be set here since it isn't available till a DAG is running.
}
@@ -328,9 +328,11 @@ public class LocalContainerLauncher extends AbstractService implements
containerEnv.putAll(localEnv);
containerEnv.put(Environment.USER.name(), context.getUser());
+ // TODO TEZ-1482. Control the memory available based on number of executors
TezChild tezChild =
TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
- attemptNumber, localDirs, workingDirectory, containerEnv, "", ExecutionContext, credentials);
+ attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials,
+ Runtime.getRuntime().maxMemory());
tezChild.setUmbilical(tezTaskUmbilicalProtocol);
return tezChild;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8ec17eb..40bea0f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -231,7 +231,8 @@ public class MapUtils {
umbilical,
serviceConsumerMetadata,
envMap,
- HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"));
+ HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+ Runtime.getRuntime().maxMemory());
return task;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index b378f37..b205a42 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -223,7 +223,8 @@ public class TestReduceProcessor {
new TestUmbilical(),
serviceConsumerMetadata,
serviceProviderEnvMap,
- HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"));
+ HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+ Runtime.getRuntime().maxMemory());
List<Event> destEvents = new LinkedList<Event>();
destEvents.add(dme);
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/findbugs-exclude.xml b/tez-runtime-internals/findbugs-exclude.xml
index bc8a2c0..2b308de 100644
--- a/tez-runtime-internals/findbugs-exclude.xml
+++ b/tez-runtime-internals/findbugs-exclude.xml
@@ -33,6 +33,13 @@
</Match>
<Match>
+ <Class name="org.apache.tez.runtime.task.TezChild"/>
+ <Method name="&lt;init&gt;"/>
+ <Field name="localDirs"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+
+ <Match>
<Class name="~org\.apache\.tez\.runtime\.internals\.api\.events\.SystemEventProtos\$.*Proto" />
<Field name="PARSER"/>
<Bug pattern="MS_SHOULD_BE_FINAL"/>
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index c3f90a5..9d9be37 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -133,12 +133,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private final ObjectRegistry objectRegistry;
private final ExecutionContext ExecutionContext;
+ private final long memAvailable;
public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap,
Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry,
- String pid, ExecutionContext ExecutionContext) throws IOException {
+ String pid, ExecutionContext ExecutionContext, long memAvailable) throws IOException {
// TODO Remove jobToken from here post TEZ-421
super(taskSpec, tezConf, tezUmbilical, pid);
LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -176,6 +177,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.inputReadyTracker = new InputReadyTracker();
this.objectRegistry = objectRegistry;
this.ExecutionContext = ExecutionContext;
+ this.memAvailable = memAvailable;
}
/**
@@ -488,7 +490,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
inputSpec.getInputDescriptor().getUserPayload(), this,
serviceConsumerMetadata, envMap, initialMemoryDistributor,
inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry,
- ExecutionContext);
+ ExecutionContext, memAvailable);
return inputContext;
}
@@ -502,7 +504,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
tezCounters, outputIndex,
outputSpec.getOutputDescriptor().getUserPayload(), this,
serviceConsumerMetadata, envMap, initialMemoryDistributor,
- outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext);
+ outputSpec.getOutputDescriptor(), objectRegistry, ExecutionContext,
+ memAvailable);
return outputContext;
}
@@ -514,7 +517,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
taskSpec.getTaskAttemptID(),
tezCounters, processorDescriptor.getUserPayload(), this,
serviceConsumerMetadata, envMap, initialMemoryDistributor,
- processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext);
+ processorDescriptor, inputReadyTracker, objectRegistry, ExecutionContext, memAvailable);
return processorContext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 460efbf..a15e072 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -68,12 +68,12 @@ public class TezInputContextImpl extends TezTaskContextImpl
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs,
InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext) {
+ ExecutionContext ExecutionContext, long memAvailable) {
super(conf, workDirs, appAttemptNumber, dagName, taskVertexName,
vertexParallelism, taskAttemptID, wrapCounters(counters,
taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical,
serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor,
- objectRegistry, ExecutionContext);
+ objectRegistry, ExecutionContext, memAvailable);
checkNotNull(inputIndex, "inputIndex is null");
checkNotNull(sourceVertexName, "sourceVertexName is null");
checkNotNull(inputs, "input map is null");
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 88e87ce..d376b88 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -63,12 +63,12 @@ public class TezOutputContextImpl extends TezTaskContextImpl
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext) {
+ ExecutionContext ExecutionContext, long memAvailable) {
super(conf, workDirs, appAttemptNumber, dagName, taskVertexName,
vertexParallelism, taskAttemptID,
wrapCounters(counters, taskVertexName, destinationVertexName, conf),
runtimeTask, tezUmbilical, serviceConsumerMetadata,
- auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext);
+ auxServiceEnv, memDist, outputDescriptor, objectRegistry, ExecutionContext, memAvailable);
checkNotNull(outputIndex, "outputIndex is null");
checkNotNull(destinationVertexName, "destinationVertexName is null");
this.userPayload = userPayload;
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 3317c80..16f9a45 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -59,10 +59,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext) {
+ ExecutionContext ExecutionContext, long memAvailable) {
super(conf, workDirs, appAttemptNumber, dagName, vertexName, vertexParallelism, taskAttemptID,
counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
- auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext);
+ auxServiceEnv, memDist, processorDescriptor, objectRegistry, ExecutionContext, memAvailable);
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
this.userPayload = userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index d9de7b5..527b822 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -64,6 +64,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
private final ObjectRegistry objectRegistry;
private final int vertexParallelism;
private final ExecutionContext ExecutionContext;
+ private final long memAvailable;
@Private
public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
@@ -72,7 +73,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry,
- ExecutionContext ExecutionContext) {
+ ExecutionContext ExecutionContext, long memAvailable) {
checkNotNull(conf, "conf is null");
checkNotNull(dagName, "dagName is null");
checkNotNull(taskVertexName, "taskVertexName is null");
@@ -103,6 +104,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
this.objectRegistry = objectRegistry;
this.vertexParallelism = vertexParallelism;
this.ExecutionContext = ExecutionContext;
+ this.memAvailable = memAvailable;
}
@Override
@@ -198,7 +200,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
@Override
public long getTotalMemoryAvailableToTask() {
- return Runtime.getRuntime().maxMemory();
+ return memAvailable;
}
protected void signalFatalError(Throwable t, String message, EventMetaData sourceInfo) {
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 15dcbb0..0841984 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -142,7 +143,7 @@ public class TaskReporter {
* Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
* log counters.
*/
- private int nonOobHeartbeatCounter = 0;
+ private AtomicInteger nonOobHeartbeatCounter = new AtomicInteger(0);
private int nextHeartbeatNumToLog = 0;
/*
* Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
@@ -186,7 +187,7 @@ public class TaskReporter {
try {
boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
if (!interrupted) {
- nonOobHeartbeatCounter++;
+ nonOobHeartbeatCounter.incrementAndGet();
}
} finally {
lock.unlock();
@@ -228,9 +229,9 @@ public class TaskReporter {
* real time decisions are made based on these counters, it can be sent once per second.
*/
// Not completely accurate, since OOB heartbeats could go out.
- if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
+ if ((nonOobHeartbeatCounter.get() - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
counters = task.getCounters();
- prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
+ prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
}
updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
updateEventMetadata);
@@ -294,7 +295,7 @@ public class TaskReporter {
private void maybeLogCounters() {
if (LOG.isDebugEnabled()) {
- if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
+ if (nonOobHeartbeatCounter.get() == nextHeartbeatNumToLog) {
LOG.debug("Counters: " + task.getCounters().toShortString());
nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 7a9b600..bfac476 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -99,10 +99,11 @@ public class TezChild {
private final ListeningExecutorService executor;
private final ObjectRegistryImpl objectRegistry;
private final String pid;
- private final ExecutionContext ExecutionContext;
+ private final ExecutionContext executionContext;
private final Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
private final Map<String, String> serviceProviderEnvMap;
private final Credentials credentials;
+ private final long memAvailable;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private Multimap<String, String> startedInputsMap = HashMultimap.create();
@@ -116,8 +117,8 @@ public class TezChild {
String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
Map<String, String> serviceProviderEnvMap,
ObjectRegistryImpl objectRegistry, String pid,
- ExecutionContext ExecutionContext,
- Credentials credentials)
+ ExecutionContext executionContext,
+ Credentials credentials, long memAvailable)
throws IOException, InterruptedException {
this.defaultConf = conf;
this.containerIdString = containerIdentifier;
@@ -126,8 +127,9 @@ public class TezChild {
this.serviceProviderEnvMap = serviceProviderEnvMap;
this.workingDir = workingDir;
this.pid = pid;
- this.ExecutionContext = ExecutionContext;
+ this.executionContext = executionContext;
this.credentials = credentials;
+ this.memAvailable = memAvailable;
getTaskMaxSleepTime = defaultConf.getInt(
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
@@ -233,7 +235,7 @@ public class TezChild {
TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI,
localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
- executor, objectRegistry, pid, this.ExecutionContext);
+ executor, objectRegistry, pid, executionContext, memAvailable);
boolean shouldDie;
try {
shouldDie = !taskRunner.run();
@@ -409,7 +411,7 @@ public class TezChild {
public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
Map<String, String> serviceProviderEnvMap, @Nullable String pid,
- ExecutionContext ExecutionContext, Credentials credentials)
+ ExecutionContext executionContext, Credentials credentials, long memAvailable)
throws IOException, InterruptedException, TezException {
// Pull in configuration specified for the session.
@@ -426,7 +428,7 @@ public class TezChild {
return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
- ExecutionContext, credentials);
+ executionContext, credentials, memAvailable);
}
public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -459,7 +461,7 @@ public class TezChild {
TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
- credentials);
+ credentials, Runtime.getRuntime().maxMemory());
tezChild.run();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index 6e655f9..6606481 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -72,7 +72,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap,
Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid,
- ExecutionContext ExecutionContext)
+ ExecutionContext executionContext, long memAvailable)
throws IOException {
this.tezConf = tezConf;
this.ugi = ugi;
@@ -80,7 +80,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
this.executor = executor;
task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this,
serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
- ExecutionContext);
+ executionContext, memAvailable);
taskReporter.registerTask(task, this);
taskRunning = new AtomicBoolean(true);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 60f8d46..4d165b5 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -78,7 +78,7 @@ public class TestLogicalIOProcessorRuntimeTask {
LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
- "", new ExecutionContextImpl("localhost"));
+ "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
lio1.initialize();
lio1.run();
@@ -96,7 +96,7 @@ public class TestLogicalIOProcessorRuntimeTask {
LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
umbilical, serviceConsumerMetadata, new HashMap<String, String>(), startedInputsMap, null,
- "", new ExecutionContextImpl("localhost"));
+ "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
lio2.initialize();
lio2.run();
lio2.close();
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index 4f94cfe..7890f89 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -717,7 +717,7 @@ public class TestTaskExecution {
TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, umbilical, 1,
new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String> create(), taskReporter,
- executor, null, "", new ExecutionContextImpl("localhost"));
+ executor, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
return taskRunner;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4974fb23/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 91d558f..1f78cbd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -126,7 +126,7 @@ public class TestOnFileUnorderedKVOutput {
appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName,
-1, taskAttemptID, counters, 0, userPayload, runtimeTask,
null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null,
- new ExecutionContextImpl("localhost"));
+ new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory());
UnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest(outputContext, 1);