You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:59:00 UTC
[42/50] [abbrv] tez git commit: TEZ-2626. Fix log lines with DEBUG in
messages, consolidate TEZ-2003 TODOs. (sseth)
TEZ-2626. Fix log lines with DEBUG in messages, consolidate TEZ-2003 TODOs. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f30a3fe3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f30a3fe3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f30a3fe3
Branch: refs/heads/TEZ-2003
Commit: f30a3fe3751666505ebdfe39bb7e1f32d2b09736
Parents: e445576
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jul 30 13:39:40 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:47:10 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
pom.xml | 2 +-
.../serviceplugins/api/ContainerLauncher.java | 4 ---
.../tez/serviceplugins/api/TaskScheduler.java | 5 ++++
.../api/TaskSchedulerContext.java | 6 ++--
.../apache/tez/dag/api/TaskCommunicator.java | 29 +++++++-------------
.../tez/dag/api/TaskCommunicatorContext.java | 24 ++++++++--------
.../tez/dag/api/TaskHeartbeatRequest.java | 6 ++--
.../tez/dag/api/TaskHeartbeatResponse.java | 2 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 7 ++---
.../dag/app/TaskCommunicatorContextImpl.java | 2 +-
.../tez/dag/app/TezTaskCommunicatorImpl.java | 3 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 2 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 2 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 6 ----
.../rm/container/AMContainerEventAssignTA.java | 2 --
tez-ext-service-tests/pom.xml | 1 -
.../TezTestServiceContainerLauncher.java | 4 +--
.../TezTestServiceNoOpContainerLauncher.java | 2 +-
.../rm/TezTestServiceTaskSchedulerService.java | 6 ++--
.../tez/service/impl/ContainerRunnerImpl.java | 6 ++--
.../tez/tests/TestExternalTezServices.java | 2 --
.../internals/api/TaskReporterInterface.java | 4 +--
23 files changed, 51 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9b3967a..c7a3dcc 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -41,5 +41,6 @@ ALL CHANGES:
TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration.
TEZ-2441. Add tests for TezTaskRunner2.
TEZ-2657. Add tests for client side changes - specifying plugins, etc.
+ TEZ-2626. Fix log lines with DEBUG in messages, consolidate TEZ-2003 TODOs.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1ced4e7..7ae5f31 100644
--- a/pom.xml
+++ b/pom.xml
@@ -682,10 +682,10 @@
<module>tez-examples</module>
<module>tez-tests</module>
<module>tez-dag</module>
+ <module>tez-ext-service-tests</module>
<module>tez-ui</module>
<module>tez-plugins</module>
<module>tez-tools</module>
- <module>tez-ext-service-tests</module>
<module>tez-dist</module>
<module>docs</module>
</modules>
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
index 8337dcb..7f58f77 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -16,7 +16,6 @@ package org.apache.tez.serviceplugins.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.service.AbstractService;
import org.apache.tez.common.ServicePluginLifecycle;
/**
@@ -30,9 +29,6 @@ public abstract class ContainerLauncher implements ServicePluginLifecycle {
private final ContainerLauncherContext containerLauncherContext;
- // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService.
- // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal
- // configuration to the services if populated with the AM Configuration
public ContainerLauncher(ContainerLauncherContext containerLauncherContext) {
this.containerLauncherContext = containerLauncherContext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
index a5b054f..9ff2bd5 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -27,6 +27,11 @@ import org.apache.tez.common.ServicePluginLifecycle;
@InterfaceStability.Unstable
public abstract class TaskScheduler implements ServicePluginLifecycle {
+ // TODO TEZ-2003 (post) TEZ-2668
+ // - Should setRegister / unregister be part of APIs when not YARN specific ?
+ // - Include vertex / task information in the request so that the scheduler can make decisions
+ // around prioritizing tasks in the same vertex when others exist at the same priority.
+
private final TaskSchedulerContext taskSchedulerContext;
public TaskScheduler(TaskSchedulerContext taskSchedulerContext) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
index 6f37641..dbbf75c 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -53,7 +53,10 @@ public interface TaskSchedulerContext {
IDLE, RUNNING_APP, COMPLETED
}
- // TODO Post TEZ-2003. Remove references to YARN constructs like Container, ContainerStatus, NodeReport
+ // TODO TEZ-2003 (post) TEZ-2664. Remove references to YARN constructs like Container, ContainerStatus, NodeReport
+ // TODO TEZ-2003 (post) TEZ-2668 Enhancements to TaskScheduler interfaces
+ // - setApplicationRegistrationData may not be relevant to non YARN clusters
+ // - getAppFinalStatus may not be relevant to non YARN clusters
// upcall to app must be outside locks
public void taskAllocated(Object task,
Object appCookie,
@@ -78,7 +81,6 @@ public interface TaskSchedulerContext {
public float getProgress();
public void preemptContainer(ContainerId containerId);
- // TODO Post TEZ-2003. Another method which is primarily relevant to YARN clusters for unregistration.
public AppFinalStatus getFinalAppStatus();
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index f221414..794d390 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -27,9 +27,18 @@ import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
-// TODO TEZ-2003 Move this into the tez-api module
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
public abstract class TaskCommunicator implements ServicePluginLifecycle {
+ // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface
+ // - registerContainerEnd should provide the end reason / possible rename
+ // - get rid of getAddress
+ // - Add methods to support task preemption
+ // - Add a dagStarted notification, along with a payload
+ // - taskSpec breakup into a clean interface
+ // - Add methods to report task / container completion
+
private final TaskCommunicatorContext taskCommunicatorContext;
public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
@@ -52,36 +61,20 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
public void shutdown() throws Exception {
}
- // TODO Post TEZ-2003 Move this into the API module. Moving this requires abstractions for
- // TaskSpec and related classes. (assuming that's efficient for execution)
- // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
- // TODO When talking to an external service, this plugin implementer may need access to a host:port
public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
- // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason);
- // TODO TEZ-2003 Provide additional inforamtion like reason for container end / Task end.
- // Was it caused by preemption - or as a result of a general task completion / container completion
-
- // TODO TEZ-2003 TaskSpec breakup into a clean interface
- // TODO TEZ-2003 Add support for priority
public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
Map<String, LocalResource> additionalResources,
Credentials credentials,
boolean credentialsChanged, int priority);
- // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
-
- // TODO TEZ-2003 Remove reference to TaskAttemptID
public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason);
- // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.
public abstract InetSocketAddress getAddress();
- // TODO Eventually. Add methods here to support preemption of tasks.
-
/**
* Receive notifications on vertex state changes.
* <p/>
@@ -108,8 +101,6 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* After this, the contents returned from querying the context may change at any point - due to
* the next dag being submitted.
*/
- // TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially
- // throw exceptions between a dagComplete and dagStart invocation.
public abstract void dagComplete(String dagName);
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index a1e94a3..8073f6a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -28,23 +28,27 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
// Do not make calls into this from within a held lock.
-// TODO TEZ-2003 Move this into the tez-api module
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
public interface TaskCommunicatorContext {
- // TODO TEZ-2003 Add signalling back into this to indicate errors - e.g. Container unregsitered, task no longer running, etc.
-
- // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
+ // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API
+ // - Consolidate usage of IDs
+ // - Split the heartbeat API to a liveness check and a status update
+ // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest
+ // - Fix taskStarted needs to be invoked before launching the actual task.
+ // - Potentially add methods to report availability stats to the scheduler
+ // - Report taskSuccess via a method instead of the heartbeat
+ // - Add methods to signal container / task state changes
+ // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
+ // - Handling of containers / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
UserPayload getInitialUserPayload();
ApplicationAttemptId getApplicationAttemptId();
Credentials getCredentials();
- // TODO TEZ-2003 Move to vertex, taskIndex, version
boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
- // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update
- // KKK Rename this API
TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
boolean isKnownContainer(ContainerId containerId);
@@ -53,13 +57,10 @@ public interface TaskCommunicatorContext {
void containerAlive(ContainerId containerId);
- // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
- // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
- // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
/**
@@ -72,9 +73,6 @@ public interface TaskCommunicatorContext {
* @param stateSet the set of states for which notifications are required. null implies all
*/
void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
- // TODO TEZ-2003 API. Should a method exist for task succeeded.
-
- // TODO Eventually Add methods to report availability stats to the scheduler.
/**
* Get the name of the currently executing dag
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
index b5ff991..d0c22d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
@@ -19,13 +19,11 @@ import java.util.List;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TezEvent;
-// TODO TEZ-2003 Move this into the tez-api module
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
public class TaskHeartbeatRequest {
- // TODO TEZ-2003 Ideally containerIdentifier should not be part of the request.
- // Replace with a task lookup - vertex name + task index
+ // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request.
private final String containerIdentifier;
- // TODO TEZ-2003 Get rid of the task attemptId reference if possible
private final TezTaskAttemptID taskAttemptId;
private final List<TezEvent> events;
private final int startIndex;
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
index 7f063c4..dcf89ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
@@ -18,7 +18,7 @@ import java.util.List;
import org.apache.tez.runtime.api.impl.TezEvent;
-// TODO TEZ-2003 Move this into the tez-api module
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
public class TaskHeartbeatResponse {
private final boolean shouldDie;
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cc109a6..941e583 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -162,7 +162,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
LOG.info("Using Default Local Task Communicator");
return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
} else {
- // TODO TEZ-2003. Use the payload
LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), taskCommDescriptor.getClassName());
Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
.getClazz(taskCommDescriptor.getClassName());
@@ -217,7 +216,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// This can happen when a task heartbeats. Meanwhile the container is unregistered.
// The information will eventually make it through to the plugin via a corresponding unregister.
// There's a race in that case between the unregister making it through, and this method returning.
- // TODO TEZ-2003. An exception back is likely a better approach than sending a shouldDie = true,
+ // TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true,
// so that the plugin can handle the scenario. Alternately augment the response with error codes.
// Error codes would be better than exceptions.
LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
@@ -278,7 +277,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
String diagnostics) {
// Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
// and messages from the scheduler will release the container.
- // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore,
+ // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
// Fix along the same lines as TEZ-2124 by introducing an explicit context.
context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
@@ -290,7 +289,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
String diagnostics) {
// Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
// and messages from the scheduler will release the container.
- // TODO TEZ-2003 Maybe consider un-registering here itself, since the task is not active anymore,
+ // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
// Fix along the same lines as TEZ-2124 by introducing an explicit context.
context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index cc315b7..0f10305 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -43,6 +43,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
@InterfaceAudience.Private
public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, VertexStateUpdateListener {
+ // TODO TEZ-2003 (post) TEZ-2669 Propagate errors back to the AM with proper error reporting
private final AppContext context;
private final TaskAttemptListenerImpTezDag taskAttemptListener;
@@ -188,7 +189,6 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
try {
taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
} catch (Exception e) {
- // TODO TEZ-2003 This needs to be propagated to the DAG as a user error.
throw new TezUncheckedException(e);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 2a5c80e..fb6d5e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -463,9 +463,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
// Holder for Task information, which eventually will likely be VertexImplm taskIndex, attemptIndex
+ // TODO TEZ-2003. TEZ-2670. Remove this class.
protected static class TaskAttempt {
- // TODO TEZ-2003 Change this to work with VertexName, int id, int version
- // TODO TEZ-2003 Avoid constructing this unit all over the place
private TezTaskAttemptID taskAttemptId;
TaskAttempt(TezTaskAttemptID taskAttemptId) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 25518b0..17f5675 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1453,7 +1453,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// check task resources, only check it in non-local mode
if (!appContext.isLocal()) {
for (Vertex v : vertexMap.values()) {
- // TODO TEZ-2003 (post) Ideally, this should be per source.
+ // TODO TEZ-2003 (post) TEZ-2624 Ideally, this should be per source.
if (v.getTaskResource().compareTo(appContext.getClusterInfo().getMaxContainerCapability()) > 0) {
String msg = "Vertex's TaskResource is beyond the cluster container capability," +
"Vertex=" + v.getLogIdentifier() +", Requested TaskResource=" + v.getTaskResource()
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index cb26c55..65ea3fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -191,7 +191,7 @@ public class TaskAttemptImpl implements TaskAttempt,
private final StateMachine<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent> stateMachine;
- // TODO TEZ-2003 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before
+ // TODO TEZ-2003 (post) TEZ-2667 We may need some additional state management for STATUS_UPDATES, FAILED, KILLED coming in before
// TASK_STARTED_REMOTELY. In case of a PUSH it's more intuitive to send TASK_STARTED_REMOTELY after communicating
// with the listening service and getting a response, which in turn can trigger STATUS_UPDATES / FAILED / KILLED
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 4c2e631..c86f638 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -502,7 +502,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements
taskSchedulerServiceWrappers[i].start();
if (shouldUnregisterFlag.get()) {
// Flag may have been set earlier when task scheduler was not initialized
- // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ?
// External services could need to talk to some other entity.
taskSchedulers[i].setShouldUnregister();
}
@@ -564,8 +563,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements
appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
}
- // TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context
-
// TaskSchedulerAppCallback methods with schedulerId, where relevant
public synchronized void taskAllocated(int schedulerId, Object task,
Object appCookie,
@@ -651,7 +648,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements
Resource maxContainerCapability,
Map<ApplicationAccessType, String> appAcls,
ByteBuffer clientAMSecretKey) {
- // TODO TEZ-2003 (post) Ideally clusterInfo should be available per source rather than a global view.
this.appContext.getClusterInfo().setMaxContainerCapability(
maxContainerCapability);
this.appAcls = appAcls;
@@ -751,7 +747,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements
this.shouldUnregisterFlag.set(true);
for (int i = 0 ; i < taskSchedulers.length ; i++) {
if (this.taskSchedulers[i] != null) {
- // TODO TEZ-2003 registration required for all schedulers ?
this.taskSchedulers[i].setShouldUnregister();
}
}
@@ -764,7 +759,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements
public boolean hasUnregistered() {
boolean result = true;
for (int i = 0 ; i < taskSchedulers.length ; i++) {
- // TODO TEZ-2003 registration required for all schedulers ?
result |= this.taskSchedulers[i].hasUnregistered();
if (result == false) {
return result;
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
index 0398882..682cd02 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
@@ -27,8 +27,6 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
public class AMContainerEventAssignTA extends AMContainerEvent {
- // TODO TEZ-2003. Add the task priority to this event.
-
private final TezTaskAttemptID attemptId;
// TODO Maybe have the TAL pull the remoteTask from the TaskAttempt itself ?
private final TaskSpec remoteTaskSpec;
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-ext-service-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
index 907e129..f95f4ca 100644
--- a/tez-ext-service-tests/pom.xml
+++ b/tez-ext-service-tests/pom.xml
@@ -23,7 +23,6 @@
<version>0.8.0-TEZ-2003-SNAPSHOT</version>
</parent>
- <!-- TODO TEZ-2003 Merge this into the tez-tests module -->
<artifactId>tez-ext-service-tests</artifactId>
<dependencies>
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index f31a07b..845a27b 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -37,8 +37,6 @@ import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// TODO TEZ-2003 look for all LOG.*(DBG and LOG.*(DEBUG messages
-
public class TezTestServiceContainerLauncher extends ContainerLauncher {
// TODO Support interruptability of tasks which haven't yet been launched.
@@ -119,7 +117,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
@Override
public void stopContainer(ContainerStopRequest stopRequest) {
- LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopRequest);
+ LOG.info("Ignoring stopContainer for event: " + stopRequest);
// that the container is actually done (normally received from RM)
// TODO Sending this out for an un-launched container is invalid
getContext().containerStopRequested(stopRequest.getContainerId());
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
index 7b42296..d265736 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -39,7 +39,7 @@ public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher {
@Override
public void stopContainer(ContainerStopRequest stopRequest) {
- LOG.info("DEBUG: Ignoring STOP_REQUEST {}", stopRequest);
+ LOG.info("Ignoring stopRequest {}", stopRequest);
getContext().containerStopRequested(stopRequest.getContainerId());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 0d87995..17f8a87 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -151,12 +151,12 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
@Override
public void blacklistNode(NodeId nodeId) {
- LOG.info("DEBUG: BlacklistNode not supported");
+ LOG.info("BlacklistNode not supported");
}
@Override
public void unblacklistNode(NodeId nodeId) {
- LOG.info("DEBUG: unBlacklistNode not supported");
+ LOG.info("unBlacklistNode not supported");
}
@Override
@@ -195,7 +195,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
@Override
public Object deallocateContainer(ContainerId containerId) {
- LOG.info("DEBUG: Ignoring deallocateContainer for containerId: " + containerId);
+ LOG.info("Ignoring deallocateContainer for containerId: " + containerId);
return null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index f3fc442..472a43c 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -173,7 +173,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
throw new TezException(e);
}
}
- LOG.info("DEBUG: Dirs are: " + Arrays.toString(localDirs));
+ LOG.info("Dirs for {} are {}", request.getContainerIdString(), Arrays.toString(localDirs));
// Setup workingDir. This is otherwise setup as Environment.PWD
@@ -193,7 +193,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
// TODO Unregistering does not happen at the moment, since there's no signals on when an app completes.
- LOG.info("DEBUG: Registering request with the ShuffleHandler");
+ LOG.info("Registering request with the ShuffleHandler for containerId {}", request.getContainerIdString());
ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());
@@ -255,7 +255,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
// TODO Unregistering does not happen at the moment, since there's no signals on when an app completes.
- LOG.info("DEBUG: Registering request with the ShuffleHandler");
+ LOG.info("Registering request with the ShuffleHandler for containerId {}", request.getContainerIdString());
ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser());
TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 2c52ae3..3701455 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -130,8 +130,6 @@ public class TestExternalTezServices {
confForJobs.set(entry.getKey(), entry.getValue());
}
- // TODO TEZ-2003 Once per vertex configuration is possible, run separate tests for push vs pull (regular threaded execution)
-
Path stagingDirPath = new Path("/tmp/tez-staging-dir");
remoteFs.mkdirs(stagingDirPath);
// This is currently configured to push tasks into the Service, and then use the standard RPC
http://git-wip-us.apache.org/repos/asf/tez/blob/f30a3fe3/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
index 47a61ab..9a5a3ab 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
@@ -26,8 +26,6 @@ import org.apache.tez.runtime.task.ErrorReporter;
public interface TaskReporterInterface {
- // TODO TEZ-2003 Consolidate private API usage if making this public
-
void registerTask(RuntimeTask task, ErrorReporter errorReporter);
void unregisterTask(TezTaskAttemptID taskAttemptId);
@@ -43,4 +41,4 @@ public interface TaskReporterInterface {
void shutdown();
-}
+}
\ No newline at end of file