You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/11 00:28:36 UTC
git commit: TEZ-426. Changes in YarnTezDagChild for logical task using TaskSpec. (hitesh)
Updated Branches:
refs/heads/TEZ-398 da0cca381 -> ee47464dd
TEZ-426. Changes in YarnTezDagChild for logical task using TaskSpec. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ee47464d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ee47464d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ee47464d
Branch: refs/heads/TEZ-398
Commit: ee47464ddb9a2bce75fdc8805a8786416d0c6078
Parents: da0cca3
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 10 15:28:15 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 10 15:28:15 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 274 +++++++++++++++----
.../dag/app/TaskAttemptListenerImpTezDag.java | 25 +-
.../engine/newapi/events/InputFailedEvent.java | 9 +
tez-engine-api/src/main/proto/Events.proto | 12 +-
.../tez/common/TezTaskUmbilicalProtocol.java | 4 +-
.../events/TaskAttemptCompletedEvent.java | 28 ++
.../newapi/events/TaskAttemptFailedEvent.java | 35 +++
.../engine/newapi/events/TaskFailedEvent.java | 35 ---
.../tez/engine/newapi/impl/EventType.java | 3 +-
.../apache/tez/engine/newapi/impl/TezEvent.java | 78 +++++-
.../engine/newapi/impl/TezHeartbeatRequest.java | 9 +-
.../newapi/impl/TezHeartbeatResponse.java | 38 ++-
.../engine/newapi/impl/TezInputContextImpl.java | 19 +-
.../newapi/impl/TezOutputContextImpl.java | 18 +-
.../newapi/impl/TezProcessorContextImpl.java | 18 +-
.../engine/newapi/impl/TezTaskContextImpl.java | 8 +-
.../tez/engine/newapi/impl/TezUmbilical.java | 197 +------------
.../LogicalIOProcessorRuntimeTask.java | 38 ++-
tez-engine/src/main/proto/Events.proto | 5 +-
.../apache/hadoop/mapred/LocalJobRunnerTez.java | 12 +-
.../tez/mapreduce/TestUmbilicalProtocol.java | 11 +-
21 files changed, 531 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 2bac9b6..3141910 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -26,6 +26,14 @@ import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
@@ -39,6 +47,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
@@ -56,9 +65,9 @@ import org.apache.tez.common.counters.Limits;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.api.Task;
import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
@@ -67,7 +76,11 @@ import org.apache.tez.engine.common.security.TokenCache;
import org.apache.tez.engine.newapi.impl.InputSpec;
import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -82,6 +95,140 @@ public class YarnTezDagChild {
private static final Logger LOG = Logger.getLogger(YarnTezDagChild.class);
+ private static AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private static String containerIdStr;
+ private static int eventCounter = 0;
+ private static int maxEventsToGet = 0;
+ private static LinkedList<TezEvent> eventsToSend =
+ new LinkedList<TezEvent>();
+ private static ConcurrentLinkedQueue<TezEvent> eventsToBeProcessed =
+ new ConcurrentLinkedQueue<TezEvent>();
+ private static AtomicLong requestCounter = new AtomicLong(0);
+ private static TezTaskAttemptID currentTaskAttemptID;
+ private static long amPollInterval;
+ private static TezTaskUmbilicalProtocol umbilical;
+ private static Object eventLock = new Object();
+ private static ReentrantReadWriteLock taskLock = new ReentrantReadWriteLock();
+ private static LogicalIOProcessorRuntimeTask currentTask = null;
+
+ private static Thread startHeartbeatThread() {
+ Thread heartbeatThread = new Thread(new Runnable() {
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(amPollInterval);
+ try {
+ heartbeat();
+ } catch (TezException e) {
+ LOG.error("Error communicating with AM: " + e.getMessage() , e);
+ // TODO TODONEWTEZ
+ } catch (InvalidToken e) {
+ LOG.error("Error in authenticating with AM: ", e);
+ // TODO TODONEWTEZ
+ } catch (Exception e) {
+ LOG.error("Error in heartbeating with AM. ", e);
+ // TODO TODONEWTEZ
+ }
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Heartbeat thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ }
+ });
+ heartbeatThread.setName("Tez Container Heartbeat Thread ["
+ + containerIdStr + "]");
+ heartbeatThread.start();
+ return heartbeatThread;
+ }
+
+ private static Thread startRouterThread() {
+ Thread eventRouterThread = new Thread(new Runnable() {
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ while (true) {
+ try {
+ taskLock.readLock().lock();
+ if (currentTask != null) {
+ break;
+ }
+ } finally {
+ taskLock.readLock().unlock();
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Event Router thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ try {
+ TezEvent e = eventsToBeProcessed.poll();
+ if (e == null) {
+ eventsToBeProcessed.wait();
+ continue;
+ }
+ // TODO TODONEWTEZ
+ if (!e.getDestinationInfo().getTaskAttemptID().equals(
+ currentTaskAttemptID)) {
+ // error? or block?
+ continue;
+ }
+ try {
+ taskLock.readLock().lock();
+ if (currentTask != null) {
+ currentTask.handleEvent(e);
+ }
+ } finally {
+ taskLock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Event Router thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ }
+ });
+ eventRouterThread.setName("Tez Container Event Router Thread ["
+ + containerIdStr + "]");
+ eventRouterThread.start();
+ return eventRouterThread;
+ }
+
+ private static void heartbeat() throws TezException, IOException {
+ try {
+ taskLock.readLock().lock();
+ if (currentTask == null) {
+ return;
+ }
+ } finally {
+ taskLock.readLock().unlock();
+ }
+ synchronized (eventLock) {
+ List<TezEvent> events = new ArrayList<TezEvent>();
+ events.addAll(eventsToSend);
+ long reqId = requestCounter.incrementAndGet();
+ TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
+ currentTaskAttemptID, eventCounter, maxEventsToGet);
+ TezHeartbeatResponse response = umbilical.heartbeat(request);
+ if (response.getLastRequestId() != reqId) {
+ // TODO TODONEWTEZ
+ throw new TezException("AM and Task out of sync");
+ }
+ eventsToSend.clear();
+ eventCounter += response.getEvents().size();
+ eventsToBeProcessed.addAll(response.getEvents());
+ eventsToBeProcessed.notifyAll();
+ }
+ }
+
public static void main(String[] args) throws Throwable {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
if (LOG.isDebugEnabled()) {
@@ -90,8 +237,8 @@ public class YarnTezDagChild {
final Configuration defaultConf = new Configuration();
TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
- // Security settings will be loaded based on core-site and core-default. Don't
- // depend on the jobConf for this.
+ // Security settings will be loaded based on core-site and core-default.
+ // Don't depend on the jobConf for this.
UserGroupInformation.setConfiguration(defaultConf);
Limits.setConfiguration(defaultConf);
@@ -127,6 +274,13 @@ public class YarnTezDagChild {
}
}
+ amPollInterval = defaultConf.getLong(
+ TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+ TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+ maxEventsToGet = defaultConf.getInt(
+ TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+ TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
+
// Create TaskUmbilicalProtocol as actual task owner.
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(tokenIdentifier);
@@ -134,7 +288,7 @@ public class YarnTezDagChild {
Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
SecurityUtil.setTokenService(jobToken, address);
taskOwner.addToken(jobToken);
- final TezTaskUmbilicalProtocol umbilical =
+ umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
@Override
public TezTaskUmbilicalProtocol run() throws Exception {
@@ -143,6 +297,18 @@ public class YarnTezDagChild {
}
});
+ Thread heartbeatThread = startHeartbeatThread();
+ Thread eventRouterThread = startRouterThread();
+
+ TezUmbilical tezUmbilical = new TezUmbilical() {
+ @Override
+ public void addEvents(Collection<TezEvent> events) {
+ synchronized (eventLock) {
+ eventsToSend.addAll(events);
+ }
+ }
+ };
+
// report non-pid to application master
String pid = System.getenv().get("JVM_PID");
if (LOG.isDebugEnabled()) {
@@ -151,7 +317,6 @@ public class YarnTezDagChild {
TaskSpec taskSpec = null;
ContainerTask containerTask = null;
UserGroupInformation childUGI = null;
- TezTaskAttemptID taskAttemptId = null;
ContainerContext containerContext = new ContainerContext(
containerIdentifier, pid);
int getTaskMaxSleepTime = defaultConf.getInt(
@@ -174,8 +339,9 @@ public class YarnTezDagChild {
}
LOG.info("TaskInfo: shouldDie: "
+ containerTask.shouldDie()
- + (containerTask.shouldDie() == true ? "" : ", taskAttemptId: "
- + containerTask.getTaskSpec().getTaskAttemptID()));
+ + (containerTask.shouldDie() == true ?
+ "" : ", currentTaskAttemptId: "
+ + containerTask.getTaskSpec().getTaskAttemptID()));
if (containerTask.shouldDie()) {
return;
@@ -186,25 +352,29 @@ public class YarnTezDagChild {
LOG.debug("New container task context:"
+ taskSpec.toString());
}
- taskAttemptId = taskSpec.getTaskAttemptID();
- TezVertexID newVertexId = taskAttemptId.getTaskID().getVertexID();
- if (currentVertexId != null) {
- if (!currentVertexId.equals(newVertexId)) {
- objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
- }
- if (!currentVertexId.getDAGId().equals(newVertexId.getDAGId())) {
- objectRegistry.clearCache(ObjectLifeCycle.DAG);
+ try {
+ taskLock.writeLock().lock();
+ currentTaskAttemptID = taskSpec.getTaskAttemptID();
+ TezVertexID newVertexId =
+ currentTaskAttemptID.getTaskID().getVertexID();
+
+ if (currentVertexId != null) {
+ if (!currentVertexId.equals(newVertexId)) {
+ objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
+ }
+ if (!currentVertexId.getDAGId().equals(newVertexId.getDAGId())) {
+ objectRegistry.clearCache(ObjectLifeCycle.DAG);
+ }
}
+ currentVertexId = newVertexId;
+ updateLoggers(currentTaskAttemptID);
+ currentTask = createLogicalTask(
+ taskSpec, defaultConf, tezUmbilical);
+ } finally {
+ taskLock.writeLock().unlock();
}
- currentVertexId = newVertexId;
-
- updateLoggers(taskAttemptId);
-
- final Task t = createAndConfigureTezTask(taskSpec, umbilical,
- credentials, jobToken, attemptNumber);
- final Configuration conf = ((RuntimeTask)t).getConfiguration();
// TODO Initiate Java VM metrics
// JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
@@ -216,7 +386,15 @@ public class YarnTezDagChild {
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- runTezTask(t, umbilical, conf); // run the task
+ currentTask.initialize();
+ currentTask.run();
+ currentTask.close();
+ try {
+ taskLock.writeLock().lock();
+ currentTask = null;
+ } finally {
+ taskLock.writeLock().unlock();
+ }
return null;
}
});
@@ -225,15 +403,18 @@ public class YarnTezDagChild {
}
} catch (FSError e) {
LOG.fatal("FSError from child", e);
- umbilical.fsError(taskAttemptId, e.getMessage());
+ umbilical.fsError(currentTaskAttemptID, e.getMessage());
} catch (Throwable throwable) {
LOG.fatal("Error running child : "
+ StringUtils.stringifyException(throwable));
- if (taskAttemptId != null) {
+ if (currentTaskAttemptID != null) {
String cause = StringUtils.stringifyException(throwable);
- umbilical.fatalError(taskAttemptId, cause);
+ umbilical.fatalError(currentTaskAttemptID, cause);
}
} finally {
+ stopped.set(true);
+ eventRouterThread.join();
+ heartbeatThread.join();
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
// Shutting down log4j of the child-vm...
@@ -297,17 +478,15 @@ public class YarnTezDagChild {
conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
}
- private static Task createAndConfigureTezTask(
- TaskSpec taskSpec, TezTaskUmbilicalProtocol master,
- Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
- int appAttemptId) throws IOException, InterruptedException {
-
- Configuration conf = new Configuration(false);
- // set tcp nodelay
- conf.setBoolean("ipc.client.tcpnodelay", true);
- conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+ private static LogicalIOProcessorRuntimeTask createLogicalTask(
+ TaskSpec taskSpec, Configuration conf,
+ TezUmbilical tezUmbilical) throws IOException {
+ // FIXME TODONEWTEZ
+ // conf.setBoolean("ipc.client.tcpnodelay", true);
+ // conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
configureLocalDirs(conf);
+ FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
// FIXME need Input/Output vertices else we have this hack
if (taskSpec.getInputs().isEmpty()) {
@@ -326,29 +505,10 @@ public class YarnTezDagChild {
taskSpec.getOutputs().add(
new OutputSpec("null", simpleOutputDesc, 0));
}
- Task t = null;
-
- // FIXME TODONEWTEZ
-
- // RuntimeUtils.createRuntimeTask(taskSpec);
- // t.initialize(conf, taskSpec.getProcessorUserPayload(), master);
-
- // FIXME wrapper should initialize all of processor, inputs and outputs
- // Currently, processor is inited via task init
- // and processor then inits inputs and outputs
- return t;
+ return new LogicalIOProcessorRuntimeTask(taskSpec, conf,
+ tezUmbilical);
}
- private static void runTezTask(
- Task t, TezTaskUmbilicalProtocol master, Configuration conf)
- throws IOException, InterruptedException {
- // use job-specified working directory
- FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
-
- // Run!
- t.run();
- t.close();
- }
private static Path getWorkingDirectory(Configuration conf) {
String name = conf.get(JobContext.WORKING_DIR);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 6d64a58..2c242de 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -59,7 +59,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
@@ -577,10 +577,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void taskFailed(TezTaskAttemptID taskAttemptId,
- TezEvent tezEvent) throws IOException {
- TaskFailedEvent taskFailedEvent = (TaskFailedEvent) tezEvent.getEvent();
- LOG.fatal("Task: " + taskAttemptId + " - failed : "
+ public void taskAttemptFailed(TezTaskAttemptID taskAttemptId,
+ TezEvent tezAttemptFailedEvent) throws IOException {
+ TaskAttemptFailedEvent taskFailedEvent =
+ (TaskAttemptFailedEvent) tezAttemptFailedEvent.getEvent();
+ LOG.fatal("Task Attempt: " + taskAttemptId + " - failed : "
+ taskFailedEvent.getDiagnostics());
reportDiagnosticInfo(taskAttemptId, "Error: "
+ taskFailedEvent.getDiagnostics());
@@ -590,4 +591,18 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
+ @Override
+ public void taskAttemptCompleted(TezTaskAttemptID taskAttemptId,
+ TezEvent taskAttemptCompletedEvent) throws IOException {
+ LOG.info("Task attempt completed acknowledgement from "
+ + taskAttemptId.toString());
+
+ taskHeartbeatHandler.progressing(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_DONE));
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
index ac49250..042590e 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
@@ -56,6 +56,15 @@ public class InputFailedEvent extends Event{
this.sourceIndex = sourceIndex;
}
+ @Private
+ public InputFailedEvent(int sourceIndex,
+ int targetIndex,
+ int version) {
+ this.sourceIndex = sourceIndex;
+ this.targetIndex = targetIndex;
+ this.version = version;
+ }
+
public int getSourceIndex() {
return sourceIndex;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/proto/Events.proto b/tez-engine-api/src/main/proto/Events.proto
index 3651c35..21cacf6 100644
--- a/tez-engine-api/src/main/proto/Events.proto
+++ b/tez-engine-api/src/main/proto/Events.proto
@@ -27,8 +27,18 @@ message DataMovementEventProto {
optional int32 version = 4;
}
-message InputDataErrorEventProto {
+message InputReadErrorEventProto {
optional int32 index = 1;
optional string diagnostics = 2;
optional int32 version = 3;
}
+
+message InputFailedEventProto {
+ optional int32 source_index = 1;
+ optional int32 target_index = 2;
+ optional int32 version = 4;
+}
+
+message InputInformationEventProto {
+ optional bytes user_payload = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index af2193c..c1289e6 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -87,7 +87,9 @@ public interface TezTaskUmbilicalProtocol extends Master {
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
throws IOException, TezException;
- public void taskFailed(TezTaskAttemptID attemptID,
+ public void taskAttemptFailed(TezTaskAttemptID attemptID,
TezEvent taskFailedEvent) throws IOException;
+ public void taskAttemptCompleted(TezTaskAttemptID attemptID,
+ TezEvent taskAttemptCompletedEvent) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
new file mode 100644
index 0000000..d3a582d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptCompletedEvent extends Event {
+
+ public TaskAttemptCompletedEvent() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
new file mode 100644
index 0000000..772d7fe
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptFailedEvent extends Event {
+
+ private final String diagnostics;
+
+ public TaskAttemptFailedEvent(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
deleted file mode 100644
index ddd346f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskFailedEvent extends Event {
-
- private final String diagnostics;
-
- public TaskFailedEvent(String diagnostics) {
- this.diagnostics = diagnostics;
- }
-
- public String getDiagnostics() {
- return diagnostics;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
index 5b71d70..51a1b24 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
@@ -19,7 +19,8 @@
package org.apache.tez.engine.newapi.impl;
public enum EventType {
- TASK_FAILED_EVENT,
+ TASK_ATTEMPT_COMPLETED_EVENT,
+ TASK_ATTEMPT_FAILED_EVENT,
DATA_MOVEMENT_EVENT,
INPUT_READ_ERROR_EVENT,
INPUT_FAILED_EVENT,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
index da3e551..f8cc3ed 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
@@ -25,12 +25,18 @@ import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputDataErrorEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskFailedEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputFailedEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputInformationEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputInformationEvent;
import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
import com.google.protobuf.ByteString;
@@ -54,8 +60,14 @@ public class TezEvent implements Writable {
eventType = EventType.DATA_MOVEMENT_EVENT;
} else if (event instanceof InputReadErrorEvent) {
eventType = EventType.INPUT_READ_ERROR_EVENT;
- } else if (event instanceof TaskFailedEvent) {
- eventType = EventType.TASK_FAILED_EVENT;
+ } else if (event instanceof TaskAttemptFailedEvent) {
+ eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
+ } else if (event instanceof TaskAttemptCompletedEvent) {
+ eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
+ } else if (event instanceof InputInformationEvent) {
+ eventType = EventType.INTPUT_INFORMATION_EVENT;
+ } else if (event instanceof InputFailedEvent) {
+ eventType = EventType.INPUT_FAILED_EVENT;
} else {
throw new TezUncheckedException("Unknown event, event="
+ event.getClass().getName());
@@ -104,17 +116,35 @@ public class TezEvent implements Writable {
break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
- eventBytes = InputDataErrorEventProto.newBuilder()
+ eventBytes = InputReadErrorEventProto.newBuilder()
.setIndex(ideEvt.getIndex())
.setDiagnostics(ideEvt.getDiagnostics())
.build().toByteArray();
break;
- case TASK_FAILED_EVENT:
- TaskFailedEvent tfEvt = (TaskFailedEvent) event;
- eventBytes = TaskFailedEventProto.newBuilder()
+ case TASK_ATTEMPT_FAILED_EVENT:
+ TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
+ eventBytes = TaskAttemptFailedEventProto.newBuilder()
.setDiagnostics(tfEvt.getDiagnostics())
.build().toByteArray();
break;
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ eventBytes = TaskAttemptCompletedEventProto.newBuilder()
+ .build().toByteArray();
+ break;
+ case INPUT_FAILED_EVENT:
+ InputFailedEvent ifEvt = (InputFailedEvent) event;
+ eventBytes = InputFailedEventProto.newBuilder()
+ .setSourceIndex(ifEvt.getSourceIndex())
+ .setTargetIndex(ifEvt.getTargetIndex())
+ .setVersion(ifEvt.getVersion()).build().toByteArray();
+ case INTPUT_INFORMATION_EVENT:
+ InputInformationEvent iEvt = (InputInformationEvent) event;
+ eventBytes = InputInformationEventProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+ .build().toByteArray();
+ default:
+ throw new TezUncheckedException("Unknown TezEvent"
+ + ", type=" + eventType);
}
out.writeInt(eventType.ordinal());
out.writeInt(eventBytes.length);
@@ -138,16 +168,34 @@ public class TezEvent implements Writable {
dmProto.getUserPayload().toByteArray());
break;
case INPUT_READ_ERROR_EVENT:
- InputDataErrorEventProto ideProto =
- InputDataErrorEventProto.parseFrom(eventBytes);
+ InputReadErrorEventProto ideProto =
+ InputReadErrorEventProto.parseFrom(eventBytes);
event = new InputReadErrorEvent(ideProto.getDiagnostics(),
ideProto.getIndex(), ideProto.getVersion());
break;
- case TASK_FAILED_EVENT:
- TaskFailedEventProto tfProto =
- TaskFailedEventProto.parseFrom(eventBytes);
- event = new TaskFailedEvent(tfProto.getDiagnostics());
+ case TASK_ATTEMPT_FAILED_EVENT:
+ TaskAttemptFailedEventProto tfProto =
+ TaskAttemptFailedEventProto.parseFrom(eventBytes);
+ event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+ break;
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ event = new TaskAttemptCompletedEvent();
+ break;
+ case INPUT_FAILED_EVENT:
+ InputFailedEventProto ifProto =
+ InputFailedEventProto.parseFrom(eventBytes);
+ event = new InputFailedEvent(ifProto.getSourceIndex(),
+ ifProto.getTargetIndex(), ifProto.getVersion());
+ break;
+ case INTPUT_INFORMATION_EVENT:
+ InputInformationEventProto infoProto =
+ InputInformationEventProto.parseFrom(eventBytes);
+ event = new InputInformationEvent(
+ infoProto.getUserPayload().toByteArray());
break;
+ default:
+ throw new TezUncheckedException("Unknown TezEvent"
+ + ", type=" + eventType);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
index cda456c..e47f14b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
@@ -35,11 +35,12 @@ public class TezHeartbeatRequest implements Writable {
private TezTaskAttemptID currentTaskAttemptID;
private int startIndex;
private int maxEvents;
+ private long requestId;
public TezHeartbeatRequest() {
}
+ /**
+ * Builds a heartbeat request tagged with the given request id.
+ * The supplied event list is wrapped as an unmodifiable view.
+ */
- public TezHeartbeatRequest(List<TezEvent> events,
+ public TezHeartbeatRequest(long requestId, List<TezEvent> events,
TezTaskAttemptID taskAttemptID,
int startIndex, int maxEvents) {
+ // Keep the caller-supplied id; getRequestId() and write() read this field,
+ // and it would otherwise stay at its default value of 0.
+ this.requestId = requestId;
this.events = Collections.unmodifiableList(events);
@@ -60,6 +61,10 @@ public class TezHeartbeatRequest implements Writable {
return maxEvents;
}
+ /**
+ * Returns the request id stored in this request. The same field is
+ * written by write() and restored by readFields().
+ */
+ public long getRequestId() {
+ return requestId;
+ }
+
public TezTaskAttemptID getCurrentTaskAttemptID() {
return currentTaskAttemptID;
}
@@ -78,6 +83,7 @@ public class TezHeartbeatRequest implements Writable {
}
out.writeInt(startIndex);
out.writeInt(maxEvents);
+ out.writeLong(requestId);
}
@Override
@@ -97,6 +103,7 @@ public class TezHeartbeatRequest implements Writable {
}
startIndex = in.readInt();
maxEvents = in.readInt();
+ requestId = in.readLong();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
index 35c961b..572c7b6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
@@ -18,13 +18,22 @@
package org.apache.tez.engine.newapi.impl;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.io.Writable;
-public class TezHeartbeatResponse {
+public class TezHeartbeatResponse implements Writable {
- private final List<TezEvent> events;
+ private long lastRequestId;
+ private List<TezEvent> events;
+
+ public TezHeartbeatResponse() {
+ }
public TezHeartbeatResponse(List<TezEvent> events) {
this.events = Collections.unmodifiableList(events);
@@ -34,4 +43,29 @@ public class TezHeartbeatResponse {
return events;
}
+ /**
+ * Returns the lastRequestId field, which write() emits first and
+ * readFields() restores.
+ * NOTE(review): no other assignment to this field is visible in this
+ * diff - confirm how the sending side populates it.
+ */
+ public long getLastRequestId() {
+ return lastRequestId;
+ }
+
+ /**
+ * Serializes this response: lastRequestId as a long, then the number of
+ * events as an int, then each event via TezEvent.write in list order.
+ * NOTE(review): events is dereferenced without a null check; an instance
+ * created with the no-arg constructor and not yet populated by
+ * readFields() would fail here - confirm callers never do that.
+ *
+ * @param out stream the fields are written to
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(lastRequestId);
+ out.writeInt(events.size());
+ for (TezEvent e : events) {
+ e.write(out);
+ }
+ }
+
+ /**
+ * Restores this response from the layout produced by write():
+ * lastRequestId, the event count, then that many TezEvents.
+ * The events field is replaced with a new ArrayList, so after this call
+ * it is not the unmodifiable wrapper the list-taking constructor creates.
+ *
+ * @param in stream the fields are read from
+ * @throws IOException if reading from the stream fails
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ lastRequestId = in.readLong();
+ int eventCount = in.readInt();
+ // Presize the list with the count read from the stream.
+ events = new ArrayList<TezEvent>(eventCount);
+ for (int i = 0; i < eventCount; ++i) {
+ TezEvent e = new TezEvent();
+ e.readFields(in);
+ events.add(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 567057a..b4558d0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.tez.engine.newapi.impl;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,26 +27,38 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
public class TezInputContextImpl extends TezTaskContextImpl
implements TezInputContext {
private final byte[] userPayload;
private final String sourceVertexName;
+ private final TezUmbilical tezUmbilical;
+ private final EventMetaData sourceInfo;
@Private
- public TezInputContextImpl(Configuration conf, String taskVertexName,
+ public TezInputContextImpl(Configuration conf,
+ TezUmbilical tezUmbilical, String taskVertexName,
String sourceVertexName, TezTaskAttemptID taskAttemptID,
TezCounters counters, byte[] userPayload) {
super(conf, taskVertexName, taskAttemptID, counters);
+ this.tezUmbilical = tezUmbilical;
this.userPayload = userPayload;
this.sourceVertexName = sourceVertexName;
+ this.sourceInfo = new EventMetaData(
+ EventGenerator.INPUT, taskVertexName, sourceVertexName,
+ taskAttemptID);
}
+ /**
+ * Wraps each user event in a TezEvent tagged with this input's
+ * sourceInfo metadata and hands the whole batch to the umbilical in a
+ * single addEvents call.
+ *
+ * @param events events generated by the input; iterated once, in order
+ */
@Override
public void sendEvents(List<Event> events) {
- // TODO Auto-generated method stub
-
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+ for (Event e : events) {
+ TezEvent tEvt = new TezEvent(e, sourceInfo);
+ tezEvents.add(tEvt);
+ }
+ tezUmbilical.addEvents(tezEvents);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index a2ce60b..ba632db 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.tez.engine.newapi.impl;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,27 +27,38 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
public class TezOutputContextImpl extends TezTaskContextImpl
implements TezOutputContext {
private final byte[] userPayload;
private final String destinationVertexName;
+ private final TezUmbilical tezUmbilical;
+ private final EventMetaData sourceInfo;
@Private
- public TezOutputContextImpl(Configuration conf, String taskVertexName,
+ public TezOutputContextImpl(Configuration conf,
+ TezUmbilical tezUmbilical, String taskVertexName,
String destinationVertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
byte[] userPayload) {
super(conf, taskVertexName, taskAttemptID, counters);
this.userPayload = userPayload;
this.destinationVertexName = destinationVertexName;
+ this.tezUmbilical = tezUmbilical;
+ this.sourceInfo = new EventMetaData(EventGenerator.OUTPUT, taskVertexName,
+ destinationVertexName, taskAttemptID);
}
+ /**
+ * Wraps each user event in a TezEvent tagged with this output's
+ * sourceInfo metadata and hands the whole batch to the umbilical in a
+ * single addEvents call.
+ *
+ * @param events events generated by the output; iterated once, in order
+ */
@Override
public void sendEvents(List<Event> events) {
- // TODO Auto-generated method stub
-
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+ for (Event e : events) {
+ TezEvent tEvt = new TezEvent(e, sourceInfo);
+ tezEvents.add(tEvt);
+ }
+ tezUmbilical.addEvents(tezEvents);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index b987bfe..4e0f061 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.tez.engine.newapi.impl;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -25,23 +26,34 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
public class TezProcessorContextImpl extends TezTaskContextImpl
implements TezProcessorContext {
private final byte[] userPayload;
+ private final TezUmbilical tezUmbilical;
+ private final EventMetaData sourceInfo;
- public TezProcessorContextImpl(Configuration tezConf, String vertexName,
+ public TezProcessorContextImpl(Configuration tezConf,
+ TezUmbilical tezUmbilical, String vertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
byte[] userPayload) {
super(tezConf, vertexName, taskAttemptID, counters);
this.userPayload = userPayload;
+ this.tezUmbilical = tezUmbilical;
+ this.sourceInfo = new EventMetaData(EventGenerator.PROCESSOR,
+ taskVertexName, "", taskAttemptID);
}
+ /**
+ * Wraps each user event in a TezEvent tagged with this processor's
+ * sourceInfo metadata and hands the whole batch to the umbilical in a
+ * single addEvents call.
+ *
+ * @param events events generated by the processor; iterated once, in order
+ */
@Override
public void sendEvents(List<Event> events) {
- // TODO Auto-generated method stub
-
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+ for (Event e : events) {
+ TezEvent tEvt = new TezEvent(e, sourceInfo);
+ tezEvents.add(tEvt);
+ }
+ tezUmbilical.addEvents(tezEvents);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index c89003e..712eec3 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -26,10 +26,10 @@ import org.apache.tez.engine.newapi.TezTaskContext;
public abstract class TezTaskContextImpl implements TezTaskContext {
- private final Configuration conf;
- private final String taskVertexName;
- private final TezTaskAttemptID taskAttemptID;
- private final TezCounters counters;
+ protected final Configuration conf;
+ protected final String taskVertexName;
+ protected final TezTaskAttemptID taskAttemptID;
+ protected final TezCounters counters;
@Private
public TezTaskContextImpl(Configuration conf,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
index 51daf06..c3065fe 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
@@ -18,203 +18,10 @@
package org.apache.tez.engine.newapi.impl;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
+public interface TezUmbilical {
-/**
- * Interface to the RPC layer ( umbilical ) between the Tez AM and
- * a Tez Container's JVM.
- */
-public class TezUmbilical extends AbstractService {
-
- private static final Log LOG = LogFactory.getLog(TezUmbilical.class);
-
- private final TezTaskUmbilicalProtocol umbilical;
- private Thread heartbeatThread;
- private Thread eventRouterThread;
- private AtomicBoolean stopped = new AtomicBoolean(false);
- private long amPollInterval;
- private final String containerIdStr;
-
- private TezTaskAttemptID currentTaskAttemptID;
- private int eventCounter = 0;
- private int maxEventsToGet = 0;
- private LinkedList<TezEvent> eventsToSend;
- private ConcurrentLinkedQueue<TezEvent> eventsToBeProcessed;
-
- public TezUmbilical(TezTaskUmbilicalProtocol umbilical,
- String containerIdStr) {
- super(TezUmbilical.class.getName());
- this.umbilical = umbilical;
- this.containerIdStr = containerIdStr;
- this.eventsToSend = new LinkedList<TezEvent>();
- this.eventsToBeProcessed = new ConcurrentLinkedQueue<TezEvent>();
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- amPollInterval = conf.getLong(
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
- TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
- maxEventsToGet = conf.getInt(
- TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
- TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- startHeartbeatThread();
- startRouterThread();
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- stopped.set(true);
- eventRouterThread.interrupt();
- super.serviceStop();
- }
-
- private void startHeartbeatThread() {
- heartbeatThread = new Thread(new Runnable() {
- public void run() {
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
- try {
- Thread.sleep(amPollInterval);
- try {
- heartbeat();
- } catch (TezException e) {
- LOG.error("Error communicating with AM: " + e.getMessage() , e);
- // TODO TODONEWTEZ
- } catch (InvalidToken e) {
- LOG.error("Error in authencating with AM: ", e);
- // TODO TODONEWTEZ
- } catch (Exception e) {
- LOG.error("Error in heartbeating with AM. ", e);
- // TODO TODONEWTEZ
- }
- } catch (InterruptedException e) {
- if (!stopped.get()) {
- LOG.warn("Heartbeat thread interrupted. Returning.");
- }
- return;
- }
- }
- }
- });
- heartbeatThread.setName("Tez Container Heartbeat Thread ["
- + containerIdStr + "]");
- heartbeatThread.start();
- }
-
- private void startRouterThread() {
- eventRouterThread = new Thread(new Runnable() {
- public void run() {
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
- try {
- TezEvent e = eventsToBeProcessed.poll();
- if (e == null) {
- eventsToBeProcessed.wait();
- }
- // TODO TODONEWTEZ
- switch (e.getEventType()) {
- case DATA_MOVEMENT_EVENT:
- // redirect to input of current task
- if (!e.getDestinationInfo().getTaskAttemptID().equals(
- currentTaskAttemptID)) {
- // error? or block?
- }
- // route to appropriate input
- break;
- case TASK_FAILED_EVENT:
- // route to ???
- break;
- case INPUT_READ_ERROR_EVENT:
- // invalid event? ignore?
- break;
- }
- } catch (InterruptedException e) {
- if (!stopped.get()) {
- LOG.warn("Event Router thread interrupted. Returning.");
- }
- return;
- }
- }
- }
- });
- eventRouterThread.setName("Tez Container Event Router Thread ["
- + containerIdStr + "]");
- eventRouterThread.start();
- }
-
- private synchronized void heartbeat() throws TezException, IOException {
- List<TezEvent> events = new ArrayList<TezEvent>();
- events.addAll(eventsToSend);
- TezHeartbeatRequest request = new TezHeartbeatRequest(events,
- currentTaskAttemptID, eventCounter, maxEventsToGet);
- TezHeartbeatResponse response = umbilical.heartbeat(request);
- eventsToSend.clear();
- eventCounter += response.getEvents().size();
- eventsToBeProcessed.addAll(response.getEvents());
- eventsToBeProcessed.notifyAll();
- }
-
- /**
- * Hook to ask the Tez AM for the next task to be run on the Container
- * @return Next task to be run
- * @throws IOException
- */
- public synchronized ContainerTask getNextTask(
- ContainerContext containerContext) throws IOException {
- ContainerTask task = umbilical.getTask(containerContext);
- if (task.getTaskSpec().getTaskAttemptID() != currentTaskAttemptID) {
- currentTaskAttemptID = task.getTaskSpec().getTaskAttemptID();
- }
- return task;
- }
-
- /**
- * Hook to query the Tez AM whether a particular Task Attempt can commit its
- * output.
- * @param attemptID Attempt ID of the Task that is waiting to commit.
- * attempts can commit.
- * @throws IOException
- */
- public synchronized boolean canCommit(TezTaskAttemptID attemptID)
- throws IOException {
- return umbilical.canCommit(attemptID);
- }
-
- /**
- * Inform the Tez AM that an attempt has failed.
- * @param attemptID Task Attempt ID of the failed attempt.
- * @param taskFailedEvent Event with details on the attempt failure.
- * @throws IOException
- */
- public synchronized void taskFailed(TezTaskAttemptID attemptID,
- TezEvent taskFailedEvent) throws IOException {
- umbilical.taskFailed(attemptID, taskFailedEvent);
- }
+ /**
+ * Accepts a batch of events for the umbilical to handle. The context
+ * implementations in this package call this from sendEvents with the
+ * TezEvents they have just wrapped.
+ * NOTE(review): delivery timing and thread-safety are up to the
+ * implementation and are not specified here - confirm against it.
+ *
+ * @param events events to hand over
+ */
+ public void addEvents(Collection<TezEvent> events);
- public synchronized void addEvents(Collection<TezEvent> events) {
- eventsToSend.addAll(events);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index 7b0eb45..f077831 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -44,9 +44,11 @@ import org.apache.tez.engine.newapi.TezProcessorContext;
import org.apache.tez.engine.newapi.impl.InputSpec;
import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
+import org.apache.tez.engine.newapi.impl.TezUmbilical;
import com.google.common.base.Preconditions;
@@ -62,6 +64,7 @@ public class LogicalIOProcessorRuntimeTask {
private final TaskSpec taskSpec;
private final Configuration tezConf;
+ private final TezUmbilical tezUmbilical;
private final List<InputSpec> inputSpecs;
private final List<LogicalInput> inputs;
@@ -85,11 +88,13 @@ public class LogicalIOProcessorRuntimeTask {
private Map<String, List<Event>> closeInputEventMap;
private Map<String, List<Event>> closeOutputEventMap;
- public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf) {
+ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf,
+ TezUmbilical tezUmbilical) {
LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+ taskSpec);
this.taskSpec = taskSpec;
this.tezConf = tezConf;
+ this.tezUmbilical = tezUmbilical;
this.inputSpecs = taskSpec.getInputs();
this.inputs = createInputs(inputSpecs);
this.outputSpecs = taskSpec.getOutputs();
@@ -215,7 +220,7 @@ public class LogicalIOProcessorRuntimeTask {
private TezInputContext createInputContext(InputSpec inputSpec) {
TezInputContext inputContext = new TezInputContextImpl(tezConf,
- taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
+ tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
taskSpec.getTaskAttemptID(), tezCounters,
inputSpec.getInputDescriptor().getUserPayload());
return inputContext;
@@ -223,7 +228,8 @@ public class LogicalIOProcessorRuntimeTask {
private TezOutputContext createOutputContext(OutputSpec outputSpec) {
TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
- taskSpec.getVertexName(), outputSpec.getDestinationVertexName(),
+ tezUmbilical, taskSpec.getVertexName(),
+ outputSpec.getDestinationVertexName(),
taskSpec.getTaskAttemptID(), tezCounters,
outputSpec.getOutputDescriptor().getUserPayload());
return outputContext;
@@ -231,8 +237,8 @@ public class LogicalIOProcessorRuntimeTask {
private TezProcessorContext createProcessorContext() {
TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
- taskSpec.getVertexName(), taskSpec.getTaskAttemptID(), tezCounters,
- processorDescriptor.getUserPayload());
+ tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
+ tezCounters, processorDescriptor.getUserPayload());
return processorContext;
}
@@ -245,9 +251,9 @@ public class LogicalIOProcessorRuntimeTask {
if (input instanceof LogicalInput) {
inputs.add((LogicalInput) input);
} else {
- throw new TezUncheckedException(
- input.getClass().getName()
- + " is not a sub-type of LogicalInput. Only LogicalInput sub-types supported by a LogicalIOProcessor.");
+ throw new TezUncheckedException(input.getClass().getName()
+ + " is not a sub-type of LogicalInput."
+ + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
}
}
@@ -263,9 +269,9 @@ public class LogicalIOProcessorRuntimeTask {
if (output instanceof LogicalOutput) {
outputs.add((LogicalOutput) output);
} else {
- throw new TezUncheckedException(
- output.getClass().getName()
- + " is not a sub-type of LogicalOutput. Only LogicalOutput sub-types supported by a LogicalIOProcessor.");
+ throw new TezUncheckedException(output.getClass().getName()
+ + " is not a sub-type of LogicalOutput."
+ + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
}
}
return outputs;
@@ -276,10 +282,14 @@ public class LogicalIOProcessorRuntimeTask {
Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
.getClassName());
if (!(processor instanceof LogicalIOProcessor)) {
- throw new TezUncheckedException(
- processor.getClass().getName()
- + " is not a sub-type of LogicalIOProcessor. Only LogicalIOProcessor sub-types supported at the moment");
+ // The check is on the processor type, so the message names
+ // LogicalIOProcessor (not LogicalOutput) as the supported type.
+ throw new TezUncheckedException(processor.getClass().getName()
+ + " is not a sub-type of LogicalIOProcessor."
+ + " Only LogicalIOProcessor sub-types supported at the moment");
}
return (LogicalIOProcessor) processor;
}
+
+ /**
+ * Hook for passing a TezEvent to this task. Currently a no-op: the body
+ * contains only a TODO marker.
+ *
+ * @param e the event to handle (ignored for now)
+ */
+ public void handleEvent(TezEvent e) {
+ // TODO TODONEWTEZ
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/proto/Events.proto b/tez-engine/src/main/proto/Events.proto
index fe6e449..fa9cb2c 100644
--- a/tez-engine/src/main/proto/Events.proto
+++ b/tez-engine/src/main/proto/Events.proto
@@ -20,6 +20,9 @@ option java_package = "org.apache.tez.engine.api.events";
option java_outer_classname = "SystemEventProtos";
option java_generate_equals_and_hash = true;
-message TaskFailedEventProto {
+message TaskAttemptFailedEventProto {
optional string diagnostics = 1;
}
+
+message TaskAttemptCompletedEventProto {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 16cc8db..204f517 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -682,9 +682,17 @@ public class LocalJobRunnerTez implements ClientProtocol {
return null;
}
+
+ /**
+ * No-op stub: the body contains only TODO markers, so both arguments
+ * are ignored.
+ */
@Override
- public void taskFailed(TezTaskAttemptID attemptID, TezEvent taskFailedEvent)
- throws IOException {
+ public void taskAttemptFailed(TezTaskAttemptID attemptID,
+ TezEvent taskFailedEvent) throws IOException {
+ // TODO Auto-generated method stub
+ // TODO TODONEWTEZ
+ }
+
+ /**
+ * No-op stub: the body contains only TODO markers, so both arguments
+ * are ignored.
+ */
+ @Override
+ public void taskAttemptCompleted(TezTaskAttemptID attemptID,
+ TezEvent taskAttemptCompletedEvent) throws IOException {
// TODO Auto-generated method stub
// TODO TODONEWTEZ
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index da68776..1a40ead 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -156,8 +156,15 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
}
+ /**
+ * No-op stub in this test implementation of TezTaskUmbilicalProtocol:
+ * the body contains only TODO markers, so both arguments are ignored.
+ */
@Override
- public void taskFailed(TezTaskAttemptID attemptID, TezEvent taskFailedEvent)
- throws IOException {
+ public void taskAttemptFailed(TezTaskAttemptID attemptID,
+ TezEvent taskFailedEvent) throws IOException {
+ // TODO Auto-generated method stub
+ // TODO TODONEWTEZ
+ }
+
+ /**
+ * No-op stub in this test implementation of TezTaskUmbilicalProtocol:
+ * the body contains only TODO markers, so both arguments are ignored.
+ */
+ @Override
+ public void taskAttemptCompleted(TezTaskAttemptID attemptID,
+ TezEvent taskAttemptCompletedEvent) throws IOException {
// TODO Auto-generated method stub
// TODO TODONEWTEZ
}