You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/09 06:11:37 UTC
[2/2] git commit: TEZ-423. Move new re-factor classes to appropriate
module. (hitesh)
TEZ-423. Move new re-factor classes to appropriate module. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1cc83e45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1cc83e45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1cc83e45
Branch: refs/heads/TEZ-398
Commit: 1cc83e457b8a86aab801b526c79652fa99e83186
Parents: bc9cee3
Author: Hitesh Shah <hi...@apache.org>
Authored: Sun Sep 8 21:09:45 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Sun Sep 8 21:09:45 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/ContainerTask.java | 102 ---------
.../apache/tez/dag/api/TezConfiguration.java | 8 +
tez-dag/pom.xml | 4 -
.../apache/hadoop/mapred/YarnTezDagChild.java | 61 ++---
.../dag/app/TaskAttemptListenerImpTezDag.java | 43 +++-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 18 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 20 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 16 +-
.../app/rm/AMSchedulerEventTALaunchRequest.java | 13 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 2 +-
.../rm/container/AMContainerEventAssignTA.java | 22 +-
.../dag/app/rm/container/AMContainerImpl.java | 12 +-
.../dag/app/rm/container/AMContainerTask.java | 8 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 7 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 44 ++--
.../tez/dag/app/rm/TestContainerReuse.java | 64 +++---
.../dag/app/rm/container/TestAMContainer.java | 4 +-
tez-engine-api/pom.xml | 31 +++
.../engine/newapi/events/DataMovementEvent.java | 109 +++++++++
.../newapi/events/InputDataErrorEvent.java | 15 +-
.../newapi/events/TaskCommunicationEvent.java | 84 -------
.../tez/engine/newapi/impl/EventMetaData.java | 81 -------
.../tez/engine/newapi/impl/TezUmbilical.java | 61 -----
.../tez/engine/newapi/rpc/impl/InputSpec.java | 51 -----
.../tez/engine/newapi/rpc/impl/OutputSpec.java | 48 ----
.../tez/engine/newapi/rpc/impl/TaskSpec.java | 66 ------
.../tez/engine/newapi/rpc/impl/TezEvent.java | 64 ------
tez-engine-api/src/main/proto/Events.proto | 34 +++
tez-engine/pom.xml | 31 +++
.../org/apache/tez/common/ContainerTask.java | 74 +++++++
.../tez/common/TezTaskUmbilicalProtocol.java | 40 ++--
.../engine/newapi/events/TaskFailedEvent.java | 35 +++
.../tez/engine/newapi/impl/EventMetaData.java | 120 ++++++++++
.../tez/engine/newapi/impl/EventType.java | 25 +++
.../tez/engine/newapi/impl/InputSpec.java | 81 +++++++
.../tez/engine/newapi/impl/OutputSpec.java | 81 +++++++
.../apache/tez/engine/newapi/impl/TaskSpec.java | 139 ++++++++++++
.../apache/tez/engine/newapi/impl/TezEvent.java | 182 +++++++++++++++
.../engine/newapi/impl/TezHeartbeatRequest.java | 102 +++++++++
.../newapi/impl/TezHeartbeatResponse.java | 37 ++++
.../tez/engine/newapi/impl/TezUmbilical.java | 220 +++++++++++++++++++
.../LogicalIOProcessorRuntimeTask.java | 18 +-
tez-engine/src/main/proto/Events.proto | 25 +++
.../apache/hadoop/mapred/LocalJobRunnerTez.java | 17 ++
.../tez/mapreduce/TestUmbilicalProtocol.java | 25 ++-
45 files changed, 1608 insertions(+), 736 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
deleted file mode 100644
index 69bd2b4..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class ContainerTask implements Writable {
-
- TezTaskContext tezTaskContext;
- boolean shouldDie;
-
- public ContainerTask() {
- }
-
- public ContainerTask(TezTaskContext tezTaskContext, boolean shouldDie) {
- this.tezTaskContext = tezTaskContext;
- this.shouldDie = shouldDie;
- }
-
- public TezTaskContext getTezEngineTaskContext() {
- return tezTaskContext;
- }
-
- public boolean shouldDie() {
- return shouldDie;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(shouldDie);
- if (tezTaskContext != null) {
- out.writeBoolean(true);
- Text.writeString(out, tezTaskContext.getClass().getName());
- tezTaskContext.write(out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- shouldDie = in.readBoolean();
- boolean taskComing = in.readBoolean();
- if (taskComing) {
- String contextClass = Text.readString(in);
- tezTaskContext = createEmptyContext(contextClass);
- tezTaskContext.readFields(in);
- }
- }
-
- private TezTaskContext createEmptyContext(String contextClassName)
- throws IOException {
- try {
- Class<?> clazz = Class.forName(contextClassName);
- Constructor<?> c = clazz.getConstructor(null);
- c.setAccessible(true);
- return (TezTaskContext) c.newInstance(null);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- } catch (SecurityException e) {
- throw new IOException(e);
- } catch (NoSuchMethodException e) {
- throw new IOException(e);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public String toString() {
- return "shouldDie: " + shouldDie + ", tezEngineTaskContext: "
- + tezTaskContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index cc41856..e40f4f5 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -164,6 +164,14 @@ public class TezConfiguration extends Configuration {
+ "get-task.sleep.interval-ms.max";
public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 500;
+ public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
+ + "am.heartbeat.interval-ms.max";
+ public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+
+ public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+ + "max-events-per-heartbeat.max";
+ public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100;
+
/**
* Configuration to specify whether container should be reused.
*/
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index f9ec145..875a196 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -101,10 +101,6 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 9f59219..2bac9b6 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -49,13 +49,12 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.Limits;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
@@ -65,7 +64,9 @@ import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import org.apache.tez.engine.task.RuntimeTask;
import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -147,11 +148,12 @@ public class YarnTezDagChild {
if (LOG.isDebugEnabled()) {
LOG.debug("PID, containerId: " + pid + ", " + containerIdentifier);
}
- TezEngineTaskContext taskContext = null;
+ TaskSpec taskSpec = null;
ContainerTask containerTask = null;
UserGroupInformation childUGI = null;
TezTaskAttemptID taskAttemptId = null;
- ContainerContext containerContext = new ContainerContext(containerIdentifier, pid);
+ ContainerContext containerContext = new ContainerContext(
+ containerIdentifier, pid);
int getTaskMaxSleepTime = defaultConf.getInt(
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
@@ -173,19 +175,18 @@ public class YarnTezDagChild {
LOG.info("TaskInfo: shouldDie: "
+ containerTask.shouldDie()
+ (containerTask.shouldDie() == true ? "" : ", taskAttemptId: "
- + containerTask.getTezEngineTaskContext().getTaskAttemptId()));
+ + containerTask.getTaskSpec().getTaskAttemptID()));
if (containerTask.shouldDie()) {
return;
}
taskCount++;
- taskContext = (TezEngineTaskContext) containerTask
- .getTezEngineTaskContext();
+ taskSpec = containerTask.getTaskSpec();
if (LOG.isDebugEnabled()) {
LOG.debug("New container task context:"
- + taskContext.toString());
+ + taskSpec.toString());
}
- taskAttemptId = taskContext.getTaskAttemptId();
+ taskAttemptId = taskSpec.getTaskAttemptID();
TezVertexID newVertexId = taskAttemptId.getTaskID().getVertexID();
if (currentVertexId != null) {
@@ -200,7 +201,7 @@ public class YarnTezDagChild {
updateLoggers(taskAttemptId);
- final Task t = createAndConfigureTezTask(taskContext, umbilical,
+ final Task t = createAndConfigureTezTask(taskSpec, umbilical,
credentials, jobToken, attemptNumber);
final Configuration conf = ((RuntimeTask)t).getConfiguration();
@@ -297,7 +298,7 @@ public class YarnTezDagChild {
}
private static Task createAndConfigureTezTask(
- TezEngineTaskContext taskContext, TezTaskUmbilicalProtocol master,
+ TaskSpec taskSpec, TezTaskUmbilicalProtocol master,
Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
int appAttemptId) throws IOException, InterruptedException {
@@ -308,19 +309,29 @@ public class YarnTezDagChild {
configureLocalDirs(conf);
-
// FIXME need Input/Output vertices else we have this hack
- if (taskContext.getInputSpecList().isEmpty()) {
- taskContext.getInputSpecList().add(
- new InputSpec("null", 0, SimpleInput.class.getName()));
+ if (taskSpec.getInputs().isEmpty()) {
+ InputDescriptor simpleInputDesc =
+ new InputDescriptor(SimpleInput.class.getName());
+ simpleInputDesc.setUserPayload(
+ taskSpec.getProcessorDescriptor().getUserPayload());
+ taskSpec.getInputs().add(
+ new InputSpec("null", simpleInputDesc, 0));
}
- if (taskContext.getOutputSpecList().isEmpty()) {
- taskContext.getOutputSpecList().add(
- new OutputSpec("null", 0, SimpleOutput.class.getName()));
+ if (taskSpec.getOutputs().isEmpty()) {
+ OutputDescriptor simpleOutputDesc =
+ new OutputDescriptor(SimpleOutput.class.getName());
+ simpleOutputDesc.setUserPayload(
+ taskSpec.getProcessorDescriptor().getUserPayload());
+ taskSpec.getOutputs().add(
+ new OutputSpec("null", simpleOutputDesc, 0));
}
- Task t = RuntimeUtils.createRuntimeTask(taskContext);
-
- t.initialize(conf, taskContext.getProcessorUserPayload(), master);
+ Task t = null;
+
+ // FIXME TODONEWTEZ
+
+ // RuntimeUtils.createRuntimeTask(taskSpec);
+ // t.initialize(conf, taskSpec.getProcessorUserPayload(), master);
// FIXME wrapper should initialize all of processor, inputs and outputs
// Currently, processor is inited via task init
@@ -353,13 +364,13 @@ public class YarnTezDagChild {
}
}
}
-
+
private static void updateLoggers(TezTaskAttemptID tezTaskAttemptID)
throws FileNotFoundException {
String containerLogDir = null;
LOG.info("Redirecting log files based on TaskAttemptId: " + tezTaskAttemptID);
-
+
Appender appender = Logger.getRootLogger().getAppender(
TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
if (appender != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 619a494..6d64a58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -58,6 +59,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
@@ -210,7 +215,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ " is invalid and will be killed");
else
LOG.info("Container with id: " + containerId
- + " is valid and will be killed");
+ + " is valid and will be killed");
task = TASK_FOR_INVALID_JVM;
} else {
pingContainerHeartbeatHandler(containerId);
@@ -224,16 +229,16 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
LOG.info("No task currently assigned to Container with id: "
+ containerId);
} else {
- registerTaskAttempt(taskContext.getTask().getTaskAttemptId(),
+ registerTaskAttempt(taskContext.getTask().getTaskAttemptID(),
containerId);
task = new ContainerTask(taskContext.getTask(), false);
context.getEventHandler().handle(
new TaskAttemptEventStartedRemotely(taskContext.getTask()
- .getTaskAttemptId(), containerId, context
+ .getTaskAttemptID(), containerId, context
.getApplicationACLs(), context.getAllContainers()
.get(containerId).getShufflePort()));
LOG.info("Container with id: " + containerId + " given task: "
- + taskContext.getTask().getTaskAttemptId());
+ + taskContext.getTask().getTaskAttemptID());
}
}
}
@@ -474,10 +479,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// between polls (MRTask) implies tasks end up wasting upto 1 second doing
// nothing. Similarly for CA_COMMIT.
- DAG job = context.getCurrentDAG();
- Task task =
- job.getVertex(taskAttemptId.getTaskID().getVertexID()).
- getTask(taskAttemptId.getTaskID());
// TODO In-Memory Shuffle
/*
@@ -563,4 +564,30 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ ", ContainerId not known for this attempt");
}
}
+
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+ throws IOException, TezException {
+ // TODO TODONEWTEZ Auto-generated method stub
+ TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
+ LOG.info("Ping from " + taskAttemptID.toString());
+ taskHeartbeatHandler.pinged(taskAttemptID);
+ pingContainerHeartbeatHandler(taskAttemptID);
+ return null;
+ }
+
+ @Override
+ public void taskFailed(TezTaskAttemptID taskAttemptId,
+ TezEvent tezEvent) throws IOException {
+ TaskFailedEvent taskFailedEvent = (TaskFailedEvent) tezEvent.getEvent();
+ LOG.fatal("Task: " + taskAttemptId + " - failed : "
+ + taskFailedEvent.getDiagnostics());
+ reportDiagnosticInfo(taskAttemptId, "Error: "
+ + taskFailedEvent.getDiagnostics());
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index a33ab91..48c9993 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -23,8 +23,6 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -34,23 +32,25 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
/**
- * Main interface to interact with the job. Provides only getters.
+ * Main interface to interact with the job. Provides only getters.
*/
public interface Vertex extends Comparable<Vertex> {
TezVertexID getVertexId();
public VertexPlan getVertexPlan();
-
+
int getDistanceFromRoot();
String getName();
VertexState getState();
/**
- * Get all the counters of this vertex.
+ * Get all the counters of this vertex.
* @return aggregate task-counters
*/
TezCounters getAllCounters();
@@ -64,18 +64,18 @@ public interface Vertex extends Comparable<Vertex> {
float getProgress();
ProgressBuilder getVertexProgress();
VertexStatusBuilder getVertexStatus();
-
+
void setParallelism(int parallelism, List<byte[]> taskUserPayloads);
-
+
TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
TezTaskAttemptID attemptId, int fromEventId, int maxEvents);
-
+
void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
Map<Vertex, EdgeProperty> getInputVertices();
Map<Vertex, EdgeProperty> getOutputVertices();
-
+
List<InputSpec> getInputSpecList();
List<OutputSpec> getOutputSpecList();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ae3c05f..00ef9e5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -47,8 +47,6 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -94,6 +92,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import com.google.common.annotations.VisibleForTesting;
@@ -294,14 +293,13 @@ public class TaskAttemptImpl implements TaskAttempt,
return getVertexID().getDAGId();
}
- TezTaskContext createRemoteTask() {
+ TaskSpec createRemoteTaskSpec() {
Vertex vertex = getVertex();
ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
DAG dag = vertex.getDAG();
-
- return new TezEngineTaskContext(getID(), dag.getUserName(),
- dag.getName(), vertex.getName(), procDesc,
- vertex.getInputSpecList(), vertex.getOutputSpecList());
+ return new TaskSpec(getID(), dag.getUserName(),
+ vertex.getName(), procDesc, vertex.getInputSpecList(),
+ vertex.getOutputSpecList());
}
@Override
@@ -507,7 +505,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.getVertex(attemptId.getTaskID().getVertexID())
.getTask(attemptId.getTaskID());
}
-
+
Vertex getVertex() {
return appContext.getCurrentDAG()
.getVertex(attemptId.getTaskID().getVertexID());
@@ -856,7 +854,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// recovery.
// Create the remote task.
- TezTaskContext remoteTaskContext = ta.createRemoteTask();
+ TaskSpec remoteTaskSpec = ta.createRemoteTaskSpec();
// Create startTaskRequest
String[] requestHosts = new String[0];
@@ -889,12 +887,12 @@ public class TaskAttemptImpl implements TaskAttempt,
if (LOG.isDebugEnabled()) {
LOG.debug("Asking for container launch with taskAttemptContext: "
- + remoteTaskContext);
+ + remoteTaskSpec);
}
// Send out a launch request to the scheduler.
AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
- ta.attemptId, ta.taskResource, remoteTaskContext, ta, requestHosts,
+ ta.attemptId, ta.taskResource, remoteTaskSpec, ta, requestHosts,
requestRacks, scheduleEvent.getPriority(), ta.containerContext);
ta.sendEvent(launchRequestEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b20ac2a..a040ff2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -50,8 +50,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
@@ -103,6 +101,8 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -656,7 +656,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
readLock.unlock();
}
}
-
+
@Override
public void setParallelism(int parallelism, List<byte[]> taskUserPayloads) {
writeLock.lock();
@@ -1413,7 +1413,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public int getOutputVerticesCount() {
return this.targetVertices.size();
}
-
+
@Override
public ProcessorDescriptor getProcessorDescriptor() {
return processorDescriptor;
@@ -1453,8 +1453,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.getInputVerticesCount());
for (Entry<Vertex, EdgeProperty> entry : this.getInputVertices().entrySet()) {
InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
- entry.getKey().getTotalTasks(),
- entry.getValue().getEdgeDestination().getClassName());
+ entry.getValue().getEdgeDestination(), entry.getKey().getTotalTasks());
if (LOG.isDebugEnabled()) {
LOG.debug("For vertex : " + this.getName()
+ ", Using InputSpec : " + inputSpec);
@@ -1472,8 +1471,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount());
for (Entry<Vertex, EdgeProperty> entry : this.getOutputVertices().entrySet()) {
OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
- entry.getKey().getTotalTasks(),
- entry.getValue().getEdgeSource().getClassName());
+ entry.getValue().getEdgeSource(), entry.getKey().getTotalTasks());
if (LOG.isDebugEnabled()) {
LOG.debug("For vertex : " + this.getName()
+ ", Using OutputSpec : " + outputSpec);
@@ -1500,7 +1498,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexScheduler getVertexScheduler() {
return this.vertexScheduler;
}
-
+
private static void logLocationHints(VertexLocationHint locationHint) {
Multiset<String> hosts = HashMultiset.create();
Multiset<String> racks = HashMultiset.create();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index f66c1a2..1c30b0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -19,10 +19,10 @@ package org.apache.tez.dag.app.rm;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
@@ -35,19 +35,18 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
private final Priority priority;
private final Resource capability;
private final ContainerContext containerContext;
-
- private final TezTaskContext remoteTaskContext;
+ private final TaskSpec remoteTaskSpec;
private final TaskAttempt taskAttempt;
public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
Resource capability,
- TezTaskContext remoteTaskContext, TaskAttempt ta,
+ TaskSpec remoteTaskSpec, TaskAttempt ta,
String[] hosts, String[] racks, Priority priority, ContainerContext containerContext) {
super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
this.attemptId = attemptId;
this.capability = capability;
- this.remoteTaskContext = remoteTaskContext;
+ this.remoteTaskSpec = remoteTaskSpec;
this.taskAttempt = ta;
this.hosts = hosts;
this.racks = racks;
@@ -75,8 +74,8 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
return priority;
}
- public TezTaskContext getRemoteTaskContext() {
- return remoteTaskContext;
+ public TaskSpec getRemoteTaskSpec() {
+ return remoteTaskSpec;
}
public TaskAttempt getTaskAttempt() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 1cb6de6..9e059a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -400,7 +400,7 @@ public class TaskSchedulerEventHandler extends AbstractService
}
sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
sendEvent(new AMContainerEventAssignTA(containerId,
- taskAttempt.getID(), event.getRemoteTaskContext()));
+ taskAttempt.getID(), event.getRemoteTaskSpec()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
index 0fe5b3b..dd178fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,26 +18,26 @@
package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
public class AMContainerEventAssignTA extends AMContainerEvent {
private final TezTaskAttemptID attemptId;
// TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ?
- private final TezTaskContext remoteTaskContext;
-
+ private final TaskSpec remoteTaskSpec;
+
public AMContainerEventAssignTA(ContainerId containerId,
- TezTaskAttemptID attemptId, Object remoteTaskContext) {
+ TezTaskAttemptID attemptId, Object remoteTaskSpec) {
super(containerId, AMContainerEventType.C_ASSIGN_TA);
this.attemptId = attemptId;
- this.remoteTaskContext = (TezTaskContext)remoteTaskContext;
+ this.remoteTaskSpec = (TaskSpec)remoteTaskSpec;
}
-
- public TezTaskContext getRemoteTaskContext() {
- return this.remoteTaskContext;
+
+ public TaskSpec getRemoteTaskSpec() {
+ return this.remoteTaskSpec;
}
-
+
public TezTaskAttemptID getTaskAttemptId() {
return this.attemptId;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d44c11..afb0ed5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
@@ -55,6 +54,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
@SuppressWarnings("rawtypes")
public class AMContainerImpl implements AMContainer {
@@ -74,8 +74,8 @@ public class AMContainerImpl implements AMContainer {
private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
// TODO Maybe this should be pulled from the TaskAttempt.s
- private final Map<TezTaskAttemptID, TezTaskContext> remoteTaskMap =
- new HashMap<TezTaskAttemptID, TezTaskContext>();
+ private final Map<TezTaskAttemptID, TaskSpec> remoteTaskMap =
+ new HashMap<TezTaskAttemptID, TaskSpec>();
// TODO ?? Convert to list and hash.
@@ -453,10 +453,10 @@ public class AMContainerImpl implements AMContainer {
}
container.pendingAttempt = event.getTaskAttemptId();
if (LOG.isDebugEnabled()) {
- LOG.debug("AssignTA: attempt: " + event.getRemoteTaskContext());
+ LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
}
container.remoteTaskMap
- .put(event.getTaskAttemptId(), event.getRemoteTaskContext());
+ .put(event.getTaskAttemptId(), event.getRemoteTaskSpec());
return container.getState();
}
}
@@ -600,7 +600,7 @@ public class AMContainerImpl implements AMContainer {
AMContainerImpl container, AMContainerEvent cEvent) {
if (LOG.isDebugEnabled()) {
LOG.debug("AssignTAAtIdle: attempt: " +
- ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+ ((AMContainerEventAssignTA) cEvent).getRemoteTaskSpec());
}
return super.transition(container, cEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
index 1189a34..be1c08e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
@@ -18,13 +18,13 @@
package org.apache.tez.dag.app.rm.container;
-import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
public class AMContainerTask {
private final boolean shouldDie;
- private final TezTaskContext tezTask;
+ private final TaskSpec tezTask;
- public AMContainerTask(boolean shouldDie, TezTaskContext tezTask) {
+ public AMContainerTask(boolean shouldDie, TaskSpec tezTask) {
this.shouldDie = shouldDie;
this.tezTask = tezTask;
}
@@ -33,7 +33,7 @@ public class AMContainerTask {
return this.shouldDie;
}
- public TezTaskContext getTask() {
+ public TaskSpec getTask() {
return this.tezTask;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3274a4a..fe97e10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -83,6 +83,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -175,7 +176,7 @@ public class TestTaskAttempt {
new SystemClock(), mock(TaskHeartbeatHandler.class),
mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
1), createFakeContainerContext());
-
+
TaskAttemptImpl spyTa = spy(taImpl);
when(spyTa.resolveHosts(hosts)).thenReturn(
resolved.toArray(new String[3]));
@@ -766,7 +767,7 @@ public class TestTaskAttempt {
}
@Override
- protected TezTaskContext createRemoteTask() {
+ protected TaskSpec createRemoteTaskSpec() {
// FIXME
return null;
}
@@ -786,7 +787,7 @@ public class TestTaskAttempt {
TaskAttemptState state) {
}
}
-
+
private static ContainerContext createFakeContainerContext() {
return new ContainerContext(new HashMap<String, LocalResource>(),
new Credentials(), new HashMap<String, String>(), "");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 238b2bd..7e7a8a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -613,38 +613,38 @@ public class TestVertexImpl {
Assert.assertEquals(2, v3.getOutputVerticesCount());
Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(0)
- .getVertexName())
+ .getSourceVertexName())
|| "vertex2".equals(v3.getInputSpecList().get(0)
- .getVertexName()));
+ .getSourceVertexName()));
Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(1)
- .getVertexName())
+ .getSourceVertexName())
|| "vertex2".equals(v3.getInputSpecList().get(1)
- .getVertexName()));
+ .getSourceVertexName()));
Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(0)
- .getInputClassName())
+ .getInputDescriptor().getClassName())
|| "i3_v2".equals(v3.getInputSpecList().get(0)
- .getInputClassName()));
+ .getInputDescriptor().getClassName()));
Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(1)
- .getInputClassName())
+ .getInputDescriptor().getClassName())
|| "i3_v2".equals(v3.getInputSpecList().get(1)
- .getInputClassName()));
+ .getInputDescriptor().getClassName()));
Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0)
- .getVertexName())
+ .getDestinationVertexName())
|| "vertex5".equals(v3.getOutputSpecList().get(0)
- .getVertexName()));
+ .getDestinationVertexName()));
Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(1)
- .getVertexName())
+ .getDestinationVertexName())
|| "vertex5".equals(v3.getOutputSpecList().get(1)
- .getVertexName()));
+ .getDestinationVertexName()));
Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(0)
- .getOutputClassName())
+ .getOutputDescriptor().getClassName())
|| "o3_v5".equals(v3.getOutputSpecList().get(0)
- .getOutputClassName()));
+ .getOutputDescriptor().getClassName()));
Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(1)
- .getOutputClassName())
+ .getOutputDescriptor().getClassName())
|| "o3_v5".equals(v3.getOutputSpecList().get(1)
- .getOutputClassName()));
+ .getOutputDescriptor().getClassName()));
}
@Test(timeout = 5000)
@@ -654,7 +654,7 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
}
-
+
@Test//(timeout = 5000)
public void testVertexSetParallelism() {
VertexImpl v2 = vertices.get("vertex2");
@@ -663,9 +663,9 @@ public class TestVertexImpl {
Map<TezTaskID, Task> tasks = v2.getTasks();
Assert.assertEquals(2, tasks.size());
TezTaskID firstTask = tasks.keySet().iterator().next();
-
+
startVertex(v2);
-
+
byte[] payload = new byte[0];
List<byte[]> taskPayloads = Collections.singletonList(payload);
v2.setParallelism(1, taskPayloads);
@@ -673,7 +673,7 @@ public class TestVertexImpl {
Assert.assertEquals(1, tasks.size());
// the last one is removed
Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
-
+
VertexImpl v1 = vertices.get("vertex1");
TezTaskID t1_v1 = new TezTaskID(v1.getVertexId(), 0);
TezTaskAttemptID ta1_t1_v1 = new TezTaskAttemptID(t1_v1, 0);
@@ -681,7 +681,7 @@ public class TestVertexImpl {
TezDependentTaskCompletionEvent cEvt1 =
new TezDependentTaskCompletionEvent(1, ta1_t1_v1,
Status.SUCCEEDED, "", 3, 0);
- v2.handle(
+ v2.handle(
new VertexEventSourceTaskAttemptCompleted(v2.getVertexId(), cEvt1));
TezTaskID t1_v2 = new TezTaskID(v2.getVertexId(), 0);
@@ -689,7 +689,7 @@ public class TestVertexImpl {
TezDependentTaskCompletionEvent[] events =
v2.getTaskAttemptCompletionEvents(ta1_t1_v2, 0, 100);
Assert.assertEquals(1, events.length);
- TezDependentTaskCompletionEvent clone = events[0];
+ TezDependentTaskCompletionEvent clone = events[0];
// payload must be present in the first event
Assert.assertEquals(payload, clone.getUserPayload());
// event must be a copy
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index e232ec0..408f88a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -57,12 +57,15 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import org.junit.Test;
import com.google.common.collect.Lists;
public class TestContainerReuse {
-
+
@Test(timeout = 15000l)
public void testDelayedReuseContainerBecomesAvailable() throws IOException, InterruptedException, ExecutionException {
@@ -77,7 +80,7 @@ public class TestContainerReuse {
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = new TezDAGID("0", 0, 0);
TezVertexID vertexID = new TezVertexID(dagID, 1);
-
+
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
String appUrl = "url";
@@ -143,9 +146,9 @@ public class TestContainerReuse {
verify(rmClient, times(0)).releaseAssignedContainer(eq(containerHost1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
-
+
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
-
+
long currentTs = System.currentTimeMillis();
Throwable exception = null;
while (System.currentTimeMillis() < currentTs + 5000l) {
@@ -162,7 +165,7 @@ public class TestContainerReuse {
taskScheduler.close();
taskSchedulerEventHandler.close();
}
-
+
@Test(timeout = 15000l)
public void testDelayedReuseContainerNotAvailable() throws IOException, InterruptedException, ExecutionException {
Configuration conf = new Configuration(new YarnConfiguration());
@@ -176,7 +179,7 @@ public class TestContainerReuse {
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = new TezDAGID("0", 0, 0);
TezVertexID vertexID = new TezVertexID(dagID, 1);
-
+
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
String appUrl = "url";
@@ -202,28 +205,28 @@ public class TestContainerReuse {
TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
-
+
Resource resource = Resource.newInstance(1024, 1);
Priority priority = Priority.newInstance(5);
String [] host1 = {"host1"};
String [] host2 = {"host2"};
-
+
String [] defaultRack = {"/default-rack"};
-
+
TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
TezTaskAttemptID taID21 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
TezTaskAttemptID taID31 = new TezTaskAttemptID(new TezTaskID(vertexID, 3), 1);
TaskAttempt ta11 = mock(TaskAttempt.class);
TaskAttempt ta21 = mock(TaskAttempt.class);
TaskAttempt ta31 = mock(TaskAttempt.class);
-
+
AMSchedulerEventTALaunchRequest lrTa11 = createLaunchRequestEvent(taID11, ta11, resource, host1, defaultRack, priority, conf);
AMSchedulerEventTALaunchRequest lrTa21 = createLaunchRequestEvent(taID21, ta21, resource, host2, defaultRack, priority, conf);
AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent(taID31, ta31, resource, host1, defaultRack, priority, conf);
-
+
taskSchedulerEventHandler.handleEvent(lrTa11);
taskSchedulerEventHandler.handleEvent(lrTa21);
-
+
Container containerHost1 = createContainer(1, host1[0], resource, priority);
Container containerHost2 = createContainer(2, host2[0], resource, priority);
@@ -231,10 +234,10 @@ public class TestContainerReuse {
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
-
+
// Adding the event later so that task1 assigned to containerHost1 is deterministic.
taskSchedulerEventHandler.handleEvent(lrTa31);
-
+
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
@@ -242,7 +245,7 @@ public class TestContainerReuse {
verify(rmClient, times(0)).releaseAssignedContainer(eq(containerHost2.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
-
+
long currentTs = System.currentTimeMillis();
Throwable exception = null;
while (System.currentTimeMillis() < currentTs + 5000l) {
@@ -385,7 +388,7 @@ public class TestContainerReuse {
taskScheduler.close();
taskSchedulerEventHandler.close();
}
-
+
@Test(timeout = 10000l)
public void testReuseNonLocalRequest() throws IOException, InterruptedException, ExecutionException {
Configuration tezConf = new Configuration(new YarnConfiguration());
@@ -444,7 +447,7 @@ public class TestContainerReuse {
TaskAttempt ta12 = mock(TaskAttempt.class);
doReturn(vertexID).when(ta12).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID12, ta12, resource1, emptyHosts, emptyRacks, priority, tezConf);
-
+
// Send launch request for task 1 onle, deterministic assignment to this task.
taskSchedulerEventHandler.handleEvent(lrEvent11);
@@ -457,7 +460,7 @@ public class TestContainerReuse {
// Send launch request for task2 (vertex2)
taskSchedulerEventHandler.handleEvent(lrEvent12);
-
+
// Task assigned to container completed successfully. Container should be
// assigned immediately to ta12, since there's no local requests (instead of
// waiting for the re-use delay)
@@ -478,7 +481,7 @@ public class TestContainerReuse {
taskScheduler.close();
taskSchedulerEventHandler.close();
}
-
+
@Test(timeout = 10000l)
public void testNoReuseAcrossVertices() throws IOException, InterruptedException, ExecutionException {
Configuration tezConf = new Configuration(new YarnConfiguration());
@@ -536,7 +539,7 @@ public class TestContainerReuse {
TaskAttempt ta21 = mock(TaskAttempt.class);
doReturn(vertexID2).when(ta21).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID21, ta21, resource1, host1, racks, priority, tezConf);
-
+
// Send launch request for task 1 onle, deterministic assignment to this task.
taskSchedulerEventHandler.handleEvent(lrEvent11);
@@ -549,7 +552,7 @@ public class TestContainerReuse {
// Send launch request for task2 (vertex2)
taskSchedulerEventHandler.handleEvent(lrEvent21);
-
+
// Task assigned to container completed successfully. Container should not be assigned to task21. Released since delay is 0.
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
drainableAppCallback.drain();
@@ -575,14 +578,17 @@ public class TestContainerReuse {
TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts,
String[] racks, Priority priority, Configuration conf) {
- ContainerContext containerContext =
+ ContainerContext containerContext =
new ContainerContext(new HashMap<String, LocalResource>(),
new Credentials(), new HashMap<String, String>(), "");
- AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability, new TezEngineTaskContext(taID, "user", "jobName", "vertexName",
- new ProcessorDescriptor("processorClassName"),
- Collections.singletonList(new InputSpec("vertexName", 1,
- "inputClassName")), Collections.singletonList(new OutputSpec(
- "vertexName", 1, "outputClassName"))), ta, hosts, racks, priority, containerContext);
+ AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability,
+ new TaskSpec(taID, "user", "vertexName",
+ new ProcessorDescriptor("processorClassName"),
+ Collections.singletonList(new InputSpec("vertexName",
+ new InputDescriptor("inputClassName"), 1)),
+ Collections.singletonList(new OutputSpec(
+ "vertexName", new OutputDescriptor("outputClassName"), 1))),
+ ta, hosts, racks, priority, containerContext);
return lr;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 2d2945e..62b6c28 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -110,7 +110,7 @@ public class TestAMContainer {
wc.verifyNoOutgoingEvents();
assertFalse(pulledTask.shouldDie());
assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
- .getTaskAttemptId());
+ .getTaskAttemptID());
assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -166,7 +166,7 @@ public class TestAMContainer {
wc.verifyNoOutgoingEvents();
assertFalse(pulledTask.shouldDie());
assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
- .getTaskAttemptId());
+ .getTaskAttemptID());
assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/pom.xml b/tez-engine-api/pom.xml
index 107bdc9..b19e96b 100644
--- a/tez-engine-api/pom.xml
+++ b/tez-engine-api/pom.xml
@@ -44,6 +44,10 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -54,6 +58,33 @@
<configuration>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>Events.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
new file mode 100644
index 0000000..182e8dc
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.newapi.Event;
+
+/**
+ * Event used by user code to send information between tasks. An output can
+ * generate an Event of this type to sending information regarding output data
+ * ( such as URI for file-based output data, port info in case of
+ * streaming-based data transfers ) to the Input on the destination vertex.
+ */
+public final class DataMovementEvent extends Event {
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that generated an Event.
+ * For a Processor-generated event, this is ignored.
+ */
+ private final int sourceIndex;
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that is meant to receive
+ * this Event. For a Processor event, this is ignored.
+ */
+ private int targetIndex;
+
+ /**
+ * User Payload for this Event
+ */
+ private final byte[] userPayload;
+
+ /**
+ * Version number to indicate what attempt generated this Event
+ */
+ private int version;
+
+ /**
+ * User Event constructor
+ * @param sourceIndex Index to identify the physical edge of the input/output
+ * that generated the event
+ * @param userPayload User Payload of the User Event
+ */
+ public DataMovementEvent(int sourceIndex,
+ byte[] userPayload) {
+ this.userPayload = userPayload;
+ this.sourceIndex = sourceIndex;
+ }
+
+ @Private
+ public DataMovementEvent(int sourceIndex,
+ int targetIndex,
+ byte[] userPayload) {
+ this.userPayload = userPayload;
+ this.sourceIndex = sourceIndex;
+ this.targetIndex = targetIndex;
+ }
+
+ /**
+ * Constructor for Processor-generated User Events
+ * @param userPayload
+ */
+ public DataMovementEvent(byte[] userPayload) {
+ this(-1, userPayload);
+ }
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+ public int getSourceIndex() {
+ return sourceIndex;
+ }
+
+ public int getTargetIndex() {
+ return targetIndex;
+ }
+
+ @Private
+ void setTargetIndex(int targetIndex) {
+ this.targetIndex = targetIndex;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ @Private
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
index 4528e4d..c5a92b8 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
@@ -24,7 +24,7 @@ import org.apache.tez.engine.newapi.Event;
* Event generated by an Input to indicate error when trying to
* retrieve data.
*/
-public class InputDataErrorEvent extends Event {
+public final class InputDataErrorEvent extends Event {
/**
* Diagnostics/trace of the error that occurred on the Input's edge.
@@ -36,10 +36,17 @@ public class InputDataErrorEvent extends Event {
*/
private final int index;
- protected InputDataErrorEvent(String diagnostics, int index) {
+ /**
+ * Version of the data on which the error occurred.
+ */
+ private final int version;
+
+ public InputDataErrorEvent(String diagnostics, int index,
+ int version) {
super();
this.diagnostics = diagnostics;
this.index = index;
+ this.version = version;
}
public String getDiagnostics() {
@@ -50,4 +57,8 @@ public class InputDataErrorEvent extends Event {
return index;
}
+ public int getVersion() {
+ return version;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
deleted file mode 100644
index 9d8f01a..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-/**
- * Event used by user code to send information between tasks. An output can
- * generate an Event of this type to sending information regarding output data
- * ( such as URI for file-based output data, port info in case of
- * streaming-based data transfers ) to the Input on the destination vertex.
- */
-public final class TaskCommunicationEvent extends Event {
-
- /**
- * Index(i) of the i-th (physical) Input or Output that generated an Event.
- * For a Processor-generated event, this is ignored.
- */
- private final int sourceIndex;
-
- /**
- * Index(i) of the i-th (physical) Input or Output that is meant to receive
- * this Event. For a Processor event, this is ignored.
- */
- private int targetIndex;
-
- /**
- * User Payload for this Event
- */
- private final byte[] userPayload;
-
- /**
- * User Event constructor
- * @param index Index to identify the physical edge of the input/output
- * @param userPayload User Payload of the User Event
- */
- public TaskCommunicationEvent(int index,
- byte[] userPayload) {
- this.userPayload = userPayload;
- this.sourceIndex = index;
- }
-
- /**
- * Constructor for Processor-generated User Events
- * @param userPayload
- */
- public TaskCommunicationEvent(byte[] userPayload) {
- this(-1, userPayload);
- }
-
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- public int getSourceIndex() {
- return sourceIndex;
- }
-
- public int getTargetIndex() {
- return targetIndex;
- }
-
- void setTargetIndex(int targetIndex) {
- this.targetIndex = targetIndex;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
deleted file mode 100644
index fe06bec..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Class that encapsulates all the information to identify the unique
- * object that either generated an Event or is the recipient of an Event.
- */
-public class EventMetaData {
-
- public static enum EventGenerator {
- INPUT,
- PROCESSOR,
- OUTPUT,
- SYSTEM
- }
-
- /**
- * Source Type ( one of Input/Output/Processor ) that generated the Event.
- */
- private final EventGenerator generator;
-
- /**
- * Name of the vertex where the event was generated.
- */
- private final String taskVertexName;
-
- /**
- * Name of the vertex to which the Input or Output is connected to.
- */
- private final String edgeVertexName;
-
- /**
- * Task Attempt ID
- */
- private final TezTaskAttemptID taskAttemptID;
-
- public EventMetaData(EventGenerator generator,
- String taskVertexName, String edgeVertexName,
- TezTaskAttemptID taskAttemptID) {
- this.generator = generator;
- this.taskVertexName = taskVertexName;
- this.edgeVertexName = edgeVertexName;
- this.taskAttemptID = taskAttemptID;
- }
-
- public EventGenerator getEventGenerator() {
- return generator;
- }
-
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptID;
- }
-
- public String getTaskVertexName() {
- return taskVertexName;
- }
-
- public String getEdgeVertexName() {
- return edgeVertexName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
deleted file mode 100644
index cb8bd6d..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.rpc.impl.TaskSpec;
-
-/**
- * Interface to the RPC layer ( umbilical ) between the Tez AM and
- * a Tez Container's JVM.
- */
-public interface TezUmbilical {
-
- /**
- * Heartbeat call back to the AM from the Container JVM
- * @param events Events to be sent to the AM
- * @return Events sent by the AM to the Container JVM which in turn will
- * be handled either by the JVM or routed to the appropriate instances of
- * Input/Processor/Outputs with a particular Task Attempt.
- */
- public Event[] hearbeat(Event[] events);
-
- /**
- * Hook to ask the Tez AM for the next task to be run on the Container
- * @return Next task to be run
- */
- public TaskSpec getNextTask();
-
- /**
- * Hook to query the Tez AM whether a particular Task Attempt can commit its
- * output.
- * @param attemptIDs Attempt IDs of the Tasks that are waiting to commit.
- * @return Map of boolean flags indicating whether the respective task
- * attempts can commit.
- */
- public boolean canTaskCommit(TezTaskAttemptID attemptID);
-
- /**
- * Inform the Tez AM that one or more Task attempts have failed.
- * @param attemptIDs Task Attempt IDs for the failed attempts.
- */
- public void taskFailed(TezTaskAttemptID attemptID);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
deleted file mode 100644
index 200ec21..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.records.TezVertexID;
-
-/**
- * Serializable information of a given Physical Input.
- */
-public interface InputSpec {
-
- /**
- * @return The name of the Source Vertex whose Output is consumed by this
- * Input.
- */
- public String getSourceVertexName();
-
- /**
- * @return The Vertex ID of the Source Vertex whose Output is consumed by this
- * Input.
- */
- public TezVertexID getSourceVertexID();
-
- /**
- * @return {@link InputDescriptor}
- */
- public InputDescriptor getInputDescriptor();
-
- /**
- * @return The no. of physical edges mapping to this Input.
- */
- public int getPhysicalEdgeCount();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
deleted file mode 100644
index 0cf41b6..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.records.TezVertexID;
-
-public interface OutputSpec {
-
- /**
- * @return The name of the Target Vertex whose Input is consumed by this
- * Output.
- */
- public String getDestinationVertexName();
-
- /**
- * @return The Vertex ID of the Target Vertex whose Input is consumed by this
- * Output.
- */
- public TezVertexID getDestinationVertexID();
-
- /**
- * @return {@link OutputDescriptor}
- */
- public OutputDescriptor getOutputDescriptor();
-
- /**
- * @return The no. of physical edges mapping to this Output.
- */
- public int getPhysicalEdgeCount();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
deleted file mode 100644
index 2ba8cf7..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import java.util.List;
-
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Serializable Task information that is sent across the Umbilical from the
- * Tez AM to the Tez Container's JVM.
- */
-public interface TaskSpec {
-
- /**
- * Get the vertex name for the current task.
- * @return the vertex name set by the user.
- */
- public String getVertexName();
-
- /**
- * Get the task attempt id for the current task.
- * @return the {@link TaskAttemptID}
- */
- public TezTaskAttemptID getTaskAttemptID();
-
- /**
- * Get the owner of the job.
- * @return the owner.
- */
- public String getUser();
- /**
- * The Processor definition for the given Task
- * @return {@link ProcessorDescriptor}
- */
- public ProcessorDescriptor getProcessorDescriptor();
-
- /**
- * The List of Inputs for this Task.
- * @return {@link Input}
- */
- public List<InputSpec> getInputs();
-
- /**
- * The List of Outputs for this Task.
- * @return {@link Output}
- */
- public List<OutputSpec> getOutputs();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
deleted file mode 100644
index 19174c8..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-
-public class TezEvent {
-
- private final String eventClassName;
-
- private final Event event;
-
- private EventMetaData sourceInfo;
-
- private EventMetaData targetInfo;
-
- public TezEvent(Event event, EventMetaData sourceInfo) {
- this.event = event;
- this.eventClassName = event.getClass().getName();
- this.setSourceInfo(sourceInfo);
- }
-
- public Event getEvent() {
- return event;
- }
-
- public EventMetaData getSourceInfo() {
- return sourceInfo;
- }
-
- public void setSourceInfo(EventMetaData sourceInfo) {
- this.sourceInfo = sourceInfo;
- }
-
- public EventMetaData getTargetInfo() {
- return targetInfo;
- }
-
- public void setTargetInfo(EventMetaData targetInfo) {
- this.targetInfo = targetInfo;
- }
-
- public String getEventClassName() {
- return eventClassName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/proto/Events.proto b/tez-engine-api/src/main/proto/Events.proto
new file mode 100644
index 0000000..3651c35
--- /dev/null
+++ b/tez-engine-api/src/main/proto/Events.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.engine.api.events";
+option java_outer_classname = "EventProtos";
+option java_generate_equals_and_hash = true;
+
+message DataMovementEventProto {
+ optional int32 source_index = 1;
+ optional int32 target_index = 2;
+ optional bytes user_payload = 3;
+ optional int32 version = 4;
+}
+
+message InputDataErrorEventProto {
+ optional int32 index = 1;
+ optional string diagnostics = 2;
+ optional int32 version = 3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine/pom.xml b/tez-engine/pom.xml
index 4ecf210..ecfdc04 100644
--- a/tez-engine/pom.xml
+++ b/tez-engine/pom.xml
@@ -49,6 +49,10 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -59,6 +63,33 @@
<configuration>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>Events.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
new file mode 100644
index 0000000..3c18d9f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
+
+public class ContainerTask implements Writable {
+
+ TaskSpec taskSpec;
+ boolean shouldDie;
+
+ public ContainerTask() {
+ }
+
+ public ContainerTask(TaskSpec taskSpec, boolean shouldDie) {
+ this.taskSpec = taskSpec;
+ this.shouldDie = shouldDie;
+ }
+
+ public TaskSpec getTaskSpec() {
+ return taskSpec;
+ }
+
+ public boolean shouldDie() {
+ return shouldDie;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(shouldDie);
+ if (taskSpec != null) {
+ out.writeBoolean(true);
+ taskSpec.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ shouldDie = in.readBoolean();
+ boolean taskComing = in.readBoolean();
+ if (taskComing) {
+ taskSpec = new TaskSpec();
+ taskSpec.readFields(in);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "shouldDie: " + shouldDie + ", TaskSpec: "
+ + taskSpec;
+ }
+}