You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:52 UTC
[45/50] [abbrv] Rename tez-engine-api to tez-runtime-api and
tez-engine is split into 2: - tez-engine-library for user-visible
Input/Output/Processor implementations - tez-engine-internals for framework
internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java b/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
deleted file mode 100644
index 7a4dd13..0000000
--- a/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.records;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * This is used to track task completion events on
- * job tracker.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-// TODO TEZAM3 This needs to be more generic. Maybe some kind of a serialized
-// blob - which can be interpretted by the Input plugin.
-public class TezDependentTaskCompletionEvent implements Writable {
- @InterfaceAudience.Public
- @InterfaceStability.Evolving
- // TODO EVENTUALLY - Remove TIPFAILED state ?
- static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
-
- private int eventId;
- private int taskRunTime; // using int since runtime is the time difference
- private TezTaskAttemptID taskAttemptId;
- private long dataSize;
- Status status;
- byte[] userPayload;
- // TODO TEZAM2 Get rid of the isMap field. Job specific type information can be determined from TaskAttemptId.getTaskType
-// boolean isMap = false;
- public static final TezDependentTaskCompletionEvent[] EMPTY_ARRAY =
- new TezDependentTaskCompletionEvent[0];
-
- public TezDependentTaskCompletionEvent() {
- taskAttemptId = new TezTaskAttemptID();
- }
-
- /**
- * Constructor. eventId should be created externally and incremented
- * per event for each job.
- * @param eventId event id, event id should be unique and assigned in
- * incrementally, starting from 0.
- * @param taskAttemptId task id
- * @param status task's status
- * @param taskTrackerHttp task tracker's host:port for http.
- */
- public TezDependentTaskCompletionEvent(int eventId,
- TezTaskAttemptID taskAttemptId,
-// boolean isMap,
- Status status,
- int runTime,
- long dataSize){
-
- this.taskAttemptId = taskAttemptId;
-// this.isMap = isMap;
- this.eventId = eventId;
- this.status =status;
- this.taskRunTime = runTime;
- this.dataSize = dataSize;
- }
-
- public TezDependentTaskCompletionEvent clone() {
- TezDependentTaskCompletionEvent clone = new TezDependentTaskCompletionEvent(
- this.eventId, this.taskAttemptId, this.status,
- this.taskRunTime, this.dataSize);
-
- return clone;
- }
-
- /**
- * Returns event Id.
- * @return event id
- */
- public int getEventId() {
- return eventId;
- }
-
- /**
- * Returns task id.
- * @return task id
- */
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptId;
- }
-
- /**
- * Returns enum Status.SUCESS or Status.FAILURE.
- * @return task tracker status
- */
- public Status getStatus() {
- return status;
- }
-
- /**
- * Returns time (in millisec) the task took to complete.
- */
- public int getTaskRunTime() {
- return taskRunTime;
- }
-
- /**
- * Return size of output produced by the task
- */
- public long getDataSize() {
- return dataSize;
- }
-
- /**
- * @return user payload. Maybe null
- */
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- /**
- * Set the task completion time
- * @param taskCompletionTime time (in millisec) the task took to complete
- */
- protected void setTaskRunTime(int taskCompletionTime) {
- this.taskRunTime = taskCompletionTime;
- }
-
- /**
- * set event Id. should be assigned incrementally starting from 0.
- * @param eventId
- */
- public void setEventId(int eventId) {
- this.eventId = eventId;
- }
-
- /**
- * Sets task id.
- * @param taskId
- */
- public void setTaskAttemptID(TezTaskAttemptID taskId) {
- this.taskAttemptId = taskId;
- }
-
- /**
- * Set task status.
- * @param status
- */
- public void setTaskStatus(Status status) {
- this.status = status;
- }
-
- /**
- * Set the user payload
- * @param userPayload
- */
- public void setUserPayload(byte[] userPayload) {
- this.userPayload = userPayload;
- }
-
- @Override
- public String toString(){
- StringBuffer buf = new StringBuffer();
- buf.append("Task Id : ");
- buf.append(taskAttemptId);
- buf.append(", Status : ");
- buf.append(status.name());
- return buf.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- // not counting userPayload as that is a piggyback mechanism
- if(o == null)
- return false;
- if(o.getClass().equals(this.getClass())) {
- TezDependentTaskCompletionEvent event = (TezDependentTaskCompletionEvent) o;
- return this.eventId == event.getEventId()
- && this.status.equals(event.getStatus())
- && this.taskAttemptId.equals(event.getTaskAttemptID())
- && this.taskRunTime == event.getTaskRunTime()
- && this.dataSize == event.getDataSize();
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- taskAttemptId.write(out);
-// out.writeBoolean(isMap);
- WritableUtils.writeEnum(out, status);
- WritableUtils.writeVInt(out, taskRunTime);
- WritableUtils.writeVInt(out, eventId);
- WritableUtils.writeCompressedByteArray(out, userPayload);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- taskAttemptId.readFields(in);
-// isMap = in.readBoolean();
- status = WritableUtils.readEnum(in, Status.class);
- taskRunTime = WritableUtils.readVInt(in);
- eventId = WritableUtils.readVInt(in);
- userPayload = WritableUtils.readCompressedByteArray(in);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java b/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
deleted file mode 100644
index 13c9088..0000000
--- a/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.records;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-
-public class TezTaskDependencyCompletionEventsUpdate implements Writable {
- TezDependentTaskCompletionEvent[] events;
- boolean reset;
-
- public TezTaskDependencyCompletionEventsUpdate() { }
-
- public TezTaskDependencyCompletionEventsUpdate(
- TezDependentTaskCompletionEvent[] events, boolean reset) {
- this.events = events;
- this.reset = reset;
- }
-
- public boolean shouldReset() {
- return reset;
- }
-
- public TezDependentTaskCompletionEvent[] getDependentTaskCompletionEvents() {
- return events;
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(reset);
- out.writeInt(events.length);
- for (TezDependentTaskCompletionEvent event : events) {
- event.write(out);
- }
- }
-
- public void readFields(DataInput in) throws IOException {
- reset = in.readBoolean();
- events = new TezDependentTaskCompletionEvent[in.readInt()];
- for (int i = 0; i < events.length; ++i) {
- events[i] = new TezDependentTaskCompletionEvent();
- events[i].readFields(in);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java b/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java
new file mode 100644
index 0000000..fd4c1ee
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/records/TezDependentTaskCompletionEvent.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * This is used to track task completion events on
+ * job tracker.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+// TODO TEZAM3 This needs to be more generic. Maybe some kind of a serialized
+// blob - which can be interpreted by the Input plugin.
+public class TezDependentTaskCompletionEvent implements Writable {
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ // TODO EVENTUALLY - Remove TIPFAILED state ?
+ static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
+
+ private int eventId;
+ private int taskRunTime; // using int since runtime is the time difference
+ private TezTaskAttemptID taskAttemptId;
+ private long dataSize;
+ Status status;
+ byte[] userPayload;
+ // TODO TEZAM2 Get rid of the isMap field. Job specific type information can be determined from TaskAttemptId.getTaskType
+// boolean isMap = false;
+ public static final TezDependentTaskCompletionEvent[] EMPTY_ARRAY =
+ new TezDependentTaskCompletionEvent[0];
+
+ public TezDependentTaskCompletionEvent() {
+ taskAttemptId = new TezTaskAttemptID();
+ }
+
+ /**
+ * Constructor. eventId should be created externally and incremented
+ * per event for each job.
+ * @param eventId event id; should be unique and assigned incrementally, starting from 0.
+ * @param taskAttemptId task attempt id
+ * @param status task's status
+ * @param runTime time (in millisec) the task took to complete
+ * @param dataSize size of output produced by the task
+ */
+ public TezDependentTaskCompletionEvent(int eventId,
+ TezTaskAttemptID taskAttemptId,
+// boolean isMap,
+ Status status,
+ int runTime,
+ long dataSize){
+
+ this.taskAttemptId = taskAttemptId;
+// this.isMap = isMap;
+ this.eventId = eventId;
+ this.status =status;
+ this.taskRunTime = runTime;
+ this.dataSize = dataSize;
+ }
+
+ public TezDependentTaskCompletionEvent clone() {
+ TezDependentTaskCompletionEvent clone = new TezDependentTaskCompletionEvent(
+ this.eventId, this.taskAttemptId, this.status,
+ this.taskRunTime, this.dataSize);
+
+ return clone;
+ }
+
+ /**
+ * Returns event Id.
+ * @return event id
+ */
+ public int getEventId() {
+ return eventId;
+ }
+
+ /**
+ * Returns the task attempt id.
+ * @return task attempt id
+ */
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptId;
+ }
+
+ /**
+ * Returns the task's {@link Status} (FAILED, KILLED, SUCCEEDED, OBSOLETE or TIPFAILED).
+ * @return task status
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Returns time (in millisec) the task took to complete.
+ */
+ public int getTaskRunTime() {
+ return taskRunTime;
+ }
+
+ /**
+ * Return size of output produced by the task
+ */
+ public long getDataSize() {
+ return dataSize;
+ }
+
+ /**
+ * @return user payload. May be null.
+ */
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+ /**
+ * Set the task run time.
+ * @param taskCompletionTime time (in millisec) the task took to complete
+ */
+ protected void setTaskRunTime(int taskCompletionTime) {
+ this.taskRunTime = taskCompletionTime;
+ }
+
+ /**
+ * Sets the event id, which should be assigned incrementally starting from 0.
+ * @param eventId the event id
+ */
+ public void setEventId(int eventId) {
+ this.eventId = eventId;
+ }
+
+ /**
+ * Sets the task attempt id.
+ * @param taskId the task attempt id
+ */
+ public void setTaskAttemptID(TezTaskAttemptID taskId) {
+ this.taskAttemptId = taskId;
+ }
+
+ /**
+ * Set task status.
+ * @param status
+ */
+ public void setTaskStatus(Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Set the user payload
+ * @param userPayload
+ */
+ public void setUserPayload(byte[] userPayload) {
+ this.userPayload = userPayload;
+ }
+
+ @Override
+ public String toString(){
+ StringBuffer buf = new StringBuffer();
+ buf.append("Task Id : ");
+ buf.append(taskAttemptId);
+ buf.append(", Status : ");
+ buf.append(status.name());
+ return buf.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ // not counting userPayload as that is a piggyback mechanism
+ if(o == null)
+ return false;
+ if(o.getClass().equals(this.getClass())) {
+ TezDependentTaskCompletionEvent event = (TezDependentTaskCompletionEvent) o;
+ return this.eventId == event.getEventId()
+ && this.status.equals(event.getStatus())
+ && this.taskAttemptId.equals(event.getTaskAttemptID())
+ && this.taskRunTime == event.getTaskRunTime()
+ && this.dataSize == event.getDataSize();
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskAttemptId.write(out);
+// out.writeBoolean(isMap);
+ WritableUtils.writeEnum(out, status);
+ WritableUtils.writeVInt(out, taskRunTime);
+ WritableUtils.writeVInt(out, eventId);
+ WritableUtils.writeCompressedByteArray(out, userPayload);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskAttemptId.readFields(in);
+// isMap = in.readBoolean();
+ status = WritableUtils.readEnum(in, Status.class);
+ taskRunTime = WritableUtils.readVInt(in);
+ eventId = WritableUtils.readVInt(in);
+ userPayload = WritableUtils.readCompressedByteArray(in);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java b/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java
new file mode 100644
index 0000000..ff4f267
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/records/TezTaskDependencyCompletionEventsUpdate.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class TezTaskDependencyCompletionEventsUpdate implements Writable {
+ TezDependentTaskCompletionEvent[] events;
+ boolean reset;
+
+ public TezTaskDependencyCompletionEventsUpdate() { }
+
+ public TezTaskDependencyCompletionEventsUpdate(
+ TezDependentTaskCompletionEvent[] events, boolean reset) {
+ this.events = events;
+ this.reset = reset;
+ }
+
+ public boolean shouldReset() {
+ return reset;
+ }
+
+ public TezDependentTaskCompletionEvent[] getDependentTaskCompletionEvents() {
+ return events;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(reset);
+ out.writeInt(events.length);
+ for (TezDependentTaskCompletionEvent event : events) {
+ event.write(out);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ reset = in.readBoolean();
+ events = new TezDependentTaskCompletionEvent[in.readInt()];
+ for (int i = 0; i < events.length; ++i) {
+ events[i] = new TezDependentTaskCompletionEvent();
+ events[i].readFields(in);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 6e42673..31513c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -70,7 +70,7 @@ import org.apache.tez.dag.history.avro.HistoryEventType;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 676e747..f2717be 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 3d00cb7..b524f6a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -82,8 +82,8 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent.Status;
+import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent.Status;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index 1a07b5b..81715bd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -40,8 +40,8 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.runtime.records.TezDependentTaskCompletionEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 3a6e008..68ee532 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -56,9 +56,9 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.junit.Test;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index c2457e1..fc89e82 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -65,9 +65,9 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.TokenCache;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.common.security.TokenCache;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-engine/findbugs-exclude.xml b/tez-engine/findbugs-exclude.xml
deleted file mode 100644
index 5b11308..0000000
--- a/tez-engine/findbugs-exclude.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine/pom.xml b/tez-engine/pom.xml
deleted file mode 100644
index 498f2f2..0000000
--- a/tez-engine/pom.xml
+++ /dev/null
@@ -1,92 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez</artifactId>
- <version>0.2.0-SNAPSHOT</version>
- </parent>
- <artifactId>tez-engine</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.inject</groupId>
- <artifactId>guice</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <configuration>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-maven-plugins</artifactId>
- <executions>
- <execution>
- <id>compile-protoc</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>protoc</goal>
- </goals>
- <configuration>
- <protocVersion>${protobuf.version}</protocVersion>
- <protocCommand>${protoc.path}</protocCommand>
- <imports>
- <param>${basedir}/src/main/proto</param>
- </imports>
- <source>
- <directory>${basedir}/src/main/proto</directory>
- <includes>
- <include>Events.proto</include>
- <include>ShufflePayloads.proto</include>
- </includes>
- </source>
- <output>${project.build.directory}/generated-sources/java</output>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java b/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java
deleted file mode 100644
index 16f7a8f..0000000
--- a/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
-@InterfaceStability.Unstable
-public class BufferUtils {
- public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
- byte[] b1 = buf1.getData();
- byte[] b2 = buf2.getData();
- int s1 = buf1.getPosition();
- int s2 = buf2.getPosition();
- int l1 = buf1.getLength();
- int l2 = buf2.getLength();
- return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
- }
-
- public static int compare(DataOutputBuffer buf1, DataOutputBuffer buf2) {
- byte[] b1 = buf1.getData();
- byte[] b2 = buf2.getData();
- int s1 = 0;
- int s2 = 0;
- int l1 = buf1.getLength();
- int l2 = buf2.getLength();
- return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
- }
-
- public static int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
- byte[] b1 = buf1.getData();
- byte[] b2 = buf2.getData();
- int s1 = buf1.getPosition();
- int s2 = 0;
- int l1 = buf1.getLength();
- int l2 = buf2.getLength();
- return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
- }
-
- public static int compare(DataOutputBuffer buf1, DataInputBuffer buf2) {
- return compare(buf2, buf1);
- }
-
- public static void copy(DataInputBuffer src, DataOutputBuffer dst)
- throws IOException {
- byte[] b1 = src.getData();
- int s1 = src.getPosition();
- int l1 = src.getLength();
- dst.reset();
- dst.write(b1, s1, l1 - s1);
- }
-
- public static void copy(DataOutputBuffer src, DataOutputBuffer dst)
- throws IOException {
- byte[] b1 = src.getData();
- int s1 = 0;
- int l1 = src.getLength();
- dst.reset();
- dst.write(b1, s1, l1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java b/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java
deleted file mode 100644
index a372e01..0000000
--- a/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io;
-
-public interface HashComparator<KEY> {
-
- int getHashCode(KEY key);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/Constants.java b/tez-engine/src/main/java/org/apache/tez/common/Constants.java
deleted file mode 100644
index 8ea2909..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/Constants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-
-
-public class Constants {
-
- // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
-
- public static final String TEZ = "tez";
-
- public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
- public static final String MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index";
- public static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
-
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
- public static String MERGED_OUTPUT_PREFIX = ".merged";
-
- // TODO NEWTEZ Remove this constant once the old code is removed.
- public static final String TEZ_ENGINE_TASK_ATTEMPT_ID =
- "tez.engine.task.attempt.id";
-
- public static final String TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING = "file.out";
-
- public static final String TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING = ".index";
-
- public static final String TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING = "%s/task_%d.out";
-
- public static final String TEZ_ENGINE_JOB_CREDENTIALS =
- "tez.engine.job.credentials";
-
- @Private
- public static final String TEZ_ENGINE_TASK_MEMORY = "tez.engine.task.memory";
-
- public static final String TASK_OUTPUT_DIR = "output";
-
- public static final String TEZ_ENGINE_TASK_OUTPUT_MANAGER =
- "tez.engine.task.local.output.manager";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/common/ContainerContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/ContainerContext.java b/tez-engine/src/main/java/org/apache/tez/common/ContainerContext.java
deleted file mode 100644
index df92bdc..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/ContainerContext.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-// TODO EVENTUALLY move this over to PB. Fix package/module.
-// TODO EVENTUALLY unit tests for functionality.
-public class ContainerContext implements Writable {
-
- String containerIdentifier;
- String pid;
-
- public ContainerContext() {
- containerIdentifier = "";
- pid = "";
- }
-
- public ContainerContext(String containerIdStr, String pid) {
- this.containerIdentifier = containerIdStr;
- this.pid = pid;
- }
-
- public String getContainerIdentifier() {
- return containerIdentifier;
- }
-
- public String getPid() {
- return pid;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.containerIdentifier = Text.readString(in);
- this.pid = Text.readString(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, containerIdentifier);
- Text.writeString(out, pid);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
deleted file mode 100644
index e90f7fa..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.engine.api.impl.TaskSpec;
-
-public class ContainerTask implements Writable {
-
- TaskSpec taskSpec;
- boolean shouldDie;
-
- public ContainerTask() {
- }
-
- public ContainerTask(TaskSpec taskSpec, boolean shouldDie) {
- this.taskSpec = taskSpec;
- this.shouldDie = shouldDie;
- }
-
- public TaskSpec getTaskSpec() {
- return taskSpec;
- }
-
- public boolean shouldDie() {
- return shouldDie;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(shouldDie);
- if (taskSpec != null) {
- out.writeBoolean(true);
- taskSpec.write(out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- shouldDie = in.readBoolean();
- boolean taskComing = in.readBoolean();
- if (taskComing) {
- taskSpec = new TaskSpec();
- taskSpec.readFields(in);
- }
- }
-
- @Override
- public String toString() {
- return "shouldDie: " + shouldDie + ", TaskSpec: "
- + taskSpec;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
deleted file mode 100644
index 9e4129f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.tez.common.records.ProceedToCompletionResponse;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
-
-/** Protocol that task child process uses to contact its parent process. The
- * parent is a daemon which polls the central master for a new map or
- * reduce task and runs it as a child process. All communication between child
- * and parent is via this protocol. */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-@ProtocolInfo(protocolName = "TezTaskUmbilicalProtocol", protocolVersion = 1)
-public interface TezTaskUmbilicalProtocol extends VersionedProtocol {
-
- public static final long versionID = 19L;
-
- ContainerTask getTask(ContainerContext containerContext) throws IOException;
-
- boolean canCommit(TezTaskAttemptID taskid) throws IOException;
-
- ProceedToCompletionResponse
- proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
-
- /// Copies from TezUmbilical until complete re-factor is done
- // TODONEWTEZ
-
- public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
- throws IOException, TezException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
deleted file mode 100644
index 0178b3a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.tez.engine.api.Reader;
-
-/**
- * A key/value(s) pair based {@link Reader}.
- *
- * Example usage
- * <code>
- * while (kvReader.next()) {
- * KVRecord kvRecord = kvReader.getCurrentKV();
- * Object key = kvRecord.getKey();
- * Iterable values = kvRecord.getValues();
- * </code>
- *
- */
-public interface KVReader extends Reader {
-
- /**
- * Moves to the next key/value(s) pair
- *
- * @return true if another key/value(s) pair exists, false if there are no more.
- * @throws IOException
- * if an error occurs
- */
- public boolean next() throws IOException;
-
- /**
- * Return the current key/value(s) pair. Use next() to advance.
- * @return
- * @throws IOException
- */
- public KVRecord getCurrentKV() throws IOException;
-
- // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
-
- // TODO NEWTEZ KVRecord which does not need to return a list!
- // TODO NEWTEZ Parameterize this
- /**
- * Represents a key and an associated set of values
- *
- */
- public static class KVRecord {
-
- private Object key;
- private Iterable<Object> values;
-
- public KVRecord(Object key, Iterable<Object> values) {
- this.key = key;
- this.values = values;
- }
-
- public Object getKey() {
- return this.key;
- }
-
- public Iterable<Object> getValues() {
- return this.values;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
deleted file mode 100644
index 970831b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.tez.engine.api.Writer;
-
-/**
- * A key/value(s) pair based {@link Writer}
- */
-public interface KVWriter extends Writer {
- /**
- * Writes a key/value pair.
- *
- * @param key
- * the key to write
- * @param value
- * the value to write
- * @throws IOException
- * if an error occurs
- */
- public void write(Object key, Object value) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
deleted file mode 100644
index ccf3cb8..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-
-/**
- * {@link Partitioner} is used by the TEZ framework to partition output
- * key/value pairs.
- *
- * <p><b>Partitioner Initialization</b></p> The Partitioner class is picked up
- * using the TEZ_ENGINE_PARTITIONER_CLASS attribute in {@link TezJobConfig}
- *
- * TODO NEWTEZ Change construction to first check for a Constructor with a byte[] payload
- *
- * Partitioners need to provide a single argument ({@link Configuration})
- * constructor or a 0 argument constructor. If both exist, preference is given
- * to the single argument constructor. This is primarily for MR support.
- *
- * If using the configuration constructor, TEZ_ENGINE_NUM_EXPECTED_PARTITIONS
- * will be set in the configuration, to indicate the max number of expected
- * partitions.
- *
- */
-public interface Partitioner {
-
- /**
- * Get partition for given key/value.
- * @param key key
- * @param value value
- * @param numPartitions number of partitions
- * @return
- */
- int getPartition(Object key, Object value, int numPartitions);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
deleted file mode 100644
index a9f2c98..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.events;
-
-import org.apache.tez.engine.api.Event;
-
-public class TaskAttemptCompletedEvent extends Event {
-
- public TaskAttemptCompletedEvent() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
deleted file mode 100644
index fc67472..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.events;
-
-import org.apache.tez.engine.api.Event;
-
-public class TaskAttemptFailedEvent extends Event {
-
- private final String diagnostics;
-
- public TaskAttemptFailedEvent(String diagnostics) {
- this.diagnostics = diagnostics;
- }
-
- public String getDiagnostics() {
- return diagnostics;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
deleted file mode 100644
index c0d1ee6..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.events;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.engine.api.Event;
-
-public class TaskStatusUpdateEvent extends Event implements Writable {
-
- private TezCounters tezCounters;
- private float progress;
-
- public TaskStatusUpdateEvent() {
- }
-
- public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
- this.tezCounters = tezCounters;
- this.progress = progress;
- }
-
- public TezCounters getCounters() {
- return tezCounters;
- }
-
- public float getProgress() {
- return progress;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeFloat(progress);
- if (tezCounters != null) {
- out.writeBoolean(true);
- tezCounters.write(out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- progress = in.readFloat();
- if (in.readBoolean()) {
- tezCounters = new TezCounters();
- tezCounters.readFields(in);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
deleted file mode 100644
index 64df7bb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Class that encapsulates all the information to identify the unique
- * object that either generated an Event or is the recipient of an Event.
- */
-public class EventMetaData implements Writable {
-
- public static enum EventProducerConsumerType {
- INPUT,
- PROCESSOR,
- OUTPUT,
- SYSTEM
- }
-
- /**
- * Producer Type ( one of Input/Output/Processor ) that generated the Event
- * or Consumer Type that will consume the Event.
- */
- private EventProducerConsumerType producerConsumerType;
-
- /**
- * Name of the vertex where the event was generated.
- */
- private String taskVertexName;
-
- /**
- * Name of the vertex to which the Input or Output is connected.
- */
- private String edgeVertexName;
-
- /**
- * i'th physical input/output that this event maps to.
- */
- private int index;
-
- /**
- * Task Attempt ID
- */
- private TezTaskAttemptID taskAttemptID;
-
- public EventMetaData() {
- }
-
- public EventMetaData(EventProducerConsumerType generator,
- String taskVertexName, String edgeVertexName,
- TezTaskAttemptID taskAttemptID) {
- this.producerConsumerType = generator;
- this.taskVertexName = taskVertexName;
- this.edgeVertexName = edgeVertexName;
- this.taskAttemptID = taskAttemptID;
- }
-
- public EventProducerConsumerType getEventGenerator() {
- return producerConsumerType;
- }
-
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptID;
- }
-
- public String getTaskVertexName() {
- return taskVertexName;
- }
-
- public String getEdgeVertexName() {
- return edgeVertexName;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(producerConsumerType.ordinal());
- if (taskVertexName != null) {
- out.writeBoolean(true);
- out.writeUTF(taskVertexName);
- } else {
- out.writeBoolean(false);
- }
- if (edgeVertexName != null) {
- out.writeBoolean(true);
- out.writeUTF(edgeVertexName);
- } else {
- out.writeBoolean(false);
- }
- if(taskAttemptID != null) {
- out.writeBoolean(true);
- taskAttemptID.write(out);
- } else {
- out.writeBoolean(false);
- }
-
- out.writeInt(index);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
- if (in.readBoolean()) {
- taskVertexName = in.readUTF();
- }
- if (in.readBoolean()) {
- edgeVertexName = in.readUTF();
- }
- if (in.readBoolean()) {
- taskAttemptID = new TezTaskAttemptID();
- taskAttemptID.readFields(in);
- }
- index = in.readInt();
- }
-
- public int getIndex() {
- return index;
- }
-
- public void setIndex(int index) {
- this.index = index;
- }
-
- @Override
- public String toString() {
- return "{ producerConsumerType=" + producerConsumerType
- + ", taskVertexName=" + taskVertexName
- + ", edgeVertexName=" + edgeVertexName
- + ", taskAttemptId=" + taskAttemptID
- + ", index=" + index + " }";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
deleted file mode 100644
index 52fc10d..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-public enum EventType {
- TASK_ATTEMPT_COMPLETED_EVENT,
- TASK_ATTEMPT_FAILED_EVENT,
- DATA_MOVEMENT_EVENT,
- INPUT_READ_ERROR_EVENT,
- INPUT_FAILED_EVENT,
- INTPUT_INFORMATION_EVENT,
- TASK_STATUS_UPDATE_EVENT
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
deleted file mode 100644
index a9ef333..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-
-public class InputSpec implements Writable {
-
- private String sourceVertexName;
- private InputDescriptor inputDescriptor;
- private int physicalEdgeCount;
-
- public InputSpec() {
- }
-
- public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
- int physicalEdgeCount) {
- this.sourceVertexName = sourceVertexName;
- this.inputDescriptor = inputDescriptor;
- this.physicalEdgeCount = physicalEdgeCount;
- }
-
- public String getSourceVertexName() {
- return sourceVertexName;
- }
-
- public InputDescriptor getInputDescriptor() {
- return inputDescriptor;
- }
-
- public int getPhysicalEdgeCount() {
- return physicalEdgeCount;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // TODONEWTEZ convert to PB
- out.writeUTF(sourceVertexName);
- out.writeInt(physicalEdgeCount);
- byte[] inputDescBytes =
- DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
- out.writeInt(inputDescBytes.length);
- out.write(inputDescBytes);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- sourceVertexName = in.readUTF();
- physicalEdgeCount = in.readInt();
- int inputDescLen = in.readInt();
- byte[] inputDescBytes = new byte[inputDescLen];
- in.readFully(inputDescBytes);
- inputDescriptor =
- DagTypeConverters.convertInputDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(inputDescBytes));
- }
-
- public String toString() {
- return "{ sourceVertexName=" + sourceVertexName
- + ", physicalEdgeCount" + physicalEdgeCount
- + ", inputClassName=" + inputDescriptor.getClassName()
- + " }";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
deleted file mode 100644
index 3a1d5d8..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-
-public class OutputSpec implements Writable {
-
- private String destinationVertexName;
- private OutputDescriptor outputDescriptor;
- private int physicalEdgeCount;
-
- public OutputSpec() {
- }
-
- public OutputSpec(String destinationVertexName,
- OutputDescriptor outputDescriptor, int physicalEdgeCount) {
- this.destinationVertexName = destinationVertexName;
- this.outputDescriptor = outputDescriptor;
- this.physicalEdgeCount = physicalEdgeCount;
- }
-
- public String getDestinationVertexName() {
- return destinationVertexName;
- }
-
- public OutputDescriptor getOutputDescriptor() {
- return outputDescriptor;
- }
-
- public int getPhysicalEdgeCount() {
- return physicalEdgeCount;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // TODONEWTEZ convert to PB
- out.writeUTF(destinationVertexName);
- out.writeInt(physicalEdgeCount);
- byte[] inputDescBytes =
- DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
- out.writeInt(inputDescBytes.length);
- out.write(inputDescBytes);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- destinationVertexName = in.readUTF();
- physicalEdgeCount = in.readInt();
- int inputDescLen = in.readInt();
- byte[] inputDescBytes = new byte[inputDescLen];
- in.readFully(inputDescBytes);
- outputDescriptor =
- DagTypeConverters.convertOutputDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(inputDescBytes));
- }
-
- public String toString() {
- return "{ destinationVertexName=" + destinationVertexName
- + ", physicalEdgeCount" + physicalEdgeCount
- + ", outputClassName=" + outputDescriptor.getClassName()
- + " }";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
deleted file mode 100644
index 6527777..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class TaskSpec implements Writable {
-
- private TezTaskAttemptID taskAttemptId;
- private String vertexName;
- private String user;
- private ProcessorDescriptor processorDescriptor;
- private List<InputSpec> inputSpecList;
- private List<OutputSpec> outputSpecList;
-
- public TaskSpec() {
- }
-
- // TODO NEWTEZ Remove user
- public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
- String vertexName, ProcessorDescriptor processorDescriptor,
- List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
- this.taskAttemptId = taskAttemptID;
- this.vertexName = vertexName;
- this.user = user;
- this.processorDescriptor = processorDescriptor;
- this.inputSpecList = inputSpecList;
- this.outputSpecList = outputSpecList;
- }
-
- public String getVertexName() {
- return vertexName;
- }
-
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptId;
- }
-
- public String getUser() {
- return user;
- }
-
- public ProcessorDescriptor getProcessorDescriptor() {
- return processorDescriptor;
- }
-
- public List<InputSpec> getInputs() {
- return inputSpecList;
- }
-
- public List<OutputSpec> getOutputs() {
- return outputSpecList;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- taskAttemptId.write(out);
- out.writeUTF(vertexName);
- byte[] procDesc =
- DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
- out.writeInt(procDesc.length);
- out.write(procDesc);
- out.writeInt(inputSpecList.size());
- for (InputSpec inputSpec : inputSpecList) {
- inputSpec.write(out);
- }
- out.writeInt(outputSpecList.size());
- for (OutputSpec outputSpec : outputSpecList) {
- outputSpec.write(out);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- taskAttemptId = new TezTaskAttemptID();
- taskAttemptId.readFields(in);
- vertexName = in.readUTF();
- int procDescLength = in.readInt();
- // TODO at least 3 buffer copies here. Need to convert this to full PB
- // TEZ-305
- byte[] procDescBytes = new byte[procDescLength];
- in.readFully(procDescBytes);
- processorDescriptor =
- DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
- TezEntityDescriptorProto.parseFrom(procDescBytes));
- int numInputSpecs = in.readInt();
- inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
- for (int i = 0; i < numInputSpecs; i++) {
- InputSpec inputSpec = new InputSpec();
- inputSpec.readFields(in);
- inputSpecList.add(inputSpec);
- }
- int numOutputSpecs = in.readInt();
- outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
- for (int i = 0; i < numOutputSpecs; i++) {
- OutputSpec outputSpec = new OutputSpec();
- outputSpec.readFields(in);
- outputSpecList.add(outputSpec);
- }
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("TaskAttemptID:" + taskAttemptId);
- sb.append("processorName=" + processorDescriptor.getClassName()
- + ", inputSpecListSize=" + inputSpecList.size()
- + ", outputSpecListSize=" + outputSpecList.size());
- sb.append(", inputSpecList=[");
- for (InputSpec i : inputSpecList) {
- sb.append("{" + i.toString() + "}, ");
- }
- sb.append("], outputSpecList=[");
- for (OutputSpec i : outputSpecList) {
- sb.append("{" + i.toString() + "}, ");
- }
- sb.append("]");
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
deleted file mode 100644
index 6841d72..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.api.events.InputFailedEvent;
-import org.apache.tez.engine.api.events.InputInformationEvent;
-import org.apache.tez.engine.api.events.InputReadErrorEvent;
-import org.apache.tez.engine.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputFailedEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputInformationEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputReadErrorEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
-
-import com.google.protobuf.ByteString;
-
-public class TezEvent implements Writable {
-
- private EventType eventType;
-
- private Event event;
-
- private EventMetaData sourceInfo;
-
- private EventMetaData destinationInfo;
-
- public TezEvent() {
- }
-
- public TezEvent(Event event, EventMetaData sourceInfo) {
- this.event = event;
- this.setSourceInfo(sourceInfo);
- if (event instanceof DataMovementEvent) {
- eventType = EventType.DATA_MOVEMENT_EVENT;
- } else if (event instanceof InputReadErrorEvent) {
- eventType = EventType.INPUT_READ_ERROR_EVENT;
- } else if (event instanceof TaskAttemptFailedEvent) {
- eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
- } else if (event instanceof TaskAttemptCompletedEvent) {
- eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
- } else if (event instanceof InputInformationEvent) {
- eventType = EventType.INTPUT_INFORMATION_EVENT;
- } else if (event instanceof InputFailedEvent) {
- eventType = EventType.INPUT_FAILED_EVENT;
- } else if (event instanceof TaskStatusUpdateEvent) {
- eventType = EventType.TASK_STATUS_UPDATE_EVENT;
- } else {
- throw new TezUncheckedException("Unknown event, event="
- + event.getClass().getName());
- }
- }
-
- public Event getEvent() {
- return event;
- }
-
- public EventMetaData getSourceInfo() {
- return sourceInfo;
- }
-
- public void setSourceInfo(EventMetaData sourceInfo) {
- this.sourceInfo = sourceInfo;
- }
-
- public EventMetaData getDestinationInfo() {
- return destinationInfo;
- }
-
- public void setDestinationInfo(EventMetaData destinationInfo) {
- this.destinationInfo = destinationInfo;
- }
-
- public EventType getEventType() {
- return eventType;
- }
-
- private void serializeEvent(DataOutput out) throws IOException {
- if (event == null) {
- out.writeBoolean(false);
- return;
- }
- out.writeBoolean(true);
- out.writeInt(eventType.ordinal());
- if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
- // TODO NEWTEZ convert to PB
- TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
- sEvt.write(out);
- } else {
- byte[] eventBytes = null;
- switch (eventType) {
- case DATA_MOVEMENT_EVENT:
- DataMovementEvent dmEvt = (DataMovementEvent) event;
- eventBytes = DataMovementEventProto.newBuilder()
- .setSourceIndex(dmEvt.getSourceIndex())
- .setTargetIndex(dmEvt.getTargetIndex())
- .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
- .build().toByteArray();
- break;
- case INPUT_READ_ERROR_EVENT:
- InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
- eventBytes = InputReadErrorEventProto.newBuilder()
- .setIndex(ideEvt.getIndex())
- .setDiagnostics(ideEvt.getDiagnostics())
- .build().toByteArray();
- break;
- case TASK_ATTEMPT_FAILED_EVENT:
- TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
- eventBytes = TaskAttemptFailedEventProto.newBuilder()
- .setDiagnostics(tfEvt.getDiagnostics())
- .build().toByteArray();
- break;
- case TASK_ATTEMPT_COMPLETED_EVENT:
- eventBytes = TaskAttemptCompletedEventProto.newBuilder()
- .build().toByteArray();
- break;
- case INPUT_FAILED_EVENT:
- InputFailedEvent ifEvt = (InputFailedEvent) event;
- eventBytes = InputFailedEventProto.newBuilder()
- .setSourceIndex(ifEvt.getSourceIndex())
- .setTargetIndex(ifEvt.getTargetIndex())
- .setVersion(ifEvt.getVersion()).build().toByteArray();
- case INTPUT_INFORMATION_EVENT:
- InputInformationEvent iEvt = (InputInformationEvent) event;
- eventBytes = InputInformationEventProto.newBuilder()
- .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
- .build().toByteArray();
- default:
- throw new TezUncheckedException("Unknown TezEvent"
- + ", type=" + eventType);
- }
- out.writeInt(eventBytes.length);
- out.write(eventBytes);
- }
- }
-
- private void deserializeEvent(DataInput in) throws IOException {
- if (!in.readBoolean()) {
- event = null;
- return;
- }
- eventType = EventType.values()[in.readInt()];
- if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
- // TODO NEWTEZ convert to PB
- event = new TaskStatusUpdateEvent();
- ((TaskStatusUpdateEvent)event).readFields(in);
- } else {
- int eventBytesLen = in.readInt();
- byte[] eventBytes = new byte[eventBytesLen];
- in.readFully(eventBytes);
- switch (eventType) {
- case DATA_MOVEMENT_EVENT:
- DataMovementEventProto dmProto =
- DataMovementEventProto.parseFrom(eventBytes);
- event = new DataMovementEvent(dmProto.getSourceIndex(),
- dmProto.getTargetIndex(),
- dmProto.getUserPayload().toByteArray());
- break;
- case INPUT_READ_ERROR_EVENT:
- InputReadErrorEventProto ideProto =
- InputReadErrorEventProto.parseFrom(eventBytes);
- event = new InputReadErrorEvent(ideProto.getDiagnostics(),
- ideProto.getIndex(), ideProto.getVersion());
- break;
- case TASK_ATTEMPT_FAILED_EVENT:
- TaskAttemptFailedEventProto tfProto =
- TaskAttemptFailedEventProto.parseFrom(eventBytes);
- event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
- break;
- case TASK_ATTEMPT_COMPLETED_EVENT:
- event = new TaskAttemptCompletedEvent();
- break;
- case INPUT_FAILED_EVENT:
- InputFailedEventProto ifProto =
- InputFailedEventProto.parseFrom(eventBytes);
- event = new InputFailedEvent(ifProto.getSourceIndex(),
- ifProto.getTargetIndex(), ifProto.getVersion());
- break;
- case INTPUT_INFORMATION_EVENT:
- InputInformationEventProto infoProto =
- InputInformationEventProto.parseFrom(eventBytes);
- event = new InputInformationEvent(
- infoProto.getUserPayload().toByteArray());
- break;
- default:
- throw new TezUncheckedException("Unknown TezEvent"
- + ", type=" + eventType);
- }
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- serializeEvent(out);
- if (sourceInfo != null) {
- out.writeBoolean(true);
- sourceInfo.write(out);
- } else {
- out.writeBoolean(false);
- }
- if (destinationInfo != null) {
- out.writeBoolean(true);
- destinationInfo.write(out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- deserializeEvent(in);
- if (in.readBoolean()) {
- sourceInfo = new EventMetaData();
- sourceInfo.readFields(in);
- }
- if (in.readBoolean()) {
- destinationInfo = new EventMetaData();
- destinationInfo.readFields(in);
- }
- }
-
-}