You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 09:26:05 UTC
[06/50] [abbrv] tez git commit: TEZ-2285. Allow TaskCommunicators to
indicate task/container liveness. (sseth)
TEZ-2285. Allow TaskCommunicators to indicate task/container liveness. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/952980a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/952980a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/952980a1
Branch: refs/heads/master
Commit: 952980a1a2c7ee83d85de336d7d64d5f7114f864
Parents: bedadc7
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Apr 7 13:22:09 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:13:54 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../apache/tez/dag/api/TaskCommunicatorContext.java | 4 ++++
.../tez/dag/app/TaskAttemptListenerImpTezDag.java | 10 ++++++++++
.../apache/tez/dag/app/TezTaskCommunicatorImpl.java | 16 +++++++++-------
4 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/952980a1/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index e2c428d..9d6b220 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -14,5 +14,6 @@ ALL CHANGES:
TEZ-2241. Miscellaneous fixes after last rebase.
TEZ-2283. Fixes after rebase 04/07.
TEZ-2284. Separate TaskReporter into an interface.
+ TEZ-2285. Allow TaskCommunicators to indicate task/container liveness.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/952980a1/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index a85fb7f..0c3bac3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -43,6 +43,10 @@ public interface TaskCommunicatorContext {
boolean isKnownContainer(ContainerId containerId);
+ void taskAlive(TezTaskAttemptID taskAttemptId);
+
+ void containerAlive(ContainerId containerId);
+
// TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt*
void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId);
http://git-wip-us.apache.org/repos/asf/tez/blob/952980a1/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 3798b6f..a6994d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -258,6 +258,16 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
+ public void taskAlive(TezTaskAttemptID taskAttemptId) {
+ taskHeartbeatHandler.pinged(taskAttemptId);
+ }
+
+ @Override
+ public void containerAlive(ContainerId containerId) {
+ pingContainerHeartbeatHandler(containerId);
+ }
+
+ @Override
public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
context.getEventHandler()
.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
http://git-wip-us.apache.org/repos/asf/tez/blob/952980a1/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index bba06fd..a4a707b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -65,17 +65,19 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
null, true, null, null, false);
private final TaskCommunicatorContext taskCommunicatorContext;
+ private final TezTaskUmbilicalProtocol taskUmbilical;
- private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
+ protected final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
new ConcurrentHashMap<ContainerId, ContainerInfo>();
- private final ConcurrentMap<TaskAttempt, ContainerId> attemptToContainerMap =
+ protected final ConcurrentMap<TaskAttempt, ContainerId> attemptToContainerMap =
new ConcurrentHashMap<TaskAttempt, ContainerId>();
- private final TezTaskUmbilicalProtocol taskUmbilical;
- private final String tokenIdentifier;
- private final Token<JobTokenIdentifier> sessionToken;
+
+ protected final String tokenIdentifier;
+ protected final Token<JobTokenIdentifier> sessionToken;
protected InetSocketAddress address;
- private Server server;
+
+ protected volatile Server server;
public static final class ContainerInfo {
@@ -440,7 +442,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
// Holder for Task information, which eventually will likely be VertexImpl, taskIndex, attemptIndex
- private static class TaskAttempt {
+ protected static class TaskAttempt {
// TODO TEZ-2003 Change this to work with VertexName, int id, int version
// TODO TEZ-2003 Avoid constructing this unit all over the place
private TezTaskAttemptID taskAttemptId;