You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/23 19:45:24 UTC
[10/20] git commit: Rename *.new* packages back to what they should be,
remove dead code from the old packages - mapreduce module - tez-engine module
(part of TEZ-398). (sseth)
Rename *.new* packages back to what they should be, remove dead code
from the old packages - mapreduce module - tez-engine module (part of
TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b4950f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b4950f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b4950f98
Branch: refs/heads/TEZ-398
Commit: b4950f98a7a0c62d4c6d53ab75bfb857e8f6b551
Parents: 3d60945
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 23 10:44:14 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 23 10:44:14 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 24 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 6 +-
.../java/org/apache/tez/dag/app/dag/Task.java | 2 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 4 +-
.../dag/event/TaskAttemptEventStatusUpdate.java | 2 +-
.../dag/app/dag/event/TaskEventAddTezEvent.java | 2 +-
.../app/dag/event/VertexEventRouteEvent.java | 2 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 10 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 4 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 2 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 +-
.../app/rm/AMSchedulerEventTALaunchRequest.java | 2 +-
.../rm/container/AMContainerEventAssignTA.java | 2 +-
.../dag/app/rm/container/AMContainerImpl.java | 2 +-
.../dag/app/rm/container/AMContainerTask.java | 2 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 +-
.../tez/dag/app/rm/TestContainerReuse.java | 7 +-
.../org/apache/tez/common/ContainerTask.java | 2 +-
.../apache/tez/common/RunningTaskContext.java | 70 -
.../apache/tez/common/TezEngineTaskContext.java | 141 --
.../org/apache/tez/common/TezTaskReporter.java | 116 --
.../tez/common/TezTaskUmbilicalProtocol.java | 14 +-
.../org/apache/tez/engine/api/KVReader.java | 81 +
.../org/apache/tez/engine/api/KVWriter.java | 40 +
.../api/events/TaskAttemptCompletedEvent.java | 28 +
.../api/events/TaskAttemptFailedEvent.java | 35 +
.../api/events/TaskStatusUpdateEvent.java | 70 +
.../tez/engine/api/impl/EventMetaData.java | 152 ++
.../apache/tez/engine/api/impl/EventType.java | 29 +
.../apache/tez/engine/api/impl/InputSpec.java | 88 +
.../apache/tez/engine/api/impl/OutputSpec.java | 87 +
.../apache/tez/engine/api/impl/TaskSpec.java | 146 ++
.../apache/tez/engine/api/impl/TezEvent.java | 248 +++
.../engine/api/impl/TezHeartbeatRequest.java | 137 ++
.../engine/api/impl/TezHeartbeatResponse.java | 105 ++
.../engine/api/impl/TezInputContextImpl.java | 84 +
.../engine/api/impl/TezOutputContextImpl.java | 85 +
.../api/impl/TezProcessorContextImpl.java | 86 +
.../tez/engine/api/impl/TezTaskContextImpl.java | 145 ++
.../tez/engine/api/impl/TezUmbilical.java | 36 +
.../broadcast/input/BroadcastInputManager.java | 2 +-
.../broadcast/input/BroadcastKVReader.java | 2 +-
.../broadcast/output/FileBasedKVWriter.java | 4 +-
.../tez/engine/common/TezEngineUtils.java | 4 +-
.../tez/engine/common/combine/CombineInput.java | 176 --
.../engine/common/combine/CombineOutput.java | 55 -
.../common/localshuffle/LocalShuffle.java | 4 +-
.../engine/common/shuffle/impl/MapOutput.java | 2 +-
.../common/shuffle/impl/MergeManager.java | 2 +-
.../tez/engine/common/sort/SortingOutput.java | 32 -
.../engine/common/sort/impl/ExternalSorter.java | 2 +-
.../common/task/impl/CombineValuesIterator.java | 51 -
.../newoutput/TezLocalTaskOutputFiles.java | 249 ---
.../task/local/newoutput/TezTaskOutput.java | 165 --
.../local/newoutput/TezTaskOutputFiles.java | 246 ---
.../local/output/TezLocalTaskOutputFiles.java | 58 +-
.../common/task/local/output/TezTaskOutput.java | 45 +-
.../task/local/output/TezTaskOutputFiles.java | 65 +-
.../engine/lib/input/ShuffledMergedInput.java | 2 +-
.../engine/lib/oldinput/LocalMergedInput.java | 67 -
.../lib/oldinput/OldShuffledMergedInput.java | 74 -
.../lib/oldoutput/OldInMemorySortedOutput.java | 58 -
.../oldoutput/OldLocalOnFileSorterOutput.java | 38 -
.../lib/oldoutput/OldOnFileSortedOutput.java | 62 -
.../engine/lib/output/InMemorySortedOutput.java | 2 +-
.../lib/output/LocalOnFileSorterOutput.java | 2 +-
.../engine/lib/output/OnFileSortedOutput.java | 2 +-
.../lib/output/OnFileUnorderedKVOutput.java | 2 +-
.../org/apache/tez/engine/newapi/KVReader.java | 79 -
.../org/apache/tez/engine/newapi/KVWriter.java | 38 -
.../events/TaskAttemptCompletedEvent.java | 28 -
.../newapi/events/TaskAttemptFailedEvent.java | 35 -
.../newapi/events/TaskStatusUpdateEvent.java | 70 -
.../tez/engine/newapi/impl/EventMetaData.java | 152 --
.../tez/engine/newapi/impl/EventType.java | 29 -
.../tez/engine/newapi/impl/InputSpec.java | 88 -
.../tez/engine/newapi/impl/OutputSpec.java | 87 -
.../apache/tez/engine/newapi/impl/TaskSpec.java | 146 --
.../apache/tez/engine/newapi/impl/TezEvent.java | 248 ---
.../engine/newapi/impl/TezHeartbeatRequest.java | 137 --
.../newapi/impl/TezHeartbeatResponse.java | 105 --
.../engine/newapi/impl/TezInputContextImpl.java | 84 -
.../newapi/impl/TezOutputContextImpl.java | 85 -
.../newapi/impl/TezProcessorContextImpl.java | 86 -
.../engine/newapi/impl/TezTaskContextImpl.java | 145 --
.../tez/engine/newapi/impl/TezUmbilical.java | 36 -
.../LogicalIOProcessorRuntimeTask.java | 20 +-
.../tez/engine/newruntime/RuntimeTask.java | 6 +-
.../apache/tez/engine/runtime/RuntimeUtils.java | 164 --
.../engine/shuffle/common/DiskFetchedInput.java | 2 +-
.../org/apache/tez/engine/task/RuntimeTask.java | 92 -
.../mapred/LocalClientProtocolProviderTez.java | 108 +-
.../hadoop/mapred/LocalJobRunnerMetricsTez.java | 196 +-
.../apache/hadoop/mapred/LocalJobRunnerTez.java | 1753 +++++++++---------
.../apache/tez/mapreduce/input/SimpleInput.java | 2 +-
.../tez/mapreduce/output/SimpleOutput.java | 2 +-
.../mapreduce/processor/map/MapProcessor.java | 4 +-
.../processor/reduce/ReduceProcessor.java | 4 +-
.../tez/mapreduce/TestUmbilicalProtocol.java | 4 +-
.../tez/mapreduce/processor/MapUtils.java | 43 +-
.../processor/map/TestMapProcessor.java | 45 +-
.../processor/reduce/TestReduceProcessor.java | 76 +-
102 files changed, 2984 insertions(+), 4800 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 31898a3..f32fa6b 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -67,23 +67,23 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
-import org.apache.tez.engine.newapi.impl.TezUmbilical;
import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.mapreduce.input.SimpleInputLegacy;
import org.apache.tez.mapreduce.output.SimpleOutput;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2be9c5f..36486c9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -52,10 +52,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 0947a41..088a195 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -26,7 +26,7 @@ import org.apache.tez.dag.api.oldrecords.TaskReport;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
/**
* Read only view of Task.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 85240e7..42ff8de 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -32,8 +32,8 @@ import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index 3a8c489..0b8db76 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -25,7 +25,7 @@ import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
index 51f6d53..4154bd0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
@@ -19,7 +19,7 @@
package org.apache.tez.dag.app.dag.event;
import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
public class TaskEventAddTezEvent extends TaskEvent {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
index c851ae0..37478cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -21,7 +21,7 @@ package org.apache.tez.dag.app.dag.event;
import java.util.List;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
public class VertexEventRouteEvent extends VertexEvent {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index d565978..3605857 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -31,14 +31,14 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.engine.newapi.events.DataMovementEvent;
import org.apache.tez.engine.newapi.events.InputFailedEvent;
import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
public class Edge {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1ae9dcd..f2b2776 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -89,8 +89,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.impl.TaskSpec;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 92a1859..13fa915 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -75,7 +75,7 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0bcba4f..1ec1225 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -106,14 +106,14 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
import org.apache.tez.engine.newapi.events.DataMovementEvent;
import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index 1c30b0b..14edd96 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
index dd178fb..76e80f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
@@ -19,7 +19,7 @@ package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
public class AMContainerEventAssignTA extends AMContainerEvent {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index eccf92a..94dd580 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
@SuppressWarnings("rawtypes")
public class AMContainerImpl implements AMContainer {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
index be1c08e..c0ef524 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
@@ -18,7 +18,7 @@
package org.apache.tez.dag.app.rm.container;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
public class AMContainerTask {
private final boolean shouldDie;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index b5e283b..676e747 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 408f88a..3a6e008 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -57,9 +56,9 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
import org.junit.Test;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
index 3c18d9f..e90f7fa 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -22,7 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
public class ContainerTask implements Writable {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
deleted file mode 100644
index aac4095..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.common;
-
-import java.io.IOException;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.util.Progress;
-import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.Processor;
-
-public class RunningTaskContext {
-
- protected SecretKey jobTokenSecret;
- protected TezTaskReporter reporter;
- protected Partitioner partitioner;
- protected Processor combineProcessor;
- protected TezTaskStatus status;
- protected Progress progress = new Progress();
-
- public Progress getProgress() {
- return progress;
- }
-
- public void setJobTokenSecret(SecretKey jobTokenSecret) {
- this.jobTokenSecret = jobTokenSecret;
- }
-
- public TezTaskStatus getStatus() {
- return status;
- }
-
- public TezTaskReporter getTaskReporter() {
- return reporter;
- }
-
- // TODO Doesn't belong here.
- public Processor getCombineProcessor() {
- return combineProcessor;
- }
-
- // TODO Doesn't belong here.
- public Partitioner getPartitioner() {
- return partitioner;
- }
-
- // TODO Doesn't belong here.
- public SecretKey getJobTokenSecret() {
- return jobTokenSecret;
- }
-
- public void statusUpdate() throws IOException, InterruptedException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
deleted file mode 100644
index c012928..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class TezEngineTaskContext extends TezTaskContext {
-
- // These two could be replaced by a TezConfiguration / DagSpec.
- private List<InputSpec> inputSpecList;
- private List<OutputSpec> outputSpecList;
- private ProcessorDescriptor processorDescriptor;
-
- public TezEngineTaskContext() {
- super();
- }
-
- public TezEngineTaskContext(TezTaskAttemptID taskAttemptID, String user,
- String jobName, String vertexName, ProcessorDescriptor processorDescriptor,
- List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
- super(taskAttemptID, user, jobName, vertexName);
- this.inputSpecList = inputSpecList;
- this.outputSpecList = outputSpecList;
- if (this.inputSpecList == null) {
- inputSpecList = new ArrayList<InputSpec>(0);
- }
- if (this.outputSpecList == null) {
- outputSpecList = new ArrayList<OutputSpec>(0);
- }
- this.inputSpecList = inputSpecList;
- this.outputSpecList = outputSpecList;
- this.processorDescriptor = processorDescriptor;
- }
-
- public String getRuntimeName() {
- // FIXME. Add this to the DAG configuration, and fetch from there.
- return "org.apache.tez.mapreduce.task.MRRuntimeTask";
- }
-
- public String getProcessorName() {
- return processorDescriptor.getClassName();
- }
-
- public byte[] getProcessorUserPayload() {
- return processorDescriptor.getUserPayload();
- }
-
- public List<InputSpec> getInputSpecList() {
- return this.inputSpecList;
- }
-
- public List<OutputSpec> getOutputSpecList() {
- return this.outputSpecList;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- byte[] procDesc =
- DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
- out.writeInt(procDesc.length);
- out.write(procDesc);
- out.writeInt(inputSpecList.size());
- for (InputSpec inputSpec : inputSpecList) {
- inputSpec.write(out);
- }
- out.writeInt(outputSpecList.size());
- for (OutputSpec outputSpec : outputSpecList) {
- outputSpec.write(out);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- int procDescLength = in.readInt();
- // TODO at least 3 buffer copies here. Need to convert this to full PB
- // TEZ-305
- byte[] procDescBytes = new byte[procDescLength];
- in.readFully(procDescBytes);
- processorDescriptor = DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(procDescBytes));
- int numInputSpecs = in.readInt();
- inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
- for (int i = 0; i < numInputSpecs; i++) {
- InputSpec inputSpec = new InputSpec();
- inputSpec.readFields(in);
- inputSpecList.add(inputSpec);
- }
- int numOutputSpecs = in.readInt();
- outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
- for (int i = 0; i < numOutputSpecs; i++) {
- OutputSpec outputSpec = new OutputSpec();
- outputSpec.readFields(in);
- outputSpecList.add(outputSpec);
- }
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("processorName=" + getProcessorName()
- + ", inputSpecListSize=" + inputSpecList.size()
- + ", outputSpecListSize=" + outputSpecList.size());
- sb.append(", inputSpecList=[");
- for (InputSpec i : inputSpecList) {
- sb.append("{" + i.toString() + "}, ");
- }
- sb.append("], outputSpecList=[");
- for (OutputSpec i : outputSpecList) {
- sb.append("{" + i.toString() + "}, ");
- }
- sb.append("]");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java
deleted file mode 100644
index 1931e31..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import java.io.IOException;
-
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-
-public interface TezTaskReporter extends Progressable, Master {
-
- public void setStatus(String status);
-
- public float getProgress();
-
- public void setProgress(float progress);
-
- public void progress();
-
- public TezCounter getCounter(String group, String name);
-
- public TezCounter getCounter(Enum<?> name);
-
- public void incrCounter(String group, String counter, long amount);
-
- public void incrCounter(Enum<?> key, long amount);
-
- public void reportFatalError(TezTaskAttemptID taskAttemptId,
- Throwable exception, String logMsg);
-
- public final TezTaskReporter NULL = new TezTaskReporter() {
-
- @Override
- public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
- int fromEventIdx, int maxEventsToFetch,
- TezTaskAttemptID reduce) {
- return null;
- }
-
- @Override
- public void setStatus(String status) {
- }
-
- @Override
- public void setProgress(float progress) {
- }
-
- @Override
- public void progress() {
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
- }
-
- @Override
- public float getProgress() {
- return 0.0f;
- }
-
- @Override
- public TezCounter getCounter(Enum<?> name) {
- return null;
- }
-
- @Override
- public TezCounter getCounter(String group, String name) {
- return null;
- }
-
- @Override
- public void reportFatalError(TezTaskAttemptID taskAttemptId,
- Throwable exception, String logMsg) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion)
- throws IOException {
- // TODO TEZAM3
- return 0;
- }
-
- @Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- // TODO TEZAM3
- return null;
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index 28991a8..7d81b4c 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -22,13 +22,15 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.tez.common.records.ProceedToCompletionResponse;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
/** Protocol that task child process uses to contact its parent process. The
* parent is a daemon which which polls the central master for a new map or
@@ -36,7 +38,8 @@ import org.apache.tez.engine.records.OutputContext;
* and parent is via this protocol. */
@InterfaceAudience.Private
@InterfaceStability.Stable
-public interface TezTaskUmbilicalProtocol extends Master {
+@ProtocolInfo(protocolName = "TezTaskUmbilicalProtocol", protocolVersion = 1)
+public interface TezTaskUmbilicalProtocol extends VersionedProtocol {
public static final long versionID = 19L;
@@ -68,4 +71,7 @@ public interface TezTaskUmbilicalProtocol extends Master {
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
throws IOException, TezException;
+ public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+ int fromEventIdx, int maxEventsToFetch,
+ TezTaskAttemptID taskAttemptId);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
new file mode 100644
index 0000000..150b598
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.tez.engine.newapi.Reader;
+
+/**
+ * A key/value(s) pair based {@link Reader}.
+ *
+ * Example usage
+ * <code>
+ * while (kvReader.next()) {
+ * KVRecord kvRecord = kvReader.getCurrentKV();
+ * Object key = kvRecord.getKey();
+ * Iterable values = kvRecord.getValues();
+ * </code>
+ *
+ */
+public interface KVReader extends Reader {
+
+ /**
+ * Moves to the next key/value(s) pair
+ *
+ * @return true if another key/value(s) pair exists, false if there are no more.
+ * @throws IOException
+ * if an error occurs
+ */
+ public boolean next() throws IOException;
+
+ /**
+ * Return the current key/value(s) pair. Use next() to advance.
+ * @return the current key/value(s) pair
+ * @throws IOException if an error occurs
+ */
+ public KVRecord getCurrentKV() throws IOException;
+
+ // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
+
+ // TODO NEWTEZ KVRecord which does not need to return a list!
+ // TODO NEWTEZ Parameterize this
+ /**
+ * Represents a key and an associated set of values
+ *
+ */
+ public static class KVRecord {
+
+ private Object key;
+ private Iterable<Object> values;
+
+ public KVRecord(Object key, Iterable<Object> values) {
+ this.key = key;
+ this.values = values;
+ }
+
+ public Object getKey() {
+ return this.key;
+ }
+
+ public Iterable<Object> getValues() {
+ return this.values;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
new file mode 100644
index 0000000..079d488
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.tez.engine.newapi.Writer;
+
+/**
+ * A key/value(s) pair based {@link Writer}
+ */
+public interface KVWriter extends Writer {
+ /**
+ * Writes a key/value pair.
+ *
+ * @param key
+ * the key to write
+ * @param value
+ * the value to write
+ * @throws IOException
+ * if an error occurs
+ */
+ public void write(Object key, Object value) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
new file mode 100644
index 0000000..3a90f56
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptCompletedEvent extends Event {
+
+ public TaskAttemptCompletedEvent() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
new file mode 100644
index 0000000..bd0bc04
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptFailedEvent extends Event {
+
+ private final String diagnostics;
+
+ public TaskAttemptFailedEvent(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
new file mode 100644
index 0000000..c0d77da
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskStatusUpdateEvent extends Event implements Writable {
+
+ private TezCounters tezCounters;
+ private float progress;
+
+ public TaskStatusUpdateEvent() {
+ }
+
+ public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
+ this.tezCounters = tezCounters;
+ this.progress = progress;
+ }
+
+ public TezCounters getCounters() {
+ return tezCounters;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeFloat(progress);
+ if (tezCounters != null) {
+ out.writeBoolean(true);
+ tezCounters.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ progress = in.readFloat();
+ if (in.readBoolean()) {
+ tezCounters = new TezCounters();
+ tezCounters.readFields(in);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
new file mode 100644
index 0000000..64df7bb
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Class that encapsulates all the information to identify the unique
+ * object that either generated an Event or is the recipient of an Event.
+ */
+public class EventMetaData implements Writable {
+
+ public static enum EventProducerConsumerType {
+ INPUT,
+ PROCESSOR,
+ OUTPUT,
+ SYSTEM
+ }
+
+ /**
+ * Producer Type ( one of Input/Output/Processor ) that generated the Event
+ * or Consumer Type that will consume the Event.
+ */
+ private EventProducerConsumerType producerConsumerType;
+
+ /**
+ * Name of the vertex where the event was generated.
+ */
+ private String taskVertexName;
+
+ /**
+ * Name of the vertex to which the Input or Output is connected.
+ */
+ private String edgeVertexName;
+
+ /**
+ * i'th physical input/output that this event maps to.
+ */
+ private int index;
+
+ /**
+ * Task Attempt ID
+ */
+ private TezTaskAttemptID taskAttemptID;
+
+ public EventMetaData() {
+ }
+
+ public EventMetaData(EventProducerConsumerType generator,
+ String taskVertexName, String edgeVertexName,
+ TezTaskAttemptID taskAttemptID) {
+ this.producerConsumerType = generator;
+ this.taskVertexName = taskVertexName;
+ this.edgeVertexName = edgeVertexName;
+ this.taskAttemptID = taskAttemptID;
+ }
+
+ public EventProducerConsumerType getEventGenerator() {
+ return producerConsumerType;
+ }
+
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptID;
+ }
+
+ public String getTaskVertexName() {
+ return taskVertexName;
+ }
+
+ public String getEdgeVertexName() {
+ return edgeVertexName;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(producerConsumerType.ordinal());
+ if (taskVertexName != null) {
+ out.writeBoolean(true);
+ out.writeUTF(taskVertexName);
+ } else {
+ out.writeBoolean(false);
+ }
+ if (edgeVertexName != null) {
+ out.writeBoolean(true);
+ out.writeUTF(edgeVertexName);
+ } else {
+ out.writeBoolean(false);
+ }
+ if(taskAttemptID != null) {
+ out.writeBoolean(true);
+ taskAttemptID.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+
+ out.writeInt(index);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
+ if (in.readBoolean()) {
+ taskVertexName = in.readUTF();
+ }
+ if (in.readBoolean()) {
+ edgeVertexName = in.readUTF();
+ }
+ if (in.readBoolean()) {
+ taskAttemptID = new TezTaskAttemptID();
+ taskAttemptID.readFields(in);
+ }
+ index = in.readInt();
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public void setIndex(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public String toString() {
+ return "{ producerConsumerType=" + producerConsumerType
+ + ", taskVertexName=" + taskVertexName
+ + ", edgeVertexName=" + edgeVertexName
+ + ", taskAttemptId=" + taskAttemptID
+ + ", index=" + index + " }";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
new file mode 100644
index 0000000..52fc10d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+public enum EventType {
+ TASK_ATTEMPT_COMPLETED_EVENT,
+ TASK_ATTEMPT_FAILED_EVENT,
+ DATA_MOVEMENT_EVENT,
+ INPUT_READ_ERROR_EVENT,
+ INPUT_FAILED_EVENT,
+ INTPUT_INFORMATION_EVENT,
+ TASK_STATUS_UPDATE_EVENT
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
new file mode 100644
index 0000000..a9ef333
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+public class InputSpec implements Writable {
+
+ private String sourceVertexName;
+ private InputDescriptor inputDescriptor;
+ private int physicalEdgeCount;
+
+ public InputSpec() {
+ }
+
+ public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
+ int physicalEdgeCount) {
+ this.sourceVertexName = sourceVertexName;
+ this.inputDescriptor = inputDescriptor;
+ this.physicalEdgeCount = physicalEdgeCount;
+ }
+
+ public String getSourceVertexName() {
+ return sourceVertexName;
+ }
+
+ public InputDescriptor getInputDescriptor() {
+ return inputDescriptor;
+ }
+
+ public int getPhysicalEdgeCount() {
+ return physicalEdgeCount;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODONEWTEZ convert to PB
+ out.writeUTF(sourceVertexName);
+ out.writeInt(physicalEdgeCount);
+ byte[] inputDescBytes =
+ DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+ out.writeInt(inputDescBytes.length);
+ out.write(inputDescBytes);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ sourceVertexName = in.readUTF();
+ physicalEdgeCount = in.readInt();
+ int inputDescLen = in.readInt();
+ byte[] inputDescBytes = new byte[inputDescLen];
+ in.readFully(inputDescBytes);
+ inputDescriptor =
+ DagTypeConverters.convertInputDescriptorFromDAGPlan(
+ TezEntityDescriptorProto.parseFrom(inputDescBytes));
+ }
+
+ public String toString() {
+ return "{ sourceVertexName=" + sourceVertexName
+ + ", physicalEdgeCount" + physicalEdgeCount
+ + ", inputClassName=" + inputDescriptor.getClassName()
+ + " }";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
new file mode 100644
index 0000000..3a1d5d8
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+/**
+ * Writable holder for a destination vertex name, an {@link OutputDescriptor}
+ * and a physical edge count.
+ */
+public class OutputSpec implements Writable {
+
+  private String destinationVertexName;
+  private OutputDescriptor outputDescriptor;
+  private int physicalEdgeCount;
+
+  /**
+   * Creates an empty spec. All fields are left at their defaults until
+   * {@link #readFields(DataInput)} populates them.
+   */
+  public OutputSpec() {
+  }
+
+  /**
+   * Creates a fully populated spec.
+   *
+   * @param destinationVertexName value later returned by
+   *        {@link #getDestinationVertexName()}
+   * @param outputDescriptor value later returned by
+   *        {@link #getOutputDescriptor()}
+   * @param physicalEdgeCount value later returned by
+   *        {@link #getPhysicalEdgeCount()}
+   */
+  public OutputSpec(String destinationVertexName,
+      OutputDescriptor outputDescriptor, int physicalEdgeCount) {
+    this.destinationVertexName = destinationVertexName;
+    this.outputDescriptor = outputDescriptor;
+    this.physicalEdgeCount = physicalEdgeCount;
+  }
+
+  /** @return the destination vertex name held by this spec */
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
+  /** @return the output descriptor held by this spec */
+  public OutputDescriptor getOutputDescriptor() {
+    return outputDescriptor;
+  }
+
+  /** @return the physical edge count held by this spec */
+  public int getPhysicalEdgeCount() {
+    return physicalEdgeCount;
+  }
+
+  /**
+   * Serializes this spec: the destination vertex name (UTF), the physical
+   * edge count (int), then the descriptor bytes produced by
+   * {@code DagTypeConverters.convertToDAGPlan}, prefixed with their length.
+   *
+   * @param out destination to write to
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODONEWTEZ convert to PB
+    out.writeUTF(destinationVertexName);
+    out.writeInt(physicalEdgeCount);
+    byte[] outputDescBytes =
+        DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
+    out.writeInt(outputDescBytes.length);
+    out.write(outputDescBytes);
+  }
+
+  /**
+   * Restores this spec from the stream, reading the fields in the same order
+   * that {@link #write(DataOutput)} emits them.
+   *
+   * @param in source to read from
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    destinationVertexName = in.readUTF();
+    physicalEdgeCount = in.readInt();
+    int outputDescLen = in.readInt();
+    byte[] outputDescBytes = new byte[outputDescLen];
+    in.readFully(outputDescBytes);
+    outputDescriptor =
+        DagTypeConverters.convertOutputDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(outputDescBytes));
+  }
+
+  /**
+   * Returns a short human-readable summary of this spec.
+   *
+   * <p>Safe to call on an instance created with the no-arg constructor: a
+   * missing descriptor is rendered as {@code null}.
+   */
+  @Override
+  public String toString() {
+    return "{ destinationVertexName=" + destinationVertexName
+        + ", physicalEdgeCount=" + physicalEdgeCount
+        + ", outputClassName="
+        + (outputDescriptor == null ? null : outputDescriptor.getClassName())
+        + " }";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
new file mode 100644
index 0000000..6527777
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Writable description of a task attempt: its attempt id, vertex name,
+ * processor descriptor and the lists of input and output specs.
+ */
+public class TaskSpec implements Writable {
+
+  private TezTaskAttemptID taskAttemptId;
+  private String vertexName;
+  private String user;
+  private ProcessorDescriptor processorDescriptor;
+  private List<InputSpec> inputSpecList;
+  private List<OutputSpec> outputSpecList;
+
+  /**
+   * Creates an empty spec. Fields are left at their defaults until
+   * {@link #readFields(DataInput)} populates them.
+   */
+  public TaskSpec() {
+  }
+
+  // TODO NEWTEZ Remove user
+  /**
+   * Creates a fully populated spec. The given lists are stored by reference,
+   * not copied.
+   *
+   * @param taskAttemptID value later returned by {@link #getTaskAttemptID()}
+   * @param user value later returned by {@link #getUser()}
+   * @param vertexName value later returned by {@link #getVertexName()}
+   * @param processorDescriptor value later returned by
+   *        {@link #getProcessorDescriptor()}
+   * @param inputSpecList value later returned by {@link #getInputs()}
+   * @param outputSpecList value later returned by {@link #getOutputs()}
+   */
+  public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
+      String vertexName, ProcessorDescriptor processorDescriptor,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+    this.taskAttemptId = taskAttemptID;
+    this.vertexName = vertexName;
+    this.user = user;
+    this.processorDescriptor = processorDescriptor;
+    this.inputSpecList = inputSpecList;
+    this.outputSpecList = outputSpecList;
+  }
+
+  /** @return the vertex name held by this spec */
+  public String getVertexName() {
+    return vertexName;
+  }
+
+  /** @return the task attempt id held by this spec */
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+
+  /**
+   * @return the user passed to the constructor; {@code null} on an instance
+   *         populated via {@link #readFields(DataInput)}, because the user
+   *         is not part of the serialized form
+   */
+  public String getUser() {
+    return user;
+  }
+
+  /** @return the processor descriptor held by this spec */
+  public ProcessorDescriptor getProcessorDescriptor() {
+    return processorDescriptor;
+  }
+
+  /** @return the input spec list held by this spec */
+  public List<InputSpec> getInputs() {
+    return inputSpecList;
+  }
+
+  /** @return the output spec list held by this spec */
+  public List<OutputSpec> getOutputs() {
+    return outputSpecList;
+  }
+
+  /**
+   * Serializes, in order: the task attempt id, the vertex name (UTF), the
+   * length-prefixed processor descriptor bytes, the input spec count and
+   * specs, then the output spec count and specs.
+   *
+   * @param out destination to write to
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // NOTE(review): 'user' is not written here and therefore is not restored
+    // by readFields; see the TODO on the constructor about removing it.
+    taskAttemptId.write(out);
+    out.writeUTF(vertexName);
+    byte[] procDesc =
+        DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
+    out.writeInt(procDesc.length);
+    out.write(procDesc);
+    out.writeInt(inputSpecList.size());
+    for (InputSpec inputSpec : inputSpecList) {
+      inputSpec.write(out);
+    }
+    out.writeInt(outputSpecList.size());
+    for (OutputSpec outputSpec : outputSpecList) {
+      outputSpec.write(out);
+    }
+  }
+
+  /**
+   * Restores this spec from the stream, reading the fields in the same order
+   * that {@link #write(DataOutput)} emits them. The input and output lists
+   * are replaced with freshly allocated lists.
+   *
+   * @param in source to read from
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId = new TezTaskAttemptID();
+    taskAttemptId.readFields(in);
+    vertexName = in.readUTF();
+    int procDescLength = in.readInt();
+    // TODO at least 3 buffer copies here. Need to convert this to full PB
+    // TEZ-305
+    byte[] procDescBytes = new byte[procDescLength];
+    in.readFully(procDescBytes);
+    processorDescriptor =
+        DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(procDescBytes));
+    int numInputSpecs = in.readInt();
+    inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
+    for (int i = 0; i < numInputSpecs; i++) {
+      InputSpec inputSpec = new InputSpec();
+      inputSpec.readFields(in);
+      inputSpecList.add(inputSpec);
+    }
+    int numOutputSpecs = in.readInt();
+    outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
+    for (int i = 0; i < numOutputSpecs; i++) {
+      OutputSpec outputSpec = new OutputSpec();
+      outputSpec.readFields(in);
+      outputSpecList.add(outputSpec);
+    }
+  }
+
+  /**
+   * Returns a human-readable summary: attempt id, processor class name, list
+   * sizes and the string form of every input and output spec.
+   */
+  @Override
+  public String toString() {
+    // Local, single-threaded use: StringBuilder avoids StringBuffer's locking.
+    StringBuilder sb = new StringBuilder();
+    sb.append("TaskAttemptID:").append(taskAttemptId);
+    // The leading ", " keeps the attempt id from running into the next field.
+    sb.append(", processorName=").append(processorDescriptor.getClassName());
+    sb.append(", inputSpecListSize=").append(inputSpecList.size());
+    sb.append(", outputSpecListSize=").append(outputSpecList.size());
+    sb.append(", inputSpecList=[");
+    for (InputSpec i : inputSpecList) {
+      sb.append("{" + i.toString() + "}, ");
+    }
+    sb.append("], outputSpecList=[");
+    for (OutputSpec i : outputSpecList) {
+      sb.append("{" + i.toString() + "}, ");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
new file mode 100644
index 0000000..9d0228d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputFailedEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputInformationEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputInformationEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Writable envelope around an {@link Event}, carrying the event's
+ * {@link EventType} plus optional source and destination metadata.
+ */
+public class TezEvent implements Writable {
+
+  private EventType eventType;
+
+  private Event event;
+
+  private EventMetaData sourceInfo;
+
+  private EventMetaData destinationInfo;
+
+  /**
+   * Creates an empty instance. Fields are left at their defaults until
+   * {@link #readFields(DataInput)} populates them.
+   */
+  public TezEvent() {
+  }
+
+  /**
+   * Wraps the given event and derives its {@link EventType} from the event's
+   * runtime class.
+   *
+   * @param event the event to wrap
+   * @param sourceInfo source metadata, stored via
+   *        {@link #setSourceInfo(EventMetaData)}
+   * @throws TezUncheckedException if the event's class is not one of the
+   *         event classes handled here
+   */
+  public TezEvent(Event event, EventMetaData sourceInfo) {
+    this.event = event;
+    this.setSourceInfo(sourceInfo);
+    if (event instanceof DataMovementEvent) {
+      eventType = EventType.DATA_MOVEMENT_EVENT;
+    } else if (event instanceof InputReadErrorEvent) {
+      eventType = EventType.INPUT_READ_ERROR_EVENT;
+    } else if (event instanceof TaskAttemptFailedEvent) {
+      eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
+    } else if (event instanceof TaskAttemptCompletedEvent) {
+      eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
+    } else if (event instanceof InputInformationEvent) {
+      eventType = EventType.INTPUT_INFORMATION_EVENT;
+    } else if (event instanceof InputFailedEvent) {
+      eventType = EventType.INPUT_FAILED_EVENT;
+    } else if (event instanceof TaskStatusUpdateEvent) {
+      eventType = EventType.TASK_STATUS_UPDATE_EVENT;
+    } else {
+      throw new TezUncheckedException("Unknown event, event="
+          + event.getClass().getName());
+    }
+  }
+
+  /** @return the wrapped event, or {@code null} if none is set */
+  public Event getEvent() {
+    return event;
+  }
+
+  /** @return the source metadata, or {@code null} if none is set */
+  public EventMetaData getSourceInfo() {
+    return sourceInfo;
+  }
+
+  /** @param sourceInfo the source metadata to store */
+  public void setSourceInfo(EventMetaData sourceInfo) {
+    this.sourceInfo = sourceInfo;
+  }
+
+  /** @return the destination metadata, or {@code null} if none is set */
+  public EventMetaData getDestinationInfo() {
+    return destinationInfo;
+  }
+
+  /** @param destinationInfo the destination metadata to store */
+  public void setDestinationInfo(EventMetaData destinationInfo) {
+    this.destinationInfo = destinationInfo;
+  }
+
+  /** @return the type derived from, or read alongside, the wrapped event */
+  public EventType getEventType() {
+    return eventType;
+  }
+
+  /**
+   * Writes a presence flag for the event and, when present, the event type
+   * ordinal followed by the event body. A {@link TaskStatusUpdateEvent}
+   * writes itself directly; every other type is written as length-prefixed
+   * protobuf bytes.
+   */
+  private void serializeEvent(DataOutput out) throws IOException {
+    if (event == null) {
+      out.writeBoolean(false);
+      return;
+    }
+    out.writeBoolean(true);
+    out.writeInt(eventType.ordinal());
+    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+      // TODO NEWTEZ convert to PB
+      TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
+      sEvt.write(out);
+    } else {
+      byte[] eventBytes = null;
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEvent dmEvt = (DataMovementEvent) event;
+        eventBytes = DataMovementEventProto.newBuilder()
+            .setSourceIndex(dmEvt.getSourceIndex())
+            .setTargetIndex(dmEvt.getTargetIndex())
+            .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
+            .build().toByteArray();
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        // NOTE(review): deserializeEvent reads a version from this proto, but
+        // no version is set here. Confirm whether InputReadErrorEvent exposes
+        // its version and whether it should be serialized as well.
+        InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
+        eventBytes = InputReadErrorEventProto.newBuilder()
+            .setIndex(ideEvt.getIndex())
+            .setDiagnostics(ideEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
+        eventBytes = TaskAttemptFailedEventProto.newBuilder()
+            .setDiagnostics(tfEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        eventBytes = TaskAttemptCompletedEventProto.newBuilder()
+            .build().toByteArray();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEvent ifEvt = (InputFailedEvent) event;
+        eventBytes = InputFailedEventProto.newBuilder()
+            .setSourceIndex(ifEvt.getSourceIndex())
+            .setTargetIndex(ifEvt.getTargetIndex())
+            .setVersion(ifEvt.getVersion()).build().toByteArray();
+        // Without this break, control fell into the next case and cast the
+        // InputFailedEvent to InputInformationEvent.
+        break;
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEvent iEvt = (InputInformationEvent) event;
+        eventBytes = InputInformationEventProto.newBuilder()
+            .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+            .build().toByteArray();
+        // Without this break, control fell into default and always threw.
+        break;
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+            + ", type=" + eventType);
+      }
+      out.writeInt(eventBytes.length);
+      out.write(eventBytes);
+    }
+  }
+
+  /**
+   * Reads what {@link #serializeEvent(DataOutput)} wrote: the presence flag,
+   * then the event type (looked up by ordinal) and the event body.
+   */
+  private void deserializeEvent(DataInput in) throws IOException {
+    if (!in.readBoolean()) {
+      event = null;
+      return;
+    }
+    eventType = EventType.values()[in.readInt()];
+    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+      // TODO NEWTEZ convert to PB
+      event = new TaskStatusUpdateEvent();
+      ((TaskStatusUpdateEvent)event).readFields(in);
+    } else {
+      int eventBytesLen = in.readInt();
+      byte[] eventBytes = new byte[eventBytesLen];
+      in.readFully(eventBytes);
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEventProto dmProto =
+            DataMovementEventProto.parseFrom(eventBytes);
+        event = new DataMovementEvent(dmProto.getSourceIndex(),
+            dmProto.getTargetIndex(),
+            dmProto.getUserPayload().toByteArray());
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEventProto ideProto =
+            InputReadErrorEventProto.parseFrom(eventBytes);
+        event = new InputReadErrorEvent(ideProto.getDiagnostics(),
+            ideProto.getIndex(), ideProto.getVersion());
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEventProto tfProto =
+            TaskAttemptFailedEventProto.parseFrom(eventBytes);
+        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        event = new TaskAttemptCompletedEvent();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEventProto ifProto =
+            InputFailedEventProto.parseFrom(eventBytes);
+        event = new InputFailedEvent(ifProto.getSourceIndex(),
+            ifProto.getTargetIndex(), ifProto.getVersion());
+        break;
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEventProto infoProto =
+            InputInformationEventProto.parseFrom(eventBytes);
+        event = new InputInformationEvent(
+            infoProto.getUserPayload().toByteArray());
+        break;
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+            + ", type=" + eventType);
+      }
+    }
+  }
+
+  /**
+   * Serializes the event followed by the source and destination metadata,
+   * each preceded by a boolean presence flag.
+   *
+   * @param out destination to write to
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    serializeEvent(out);
+    if (sourceInfo != null) {
+      out.writeBoolean(true);
+      sourceInfo.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    if (destinationInfo != null) {
+      out.writeBoolean(true);
+      destinationInfo.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  /**
+   * Restores this instance from the stream in the order used by
+   * {@link #write(DataOutput)}. Metadata flagged as absent is reset to
+   * {@code null}, so a reused instance does not keep values from a previous
+   * read.
+   *
+   * @param in source to read from
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    deserializeEvent(in);
+    if (in.readBoolean()) {
+      sourceInfo = new EventMetaData();
+      sourceInfo.readFields(in);
+    } else {
+      sourceInfo = null;
+    }
+    if (in.readBoolean()) {
+      destinationInfo = new EventMetaData();
+      destinationInfo.readFields(in);
+    } else {
+      destinationInfo = null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
new file mode 100644
index 0000000..dc1a447
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+
+/**
+ * Writable heartbeat request carrying a request id, a container identifier,
+ * an optional current task attempt id, an optional list of events and the
+ * start index / maximum count for events requested in return.
+ */
+public class TezHeartbeatRequest implements Writable {
+
+  private String containerIdentifier;
+  private List<TezEvent> events;
+  private TezTaskAttemptID currentTaskAttemptID;
+  private int startIndex;
+  private int maxEvents;
+  private long requestId;
+
+  /**
+   * Creates an empty request. Fields are left at their defaults until
+   * {@link #readFields(DataInput)} populates them.
+   */
+  public TezHeartbeatRequest() {
+  }
+
+  /**
+   * Creates a populated request.
+   *
+   * @param requestId value later returned by {@link #getRequestId()}
+   * @param events events to send; may be {@code null}, otherwise wrapped in
+   *        an unmodifiable view
+   * @param containerIdentifier value later returned by
+   *        {@link #getContainerIdentifier()}
+   * @param taskAttemptID value later returned by
+   *        {@link #getCurrentTaskAttemptID()}; may be {@code null}
+   * @param startIndex value later returned by {@link #getStartIndex()}
+   * @param maxEvents value later returned by {@link #getMaxEvents()}
+   */
+  public TezHeartbeatRequest(long requestId, List<TezEvent> events,
+      String containerIdentifier, TezTaskAttemptID taskAttemptID,
+      int startIndex, int maxEvents) {
+    this.containerIdentifier = containerIdentifier;
+    this.requestId = requestId;
+    // write() and toString() accept a null list, so the constructor does too
+    // (Collections.unmodifiableList rejects null).
+    this.events = (events == null) ? null : Collections.unmodifiableList(events);
+    this.startIndex = startIndex;
+    this.maxEvents = maxEvents;
+    this.currentTaskAttemptID = taskAttemptID;
+  }
+
+  /** @return the container identifier held by this request */
+  public String getContainerIdentifier() {
+    return containerIdentifier;
+  }
+
+  /** @return the events held by this request, or {@code null} if none */
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  /** @return the start index held by this request */
+  public int getStartIndex() {
+    return startIndex;
+  }
+
+  /** @return the maximum event count held by this request */
+  public int getMaxEvents() {
+    return maxEvents;
+  }
+
+  /** @return the request id held by this request */
+  public long getRequestId() {
+    return requestId;
+  }
+
+  /** @return the current task attempt id, or {@code null} if none */
+  public TezTaskAttemptID getCurrentTaskAttemptID() {
+    return currentTaskAttemptID;
+  }
+
+  /**
+   * Serializes, in order: the optional event list (presence flag, count,
+   * events), the optional task attempt id (presence flag, id), the start
+   * index, the maximum event count, the request id and the container
+   * identifier.
+   *
+   * @param out destination to write to
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (events != null) {
+      out.writeBoolean(true);
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
+    }
+    if (currentTaskAttemptID != null) {
+      out.writeBoolean(true);
+      currentTaskAttemptID.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    out.writeInt(startIndex);
+    out.writeInt(maxEvents);
+    out.writeLong(requestId);
+    Text.writeString(out, containerIdentifier);
+  }
+
+  /**
+   * Restores this request from the stream in the order used by
+   * {@link #write(DataOutput)}. Optional fields flagged as absent are reset
+   * to {@code null}, so a reused instance does not keep values from a
+   * previous read.
+   *
+   * @param in source to read from
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (in.readBoolean()) {
+      int eventsCount = in.readInt();
+      events = new ArrayList<TezEvent>(eventsCount);
+      for (int i = 0; i < eventsCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        events.add(e);
+      }
+    } else {
+      events = null;
+    }
+    if (in.readBoolean()) {
+      currentTaskAttemptID = new TezTaskAttemptID();
+      currentTaskAttemptID.readFields(in);
+    } else {
+      currentTaskAttemptID = null;
+    }
+    startIndex = in.readInt();
+    maxEvents = in.readInt();
+    requestId = in.readLong();
+    containerIdentifier = Text.readString(in);
+  }
+
+  /** Returns a short human-readable summary of this request. */
+  @Override
+  public String toString() {
+    return "{ "
+        + " containerId=" + containerIdentifier
+        + ", requestId=" + requestId
+        + ", startIndex=" + startIndex
+        + ", maxEventsToGet=" + maxEvents
+        + ", taskAttemptId=" + currentTaskAttemptID
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
new file mode 100644
index 0000000..22ae7eb
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable heartbeat response carrying the last request id, a should-die
+ * flag and an optional list of events.
+ */
+public class TezHeartbeatResponse implements Writable {
+
+  private long lastRequestId;
+  private boolean shouldDie = false;
+  private List<TezEvent> events;
+
+  /**
+   * Creates an empty response. Fields are left at their defaults until they
+   * are set or {@link #readFields(DataInput)} populates them.
+   */
+  public TezHeartbeatResponse() {
+  }
+
+  /**
+   * Creates a response holding the given events.
+   *
+   * @param events events to return; may be {@code null}, otherwise wrapped in
+   *        an unmodifiable view
+   */
+  public TezHeartbeatResponse(List<TezEvent> events) {
+    this.events = wrapEvents(events);
+  }
+
+  /** @return the events held by this response, or {@code null} if none */
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  /** @return whether the should-die flag has been set */
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  /** @return the last request id held by this response */
+  public long getLastRequestId() {
+    return lastRequestId;
+  }
+
+  /**
+   * Replaces the events held by this response.
+   *
+   * @param events events to return; may be {@code null}, otherwise wrapped in
+   *        an unmodifiable view
+   */
+  public void setEvents(List<TezEvent> events) {
+    this.events = wrapEvents(events);
+  }
+
+  /** @param lastRequestId the last request id to store */
+  public void setLastRequestId(long lastRequestId ) {
+    this.lastRequestId = lastRequestId;
+  }
+
+  /** Sets the should-die flag; there is no way to clear it again. */
+  public void setShouldDie() {
+    this.shouldDie = true;
+  }
+
+  /**
+   * Serializes, in order: the last request id, the should-die flag and the
+   * optional event list (presence flag, count, events).
+   *
+   * @param out destination to write to
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(lastRequestId);
+    out.writeBoolean(shouldDie);
+    if(events != null) {
+      out.writeBoolean(true);
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  /**
+   * Restores this response from the stream in the order used by
+   * {@link #write(DataOutput)}. An event list flagged as absent is reset to
+   * {@code null}, so a reused instance does not keep events from a previous
+   * read.
+   *
+   * @param in source to read from
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    lastRequestId = in.readLong();
+    shouldDie = in.readBoolean();
+    if(in.readBoolean()) {
+      int eventCount = in.readInt();
+      events = new ArrayList<TezEvent>(eventCount);
+      for (int i = 0; i < eventCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        events.add(e);
+      }
+    } else {
+      events = null;
+    }
+  }
+
+  /** Returns a short human-readable summary of this response. */
+  @Override
+  public String toString() {
+    return "{ "
+        + " lastRequestId=" + lastRequestId
+        + ", shouldDie=" + shouldDie
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
+
+  /**
+   * Wraps a non-null list in an unmodifiable view and passes {@code null}
+   * through, since write() and toString() treat a null list as "no events".
+   */
+  private static List<TezEvent> wrapEvents(List<TezEvent> events) {
+    return (events == null) ? null : Collections.unmodifiableList(events);
+  }
+}