You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/09 06:11:36 UTC
[1/2] TEZ-423. Move new re-factor classes to appropriate module.
(hitesh)
Updated Branches:
refs/heads/TEZ-398 bc9cee33c -> 1cc83e457
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index e94fd30..af2193c 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -23,14 +23,18 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tez.common.records.ProceedToCompletionResponse;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
/** Protocol that task child process uses to contact its parent process. The
* parent is a daemon which which polls the central master for a new map or
* reduce task and runs it as a child process. All communication between child
- * and parent is via this protocol. */
+ * and parent is via this protocol. */
@InterfaceAudience.Private
@InterfaceStability.Stable
public interface TezTaskUmbilicalProtocol extends Master {
@@ -38,42 +42,52 @@ public interface TezTaskUmbilicalProtocol extends Master {
public static final long versionID = 19L;
ContainerTask getTask(ContainerContext containerContext) throws IOException;
-
- boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+
+ boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
throws IOException, InterruptedException;
-
+
void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) throws IOException;
-
+
boolean ping(TezTaskAttemptID taskid) throws IOException;
void done(TezTaskAttemptID taskid) throws IOException;
-
- void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
- throws IOException, InterruptedException;
+
+ void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+ throws IOException, InterruptedException;
boolean canCommit(TezTaskAttemptID taskid) throws IOException;
void shuffleError(TezTaskAttemptID taskId, String message) throws IOException;
-
+
void fsError(TezTaskAttemptID taskId, String message) throws IOException;
void fatalError(TezTaskAttemptID taskId, String message) throws IOException;
-
+
// TODO TEZAM5 Can commitPending and outputReady be collapsed into a single
// call.
// IAC outputReady followed by commit is a little confusing - since the output
// isn't really in place till a commit is called. Maybe rename to
// processingComplete or some such.
-
+
// TODO EVENTUALLY This is not the most useful API. Once there's some kind of
// support for the Task handing output over to the Container, this won't rally
// be required. i.e. InMemShuffle running as a service in the Container, or
// the second task in getTask(). ContainerUmbilical would include getTask and
// getServices...
-
+
void outputReady(TezTaskAttemptID taskAttemptId, OutputContext outputContext)
throws IOException;
-
+
ProceedToCompletionResponse
proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException;
+
+ /// Copies from TezUmbilical until complete re-factor is done
+ // TODONEWTEZ
+
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+ throws IOException, TezException;
+
+ public void taskFailed(TezTaskAttemptID attemptID,
+ TezEvent taskFailedEvent) throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
new file mode 100644
index 0000000..ddd346f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskFailedEvent extends Event {
+
+ private final String diagnostics;
+
+ public TaskFailedEvent(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
new file mode 100644
index 0000000..933b48f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Class that encapsulates all the information to identify the unique
+ * object that either generated an Event or is the recipient of an Event.
+ */
+public class EventMetaData implements Writable {
+
+ public static enum EventGenerator {
+ INPUT,
+ PROCESSOR,
+ OUTPUT,
+ SYSTEM
+ }
+
+ /**
+ * Source Type ( one of Input/Output/Processor ) that generated the Event.
+ */
+ private EventGenerator generator;
+
+ /**
+ * Name of the vertex where the event was generated.
+ */
+ private String taskVertexName;
+
+ /**
+ * Name of the vertex to which the Input or Output is connected to.
+ */
+ private String edgeVertexName;
+
+ /**
+ * Task Attempt ID
+ */
+ private TezTaskAttemptID taskAttemptID;
+
+ public EventMetaData() {
+ }
+
+ public EventMetaData(EventGenerator generator,
+ String taskVertexName, String edgeVertexName,
+ TezTaskAttemptID taskAttemptID) {
+ this.generator = generator;
+ this.taskVertexName = taskVertexName;
+ this.edgeVertexName = edgeVertexName;
+ this.taskAttemptID = taskAttemptID;
+ }
+
+ public EventGenerator getEventGenerator() {
+ return generator;
+ }
+
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptID;
+ }
+
+ public String getTaskVertexName() {
+ return taskVertexName;
+ }
+
+ public String getEdgeVertexName() {
+ return edgeVertexName;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(generator.ordinal());
+ if (taskVertexName != null) {
+ out.writeBoolean(true);
+ out.writeUTF(taskVertexName);
+ } else {
+ out.writeBoolean(false);
+ }
+ if (edgeVertexName != null) {
+ out.writeBoolean(true);
+ out.writeUTF(edgeVertexName);
+ } else {
+ out.writeBoolean(false);
+ }
+ taskAttemptID.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ generator = EventGenerator.values()[in.readInt()];
+ if (in.readBoolean()) {
+ taskVertexName = in.readUTF();
+ }
+ if (in.readBoolean()) {
+ edgeVertexName = in.readUTF();
+ }
+ taskAttemptID = new TezTaskAttemptID();
+ taskAttemptID.readFields(in);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
new file mode 100644
index 0000000..1b4509f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+public enum EventType {
+ TASK_FAILED_EVENT,
+ DATA_MOVEMENT_EVENT,
+ INPUT_DATA_ERROR_EVENT
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
new file mode 100644
index 0000000..c9870f1
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+public class InputSpec implements Writable {
+
+ private String sourceVertexName;
+ private InputDescriptor inputDescriptor;
+ private int physicalEdgeCount;
+
+ public InputSpec() {
+ }
+
+ public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
+ int physicalEdgeCount) {
+ this.sourceVertexName = sourceVertexName;
+ this.inputDescriptor = inputDescriptor;
+ this.physicalEdgeCount = physicalEdgeCount;
+ }
+
+ public String getSourceVertexName() {
+ return sourceVertexName;
+ }
+
+ public InputDescriptor getInputDescriptor() {
+ return inputDescriptor;
+ }
+
+ public int getPhysicalEdgeCount() {
+ return physicalEdgeCount;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODONEWTEZ convert to PB
+ out.writeUTF(sourceVertexName);
+ out.writeInt(physicalEdgeCount);
+ byte[] inputDescBytes =
+ DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+ out.writeInt(inputDescBytes.length);
+ out.write(inputDescBytes);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ sourceVertexName = in.readUTF();
+ physicalEdgeCount = in.readInt();
+ int inputDescLen = in.readInt();
+ byte[] inputDescBytes = new byte[inputDescLen];
+ in.readFully(inputDescBytes);
+ inputDescriptor =
+ DagTypeConverters.convertInputDescriptorFromDAGPlan(
+ TezEntityDescriptorProto.parseFrom(inputDescBytes));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
new file mode 100644
index 0000000..4e6fa64
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+public class OutputSpec implements Writable {
+
+ private String destinationVertexName;
+ private OutputDescriptor inputDescriptor;
+ private int physicalEdgeCount;
+
+ public OutputSpec() {
+ }
+
+ public OutputSpec(String destinationVertexName,
+ OutputDescriptor inputDescriptor, int physicalEdgeCount) {
+ this.destinationVertexName = destinationVertexName;
+ this.inputDescriptor = inputDescriptor;
+ this.physicalEdgeCount = physicalEdgeCount;
+ }
+
+ public String getDestinationVertexName() {
+ return destinationVertexName;
+ }
+
+ public OutputDescriptor getOutputDescriptor() {
+ return inputDescriptor;
+ }
+
+ public int getPhysicalEdgeCount() {
+ return physicalEdgeCount;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODONEWTEZ convert to PB
+ out.writeUTF(destinationVertexName);
+ out.writeInt(physicalEdgeCount);
+ byte[] inputDescBytes =
+ DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+ out.writeInt(inputDescBytes.length);
+ out.write(inputDescBytes);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ destinationVertexName = in.readUTF();
+ physicalEdgeCount = in.readInt();
+ int inputDescLen = in.readInt();
+ byte[] inputDescBytes = new byte[inputDescLen];
+ in.readFully(inputDescBytes);
+ inputDescriptor =
+ DagTypeConverters.convertOutputDescriptorFromDAGPlan(
+ TezEntityDescriptorProto.parseFrom(inputDescBytes));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
new file mode 100644
index 0000000..03a26c3
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskSpec implements Writable {
+
+ private TezTaskAttemptID taskAttemptId;
+ private String vertexName;
+ private String user;
+ private ProcessorDescriptor processorDescriptor;
+ private List<InputSpec> inputSpecList;
+ private List<OutputSpec> outputSpecList;
+
+ public TaskSpec() {
+ }
+
+ public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
+ String vertexName, ProcessorDescriptor processorDescriptor,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+ this.taskAttemptId = taskAttemptID;
+ this.user = user;
+ this.vertexName = vertexName;
+ this.processorDescriptor = processorDescriptor;
+ this.inputSpecList = inputSpecList;
+ this.outputSpecList = outputSpecList;
+ }
+
+ public String getVertexName() {
+ return vertexName;
+ }
+
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public ProcessorDescriptor getProcessorDescriptor() {
+ return processorDescriptor;
+ }
+
+ public List<InputSpec> getInputs() {
+ return inputSpecList;
+ }
+
+ public List<OutputSpec> getOutputs() {
+ return outputSpecList;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ byte[] procDesc =
+ DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
+ out.writeInt(procDesc.length);
+ out.write(procDesc);
+ out.writeInt(inputSpecList.size());
+ for (InputSpec inputSpec : inputSpecList) {
+ inputSpec.write(out);
+ }
+ out.writeInt(outputSpecList.size());
+ for (OutputSpec outputSpec : outputSpecList) {
+ outputSpec.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int procDescLength = in.readInt();
+ // TODO at least 3 buffer copies here. Need to convert this to full PB
+ // TEZ-305
+ byte[] procDescBytes = new byte[procDescLength];
+ in.readFully(procDescBytes);
+ processorDescriptor =
+ DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
+ TezEntityDescriptorProto.parseFrom(procDescBytes));
+ int numInputSpecs = in.readInt();
+ inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
+ for (int i = 0; i < numInputSpecs; i++) {
+ InputSpec inputSpec = new InputSpec();
+ inputSpec.readFields(in);
+ inputSpecList.add(inputSpec);
+ }
+ int numOutputSpecs = in.readInt();
+ outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
+ for (int i = 0; i < numOutputSpecs; i++) {
+ OutputSpec outputSpec = new OutputSpec();
+ outputSpec.readFields(in);
+ outputSpecList.add(outputSpec);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("processorName=" + processorDescriptor.getClassName()
+ + ", inputSpecListSize=" + inputSpecList.size()
+ + ", outputSpecListSize=" + outputSpecList.size());
+ sb.append(", inputSpecList=[");
+ for (InputSpec i : inputSpecList) {
+ sb.append("{" + i.toString() + "}, ");
+ }
+ sb.append("], outputSpecList=[");
+ for (OutputSpec i : outputSpecList) {
+ sb.append("{" + i.toString() + "}, ");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
new file mode 100644
index 0000000..9eea738
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputDataErrorEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskFailedEventProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputDataErrorEvent;
+import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+
+import com.google.protobuf.ByteString;
+
+public class TezEvent implements Writable {
+
+ private EventType eventType;
+
+ private Event event;
+
+ private EventMetaData sourceInfo;
+
+ private EventMetaData destinationInfo;
+
+ public TezEvent() {
+ }
+
+ public TezEvent(Event event, EventMetaData sourceInfo) {
+ this.event = event;
+ this.setSourceInfo(sourceInfo);
+ if (event instanceof DataMovementEvent) {
+ eventType = EventType.DATA_MOVEMENT_EVENT;
+ } else if (event instanceof InputDataErrorEvent) {
+ eventType = EventType.INPUT_DATA_ERROR_EVENT;
+ } else if (event instanceof TaskFailedEvent) {
+ eventType = EventType.TASK_FAILED_EVENT;
+ } else {
+ throw new TezUncheckedException("Unknown event, event="
+ + event.getClass().getName());
+ }
+ }
+
+ public Event getEvent() {
+ return event;
+ }
+
+ public EventMetaData getSourceInfo() {
+ return sourceInfo;
+ }
+
+ public void setSourceInfo(EventMetaData sourceInfo) {
+ this.sourceInfo = sourceInfo;
+ }
+
+ public EventMetaData getDestinationInfo() {
+ return destinationInfo;
+ }
+
+ public void setDestinationInfo(EventMetaData destinationInfo) {
+ this.destinationInfo = destinationInfo;
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ private void serializeEvent(DataOutput out) throws IOException {
+ if (event == null) {
+ out.writeBoolean(false);
+ return;
+ }
+ out.writeBoolean(true);
+ byte[] eventBytes = null;
+ switch (eventType) {
+ case DATA_MOVEMENT_EVENT:
+ DataMovementEvent dmEvt = (DataMovementEvent) event;
+ eventBytes = DataMovementEventProto.newBuilder()
+ .setSourceIndex(dmEvt.getSourceIndex())
+ .setTargetIndex(dmEvt.getTargetIndex())
+ .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
+ .build().toByteArray();
+ break;
+ case INPUT_DATA_ERROR_EVENT:
+ InputDataErrorEvent ideEvt = (InputDataErrorEvent) event;
+ eventBytes = InputDataErrorEventProto.newBuilder()
+ .setIndex(ideEvt.getIndex())
+ .setDiagnostics(ideEvt.getDiagnostics())
+ .build().toByteArray();
+ break;
+ case TASK_FAILED_EVENT:
+ TaskFailedEvent tfEvt = (TaskFailedEvent) event;
+ eventBytes = TaskFailedEventProto.newBuilder()
+ .setDiagnostics(tfEvt.getDiagnostics())
+ .build().toByteArray();
+ break;
+ }
+ out.writeInt(eventType.ordinal());
+ out.writeInt(eventBytes.length);
+ out.write(eventBytes);
+ }
+
+ private void deserializeEvent(DataInput in) throws IOException {
+ if (!in.readBoolean()) {
+ event = null;
+ return;
+ }
+ eventType = EventType.values()[in.readInt()];
+ int eventBytesLen = in.readInt();
+ byte[] eventBytes = new byte[eventBytesLen];
+ in.readFully(eventBytes);
+ switch (eventType) {
+ case DATA_MOVEMENT_EVENT:
+ DataMovementEventProto dmProto = DataMovementEventProto.parseFrom(eventBytes);
+ event = new DataMovementEvent(dmProto.getSourceIndex(),
+ dmProto.getTargetIndex(),
+ dmProto.getUserPayload().toByteArray());
+ break;
+ case INPUT_DATA_ERROR_EVENT:
+ InputDataErrorEventProto ideProto =
+ InputDataErrorEventProto.parseFrom(eventBytes);
+ event = new InputDataErrorEvent(ideProto.getDiagnostics(),
+ ideProto.getIndex(), ideProto.getVersion());
+ break;
+ case TASK_FAILED_EVENT:
+ TaskFailedEventProto tfProto =
+ TaskFailedEventProto.parseFrom(eventBytes);
+ event = new TaskFailedEvent(tfProto.getDiagnostics());
+ break;
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ serializeEvent(out);
+ if (sourceInfo != null) {
+ out.writeBoolean(true);
+ sourceInfo.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ if (destinationInfo != null) {
+ out.writeBoolean(true);
+ destinationInfo.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ deserializeEvent(in);
+ if (in.readBoolean()) {
+ sourceInfo.readFields(in);
+ }
+ if (in.readBoolean()) {
+ destinationInfo.readFields(in);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
new file mode 100644
index 0000000..cda456c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+
+public class TezHeartbeatRequest implements Writable {
+
+ private List<TezEvent> events;
+ private TezTaskAttemptID currentTaskAttemptID;
+ private int startIndex;
+ private int maxEvents;
+
+ public TezHeartbeatRequest() {
+ }
+
+ public TezHeartbeatRequest(List<TezEvent> events,
+ TezTaskAttemptID taskAttemptID,
+ int startIndex, int maxEvents) {
+ this.events = Collections.unmodifiableList(events);
+ this.startIndex = startIndex;
+ this.maxEvents = maxEvents;
+ this.currentTaskAttemptID = taskAttemptID;
+ }
+
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+
+ public int getStartIndex() {
+ return startIndex;
+ }
+
+ public int getMaxEvents() {
+ return maxEvents;
+ }
+
+ public TezTaskAttemptID getCurrentTaskAttemptID() {
+ return currentTaskAttemptID;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(events.size());
+ for (TezEvent e : events) {
+ e.write(out);
+ }
+ if (currentTaskAttemptID != null) {
+ out.writeBoolean(true);
+ currentTaskAttemptID.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ out.writeInt(startIndex);
+ out.writeInt(maxEvents);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int eventsCount = in.readInt();
+ events = new ArrayList<TezEvent>(eventsCount);
+ for (int i = 0; i < eventsCount; ++i) {
+ TezEvent e = new TezEvent();
+ e.readFields(in);
+ events.add(e);
+ }
+ if (in.readBoolean()) {
+ currentTaskAttemptID = new TezTaskAttemptID();
+ currentTaskAttemptID.readFields(in);
+ } else {
+ currentTaskAttemptID = null;
+ }
+ startIndex = in.readInt();
+ maxEvents = in.readInt();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
new file mode 100644
index 0000000..35c961b
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.util.Collections;
+import java.util.List;
+
+
+public class TezHeartbeatResponse {
+
+ private final List<TezEvent> events;
+
+ public TezHeartbeatResponse(List<TezEvent> events) {
+ this.events = Collections.unmodifiableList(events);
+ }
+
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
new file mode 100644
index 0000000..aa3f4fc
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Interface to the RPC layer ( umbilical ) between the Tez AM and
+ * a Tez Container's JVM.
+ */
+public class TezUmbilical extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(TezUmbilical.class);
+
+ private final TezTaskUmbilicalProtocol umbilical;
+ private Thread heartbeatThread;
+ private Thread eventRouterThread;
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+ private long amPollInterval;
+ private final String containerIdStr;
+
+ private TezTaskAttemptID currentTaskAttemptID;
+ private int eventCounter = 0;
+ private int maxEventsToGet = 0;
+ private LinkedList<TezEvent> eventsToSend;
+ private ConcurrentLinkedQueue<TezEvent> eventsToBeProcessed;
+
+ public TezUmbilical(TezTaskUmbilicalProtocol umbilical,
+ String containerIdStr) {
+ super(TezUmbilical.class.getName());
+ this.umbilical = umbilical;
+ this.containerIdStr = containerIdStr;
+ this.eventsToSend = new LinkedList<TezEvent>();
+ this.eventsToBeProcessed = new ConcurrentLinkedQueue<TezEvent>();
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ amPollInterval = conf.getLong(
+ TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+ TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+ maxEventsToGet = conf.getInt(
+ TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+ TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ startHeartbeatThread();
+ startRouterThread();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopped.set(true);
+ eventRouterThread.interrupt();
+ super.serviceStop();
+ }
+
+ private void startHeartbeatThread() {
+ heartbeatThread = new Thread(new Runnable() {
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(amPollInterval);
+ try {
+ heartbeat();
+ } catch (TezException e) {
+ LOG.error("Error communicating with AM: " + e.getMessage() , e);
+ // TODO TODONEWTEZ
+ } catch (InvalidToken e) {
+ LOG.error("Error in authencating with AM: ", e);
+ // TODO TODONEWTEZ
+ } catch (Exception e) {
+ LOG.error("Error in heartbeating with AM. ", e);
+ // TODO TODONEWTEZ
+ }
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Heartbeat thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ }
+ });
+ heartbeatThread.setName("Tez Container Heartbeat Thread ["
+ + containerIdStr + "]");
+ heartbeatThread.start();
+ }
+
+ private void startRouterThread() {
+ eventRouterThread = new Thread(new Runnable() {
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ TezEvent e = eventsToBeProcessed.poll();
+ if (e == null) {
+ eventsToBeProcessed.wait();
+ }
+ // TODO TODONEWTEZ
+ switch (e.getEventType()) {
+ case DATA_MOVEMENT_EVENT:
+ // redirect to input of current task
+ if (!e.getDestinationInfo().getTaskAttemptID().equals(
+ currentTaskAttemptID)) {
+ // error? or block?
+ }
+ // route to appropriate input
+ break;
+ case TASK_FAILED_EVENT:
+ // route to ???
+ break;
+ case INPUT_DATA_ERROR_EVENT:
+ // invalid event? ignore?
+ break;
+ }
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Event Router thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ }
+ });
+ eventRouterThread.setName("Tez Container Event Router Thread ["
+ + containerIdStr + "]");
+ eventRouterThread.start();
+ }
+
+ private synchronized void heartbeat() throws TezException, IOException {
+ List<TezEvent> events = new ArrayList<TezEvent>();
+ events.addAll(eventsToSend);
+ TezHeartbeatRequest request = new TezHeartbeatRequest(events,
+ currentTaskAttemptID, eventCounter, maxEventsToGet);
+ TezHeartbeatResponse response = umbilical.heartbeat(request);
+ eventsToSend.clear();
+ eventCounter += response.getEvents().size();
+ eventsToBeProcessed.addAll(response.getEvents());
+ eventsToBeProcessed.notifyAll();
+ }
+
+ /**
+ * Hook to ask the Tez AM for the next task to be run on the Container
+ * @return Next task to be run
+ * @throws IOException
+ */
+ public synchronized ContainerTask getNextTask(
+ ContainerContext containerContext) throws IOException {
+ ContainerTask task = umbilical.getTask(containerContext);
+ if (task.getTaskSpec().getTaskAttemptID() != currentTaskAttemptID) {
+ currentTaskAttemptID = task.getTaskSpec().getTaskAttemptID();
+ }
+ return task;
+ }
+
+ /**
+ * Hook to query the Tez AM whether a particular Task Attempt can commit its
+ * output.
+ * @param attemptID Attempt ID of the Task that is waiting to commit.
+ * attempts can commit.
+ * @throws IOException
+ */
+ public synchronized boolean canCommit(TezTaskAttemptID attemptID)
+ throws IOException {
+ return umbilical.canCommit(attemptID);
+ }
+
+ /**
+ * Inform the Tez AM that an attempt has failed.
+ * @param attemptID Task Attempt ID of the failed attempt.
+ * @param taskFailedEvent Event with details on the attempt failure.
+ * @throws IOException
+ */
+ public synchronized void taskFailed(TezTaskAttemptID attemptID,
+ TezEvent taskFailedEvent) throws IOException {
+ umbilical.taskFailed(attemptID, taskFailedEvent);
+ }
+
+ public synchronized void addEvents(Collection<TezEvent> events) {
+ eventsToSend.addAll(events);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index 7cec1f9..7b0eb45 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -41,12 +41,12 @@ import org.apache.tez.engine.newapi.Processor;
import org.apache.tez.engine.newapi.TezInputContext;
import org.apache.tez.engine.newapi.TezOutputContext;
import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
-import org.apache.tez.engine.newapi.rpc.impl.InputSpec;
-import org.apache.tez.engine.newapi.rpc.impl.OutputSpec;
-import org.apache.tez.engine.newapi.rpc.impl.TaskSpec;
import com.google.common.base.Preconditions;
@@ -56,7 +56,7 @@ public class LogicalIOProcessorRuntimeTask {
private enum State {
NEW, INITED, RUNNING, CLOSED
}
-
+
private static final Log LOG = LogFactory
.getLog(LogicalIOProcessorRuntimeTask.class);
@@ -71,11 +71,11 @@ public class LogicalIOProcessorRuntimeTask {
private final ProcessorDescriptor processorDescriptor;
private final LogicalIOProcessor processor;
-
+
private final TezCounters tezCounters;
private State state;
-
+
private Map<String, LogicalInput> inputMap;
private Map<String, LogicalOutput> outputMap;
@@ -177,12 +177,12 @@ public class LogicalIOProcessorRuntimeTask {
closeOutputEventMap.put(destVertexName, closeOutputEvents);
}
}
-
+
public Map<String, List<Event>> getInputCloseEvents() {
Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
return closeInputEventMap;
}
-
+
public Map<String, List<Event>> getOutputCloseEvents() {
Preconditions.checkState(this.state == State.CLOSED, "Not closed yet");
return closeOutputEventMap;
@@ -282,4 +282,4 @@ public class LogicalIOProcessorRuntimeTask {
}
return (LogicalIOProcessor) processor;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/proto/Events.proto b/tez-engine/src/main/proto/Events.proto
new file mode 100644
index 0000000..fe6e449
--- /dev/null
+++ b/tez-engine/src/main/proto/Events.proto
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.engine.api.events";
+option java_outer_classname = "SystemEventProtos";
+option java_generate_equals_and_hash = true;
+
+message TaskFailedEventProto {
+ optional string diagnostics = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 30ba8f8..16cc8db 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -78,6 +78,9 @@ import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.lib.input.LocalMergedInput;
import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
import org.apache.tez.engine.runtime.RuntimeUtils;
@@ -671,6 +674,20 @@ public class LocalJobRunnerTez implements ClientProtocol {
// TODO TEZAM5 Really depends on the module - inmem shuffle or not.
return new ProceedToCompletionResponse(true, true);
}
+
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) {
+ // TODO Auto-generated method stub
+ // TODO TODONEWTEZ
+ return null;
+ }
+
+ @Override
+ public void taskFailed(TezTaskAttemptID attemptID, TezEvent taskFailedEvent)
+ throws IOException {
+ // TODO Auto-generated method stub
+ // TODO TODONEWTEZ
+ }
}
public LocalJobRunnerTez(Configuration conf) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index 840cb31..da68776 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -29,6 +29,9 @@ import org.apache.tez.common.TezTaskStatus;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.records.ProceedToCompletionResponse;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
@@ -36,12 +39,12 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class);
private ProceedToCompletionResponse proceedToCompletionResponse;
-
-
+
+
public TestUmbilicalProtocol() {
proceedToCompletionResponse = new ProceedToCompletionResponse(false, true);
}
-
+
public TestUmbilicalProtocol(boolean shouldLinger) {
if (shouldLinger) {
proceedToCompletionResponse = new ProceedToCompletionResponse(false, false);
@@ -136,7 +139,7 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
public void outputReady(TezTaskAttemptID taskAttemptId,
OutputContext outputContext) throws IOException {
// TODO Auto-generated method stub
-
+
}
@Override
@@ -145,4 +148,18 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
return proceedToCompletionResponse;
}
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) {
+ // TODO Auto-generated method stub
+ // TODO TODONEWTEZ
+ return null;
+ }
+
+ @Override
+ public void taskFailed(TezTaskAttemptID attemptID, TezEvent taskFailedEvent)
+ throws IOException {
+ // TODO Auto-generated method stub
+ // TODO TODONEWTEZ
+ }
+
}
[2/2] git commit: TEZ-423. Move new re-factor classes to appropriate
module. (hitesh)
Posted by hi...@apache.org.
TEZ-423. Move new re-factor classes to appropriate module. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1cc83e45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1cc83e45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1cc83e45
Branch: refs/heads/TEZ-398
Commit: 1cc83e457b8a86aab801b526c79652fa99e83186
Parents: bc9cee3
Author: Hitesh Shah <hi...@apache.org>
Authored: Sun Sep 8 21:09:45 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Sun Sep 8 21:09:45 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/ContainerTask.java | 102 ---------
.../apache/tez/dag/api/TezConfiguration.java | 8 +
tez-dag/pom.xml | 4 -
.../apache/hadoop/mapred/YarnTezDagChild.java | 61 ++---
.../dag/app/TaskAttemptListenerImpTezDag.java | 43 +++-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 18 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 20 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 16 +-
.../app/rm/AMSchedulerEventTALaunchRequest.java | 13 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 2 +-
.../rm/container/AMContainerEventAssignTA.java | 22 +-
.../dag/app/rm/container/AMContainerImpl.java | 12 +-
.../dag/app/rm/container/AMContainerTask.java | 8 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 7 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 44 ++--
.../tez/dag/app/rm/TestContainerReuse.java | 64 +++---
.../dag/app/rm/container/TestAMContainer.java | 4 +-
tez-engine-api/pom.xml | 31 +++
.../engine/newapi/events/DataMovementEvent.java | 109 +++++++++
.../newapi/events/InputDataErrorEvent.java | 15 +-
.../newapi/events/TaskCommunicationEvent.java | 84 -------
.../tez/engine/newapi/impl/EventMetaData.java | 81 -------
.../tez/engine/newapi/impl/TezUmbilical.java | 61 -----
.../tez/engine/newapi/rpc/impl/InputSpec.java | 51 -----
.../tez/engine/newapi/rpc/impl/OutputSpec.java | 48 ----
.../tez/engine/newapi/rpc/impl/TaskSpec.java | 66 ------
.../tez/engine/newapi/rpc/impl/TezEvent.java | 64 ------
tez-engine-api/src/main/proto/Events.proto | 34 +++
tez-engine/pom.xml | 31 +++
.../org/apache/tez/common/ContainerTask.java | 74 +++++++
.../tez/common/TezTaskUmbilicalProtocol.java | 40 ++--
.../engine/newapi/events/TaskFailedEvent.java | 35 +++
.../tez/engine/newapi/impl/EventMetaData.java | 120 ++++++++++
.../tez/engine/newapi/impl/EventType.java | 25 +++
.../tez/engine/newapi/impl/InputSpec.java | 81 +++++++
.../tez/engine/newapi/impl/OutputSpec.java | 81 +++++++
.../apache/tez/engine/newapi/impl/TaskSpec.java | 139 ++++++++++++
.../apache/tez/engine/newapi/impl/TezEvent.java | 182 +++++++++++++++
.../engine/newapi/impl/TezHeartbeatRequest.java | 102 +++++++++
.../newapi/impl/TezHeartbeatResponse.java | 37 ++++
.../tez/engine/newapi/impl/TezUmbilical.java | 220 +++++++++++++++++++
.../LogicalIOProcessorRuntimeTask.java | 18 +-
tez-engine/src/main/proto/Events.proto | 25 +++
.../apache/hadoop/mapred/LocalJobRunnerTez.java | 17 ++
.../tez/mapreduce/TestUmbilicalProtocol.java | 25 ++-
45 files changed, 1608 insertions(+), 736 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
deleted file mode 100644
index 69bd2b4..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class ContainerTask implements Writable {
-
- TezTaskContext tezTaskContext;
- boolean shouldDie;
-
- public ContainerTask() {
- }
-
- public ContainerTask(TezTaskContext tezTaskContext, boolean shouldDie) {
- this.tezTaskContext = tezTaskContext;
- this.shouldDie = shouldDie;
- }
-
- public TezTaskContext getTezEngineTaskContext() {
- return tezTaskContext;
- }
-
- public boolean shouldDie() {
- return shouldDie;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(shouldDie);
- if (tezTaskContext != null) {
- out.writeBoolean(true);
- Text.writeString(out, tezTaskContext.getClass().getName());
- tezTaskContext.write(out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- shouldDie = in.readBoolean();
- boolean taskComing = in.readBoolean();
- if (taskComing) {
- String contextClass = Text.readString(in);
- tezTaskContext = createEmptyContext(contextClass);
- tezTaskContext.readFields(in);
- }
- }
-
- private TezTaskContext createEmptyContext(String contextClassName)
- throws IOException {
- try {
- Class<?> clazz = Class.forName(contextClassName);
- Constructor<?> c = clazz.getConstructor(null);
- c.setAccessible(true);
- return (TezTaskContext) c.newInstance(null);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- } catch (SecurityException e) {
- throw new IOException(e);
- } catch (NoSuchMethodException e) {
- throw new IOException(e);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public String toString() {
- return "shouldDie: " + shouldDie + ", tezEngineTaskContext: "
- + tezTaskContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index cc41856..e40f4f5 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -164,6 +164,14 @@ public class TezConfiguration extends Configuration {
+ "get-task.sleep.interval-ms.max";
public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 500;
+ public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
+ + "am.heartbeat.interval-ms.max";
+ public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+
+ public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+ + "max-events-per-heartbeat.max";
+ public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100;
+
/**
* Configuration to specify whether container should be reused.
*/
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index f9ec145..875a196 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -101,10 +101,6 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 9f59219..2bac9b6 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -49,13 +49,12 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.Limits;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
@@ -65,7 +64,9 @@ import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import org.apache.tez.engine.task.RuntimeTask;
import org.apache.tez.mapreduce.input.SimpleInput;
import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -147,11 +148,12 @@ public class YarnTezDagChild {
if (LOG.isDebugEnabled()) {
LOG.debug("PID, containerId: " + pid + ", " + containerIdentifier);
}
- TezEngineTaskContext taskContext = null;
+ TaskSpec taskSpec = null;
ContainerTask containerTask = null;
UserGroupInformation childUGI = null;
TezTaskAttemptID taskAttemptId = null;
- ContainerContext containerContext = new ContainerContext(containerIdentifier, pid);
+ ContainerContext containerContext = new ContainerContext(
+ containerIdentifier, pid);
int getTaskMaxSleepTime = defaultConf.getInt(
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
@@ -173,19 +175,18 @@ public class YarnTezDagChild {
LOG.info("TaskInfo: shouldDie: "
+ containerTask.shouldDie()
+ (containerTask.shouldDie() == true ? "" : ", taskAttemptId: "
- + containerTask.getTezEngineTaskContext().getTaskAttemptId()));
+ + containerTask.getTaskSpec().getTaskAttemptID()));
if (containerTask.shouldDie()) {
return;
}
taskCount++;
- taskContext = (TezEngineTaskContext) containerTask
- .getTezEngineTaskContext();
+ taskSpec = containerTask.getTaskSpec();
if (LOG.isDebugEnabled()) {
LOG.debug("New container task context:"
- + taskContext.toString());
+ + taskSpec.toString());
}
- taskAttemptId = taskContext.getTaskAttemptId();
+ taskAttemptId = taskSpec.getTaskAttemptID();
TezVertexID newVertexId = taskAttemptId.getTaskID().getVertexID();
if (currentVertexId != null) {
@@ -200,7 +201,7 @@ public class YarnTezDagChild {
updateLoggers(taskAttemptId);
- final Task t = createAndConfigureTezTask(taskContext, umbilical,
+ final Task t = createAndConfigureTezTask(taskSpec, umbilical,
credentials, jobToken, attemptNumber);
final Configuration conf = ((RuntimeTask)t).getConfiguration();
@@ -297,7 +298,7 @@ public class YarnTezDagChild {
}
private static Task createAndConfigureTezTask(
- TezEngineTaskContext taskContext, TezTaskUmbilicalProtocol master,
+ TaskSpec taskSpec, TezTaskUmbilicalProtocol master,
Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
int appAttemptId) throws IOException, InterruptedException {
@@ -308,19 +309,29 @@ public class YarnTezDagChild {
configureLocalDirs(conf);
-
// FIXME need Input/Output vertices else we have this hack
- if (taskContext.getInputSpecList().isEmpty()) {
- taskContext.getInputSpecList().add(
- new InputSpec("null", 0, SimpleInput.class.getName()));
+ if (taskSpec.getInputs().isEmpty()) {
+ InputDescriptor simpleInputDesc =
+ new InputDescriptor(SimpleInput.class.getName());
+ simpleInputDesc.setUserPayload(
+ taskSpec.getProcessorDescriptor().getUserPayload());
+ taskSpec.getInputs().add(
+ new InputSpec("null", simpleInputDesc, 0));
}
- if (taskContext.getOutputSpecList().isEmpty()) {
- taskContext.getOutputSpecList().add(
- new OutputSpec("null", 0, SimpleOutput.class.getName()));
+ if (taskSpec.getOutputs().isEmpty()) {
+ OutputDescriptor simpleOutputDesc =
+ new OutputDescriptor(SimpleOutput.class.getName());
+ simpleOutputDesc.setUserPayload(
+ taskSpec.getProcessorDescriptor().getUserPayload());
+ taskSpec.getOutputs().add(
+ new OutputSpec("null", simpleOutputDesc, 0));
}
- Task t = RuntimeUtils.createRuntimeTask(taskContext);
-
- t.initialize(conf, taskContext.getProcessorUserPayload(), master);
+ Task t = null;
+
+ // FIXME TODONEWTEZ
+
+ // RuntimeUtils.createRuntimeTask(taskSpec);
+ // t.initialize(conf, taskSpec.getProcessorUserPayload(), master);
// FIXME wrapper should initialize all of processor, inputs and outputs
// Currently, processor is inited via task init
@@ -353,13 +364,13 @@ public class YarnTezDagChild {
}
}
}
-
+
private static void updateLoggers(TezTaskAttemptID tezTaskAttemptID)
throws FileNotFoundException {
String containerLogDir = null;
LOG.info("Redirecting log files based on TaskAttemptId: " + tezTaskAttemptID);
-
+
Appender appender = Logger.getRootLogger().getAppender(
TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
if (appender != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 619a494..6d64a58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -58,6 +59,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
@@ -210,7 +215,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ " is invalid and will be killed");
else
LOG.info("Container with id: " + containerId
- + " is valid and will be killed");
+ + " is valid and will be killed");
task = TASK_FOR_INVALID_JVM;
} else {
pingContainerHeartbeatHandler(containerId);
@@ -224,16 +229,16 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
LOG.info("No task currently assigned to Container with id: "
+ containerId);
} else {
- registerTaskAttempt(taskContext.getTask().getTaskAttemptId(),
+ registerTaskAttempt(taskContext.getTask().getTaskAttemptID(),
containerId);
task = new ContainerTask(taskContext.getTask(), false);
context.getEventHandler().handle(
new TaskAttemptEventStartedRemotely(taskContext.getTask()
- .getTaskAttemptId(), containerId, context
+ .getTaskAttemptID(), containerId, context
.getApplicationACLs(), context.getAllContainers()
.get(containerId).getShufflePort()));
LOG.info("Container with id: " + containerId + " given task: "
- + taskContext.getTask().getTaskAttemptId());
+ + taskContext.getTask().getTaskAttemptID());
}
}
}
@@ -474,10 +479,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
// between polls (MRTask) implies tasks end up wasting upto 1 second doing
// nothing. Similarly for CA_COMMIT.
- DAG job = context.getCurrentDAG();
- Task task =
- job.getVertex(taskAttemptId.getTaskID().getVertexID()).
- getTask(taskAttemptId.getTaskID());
// TODO In-Memory Shuffle
/*
@@ -563,4 +564,30 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ ", ContainerId not known for this attempt");
}
}
+
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+ throws IOException, TezException {
+ // TODO TODONEWTEZ Auto-generated method stub
+ TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
+ LOG.info("Ping from " + taskAttemptID.toString());
+ taskHeartbeatHandler.pinged(taskAttemptID);
+ pingContainerHeartbeatHandler(taskAttemptID);
+ return null;
+ }
+
+ @Override
+ public void taskFailed(TezTaskAttemptID taskAttemptId,
+ TezEvent tezEvent) throws IOException {
+ TaskFailedEvent taskFailedEvent = (TaskFailedEvent) tezEvent.getEvent();
+ LOG.fatal("Task: " + taskAttemptId + " - failed : "
+ + taskFailedEvent.getDiagnostics());
+ reportDiagnosticInfo(taskAttemptId, "Error: "
+ + taskFailedEvent.getDiagnostics());
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index a33ab91..48c9993 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -23,8 +23,6 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -34,23 +32,25 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
/**
- * Main interface to interact with the job. Provides only getters.
+ * Main interface to interact with the job. Provides only getters.
*/
public interface Vertex extends Comparable<Vertex> {
TezVertexID getVertexId();
public VertexPlan getVertexPlan();
-
+
int getDistanceFromRoot();
String getName();
VertexState getState();
/**
- * Get all the counters of this vertex.
+ * Get all the counters of this vertex.
* @return aggregate task-counters
*/
TezCounters getAllCounters();
@@ -64,18 +64,18 @@ public interface Vertex extends Comparable<Vertex> {
float getProgress();
ProgressBuilder getVertexProgress();
VertexStatusBuilder getVertexStatus();
-
+
void setParallelism(int parallelism, List<byte[]> taskUserPayloads);
-
+
TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
TezTaskAttemptID attemptId, int fromEventId, int maxEvents);
-
+
void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
Map<Vertex, EdgeProperty> getInputVertices();
Map<Vertex, EdgeProperty> getOutputVertices();
-
+
List<InputSpec> getInputSpecList();
List<OutputSpec> getOutputSpecList();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ae3c05f..00ef9e5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -47,8 +47,6 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -94,6 +92,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import com.google.common.annotations.VisibleForTesting;
@@ -294,14 +293,13 @@ public class TaskAttemptImpl implements TaskAttempt,
return getVertexID().getDAGId();
}
- TezTaskContext createRemoteTask() {
+ TaskSpec createRemoteTaskSpec() {
Vertex vertex = getVertex();
ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
DAG dag = vertex.getDAG();
-
- return new TezEngineTaskContext(getID(), dag.getUserName(),
- dag.getName(), vertex.getName(), procDesc,
- vertex.getInputSpecList(), vertex.getOutputSpecList());
+ return new TaskSpec(getID(), dag.getUserName(),
+ vertex.getName(), procDesc, vertex.getInputSpecList(),
+ vertex.getOutputSpecList());
}
@Override
@@ -507,7 +505,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.getVertex(attemptId.getTaskID().getVertexID())
.getTask(attemptId.getTaskID());
}
-
+
Vertex getVertex() {
return appContext.getCurrentDAG()
.getVertex(attemptId.getTaskID().getVertexID());
@@ -856,7 +854,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// recovery.
// Create the remote task.
- TezTaskContext remoteTaskContext = ta.createRemoteTask();
+ TaskSpec remoteTaskSpec = ta.createRemoteTaskSpec();
// Create startTaskRequest
String[] requestHosts = new String[0];
@@ -889,12 +887,12 @@ public class TaskAttemptImpl implements TaskAttempt,
if (LOG.isDebugEnabled()) {
LOG.debug("Asking for container launch with taskAttemptContext: "
- + remoteTaskContext);
+ + remoteTaskSpec);
}
// Send out a launch request to the scheduler.
AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
- ta.attemptId, ta.taskResource, remoteTaskContext, ta, requestHosts,
+ ta.attemptId, ta.taskResource, remoteTaskSpec, ta, requestHosts,
requestRacks, scheduleEvent.getPriority(), ta.containerContext);
ta.sendEvent(launchRequestEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b20ac2a..a040ff2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -50,8 +50,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
@@ -103,6 +101,8 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -656,7 +656,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
readLock.unlock();
}
}
-
+
@Override
public void setParallelism(int parallelism, List<byte[]> taskUserPayloads) {
writeLock.lock();
@@ -1413,7 +1413,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public int getOutputVerticesCount() {
return this.targetVertices.size();
}
-
+
@Override
public ProcessorDescriptor getProcessorDescriptor() {
return processorDescriptor;
@@ -1453,8 +1453,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.getInputVerticesCount());
for (Entry<Vertex, EdgeProperty> entry : this.getInputVertices().entrySet()) {
InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
- entry.getKey().getTotalTasks(),
- entry.getValue().getEdgeDestination().getClassName());
+ entry.getValue().getEdgeDestination(), entry.getKey().getTotalTasks());
if (LOG.isDebugEnabled()) {
LOG.debug("For vertex : " + this.getName()
+ ", Using InputSpec : " + inputSpec);
@@ -1472,8 +1471,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount());
for (Entry<Vertex, EdgeProperty> entry : this.getOutputVertices().entrySet()) {
OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
- entry.getKey().getTotalTasks(),
- entry.getValue().getEdgeSource().getClassName());
+ entry.getValue().getEdgeSource(), entry.getKey().getTotalTasks());
if (LOG.isDebugEnabled()) {
LOG.debug("For vertex : " + this.getName()
+ ", Using OutputSpec : " + outputSpec);
@@ -1500,7 +1498,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexScheduler getVertexScheduler() {
return this.vertexScheduler;
}
-
+
private static void logLocationHints(VertexLocationHint locationHint) {
Multiset<String> hosts = HashMultiset.create();
Multiset<String> racks = HashMultiset.create();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index f66c1a2..1c30b0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -19,10 +19,10 @@ package org.apache.tez.dag.app.rm;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
@@ -35,19 +35,18 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
private final Priority priority;
private final Resource capability;
private final ContainerContext containerContext;
-
- private final TezTaskContext remoteTaskContext;
+ private final TaskSpec remoteTaskSpec;
private final TaskAttempt taskAttempt;
public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
Resource capability,
- TezTaskContext remoteTaskContext, TaskAttempt ta,
+ TaskSpec remoteTaskSpec, TaskAttempt ta,
String[] hosts, String[] racks, Priority priority, ContainerContext containerContext) {
super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
this.attemptId = attemptId;
this.capability = capability;
- this.remoteTaskContext = remoteTaskContext;
+ this.remoteTaskSpec = remoteTaskSpec;
this.taskAttempt = ta;
this.hosts = hosts;
this.racks = racks;
@@ -75,8 +74,8 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
return priority;
}
- public TezTaskContext getRemoteTaskContext() {
- return remoteTaskContext;
+ public TaskSpec getRemoteTaskSpec() {
+ return remoteTaskSpec;
}
public TaskAttempt getTaskAttempt() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 1cb6de6..9e059a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -400,7 +400,7 @@ public class TaskSchedulerEventHandler extends AbstractService
}
sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
sendEvent(new AMContainerEventAssignTA(containerId,
- taskAttempt.getID(), event.getRemoteTaskContext()));
+ taskAttempt.getID(), event.getRemoteTaskSpec()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
index 0fe5b3b..dd178fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,26 +18,26 @@
package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
public class AMContainerEventAssignTA extends AMContainerEvent {
private final TezTaskAttemptID attemptId;
// TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ?
- private final TezTaskContext remoteTaskContext;
-
+ private final TaskSpec remoteTaskSpec;
+
public AMContainerEventAssignTA(ContainerId containerId,
- TezTaskAttemptID attemptId, Object remoteTaskContext) {
+ TezTaskAttemptID attemptId, Object remoteTaskSpec) {
super(containerId, AMContainerEventType.C_ASSIGN_TA);
this.attemptId = attemptId;
- this.remoteTaskContext = (TezTaskContext)remoteTaskContext;
+ this.remoteTaskSpec = (TaskSpec)remoteTaskSpec;
}
-
- public TezTaskContext getRemoteTaskContext() {
- return this.remoteTaskContext;
+
+ public TaskSpec getRemoteTaskSpec() {
+ return this.remoteTaskSpec;
}
-
+
public TezTaskAttemptID getTaskAttemptId() {
return this.attemptId;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d44c11..afb0ed5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
@@ -55,6 +54,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
@SuppressWarnings("rawtypes")
public class AMContainerImpl implements AMContainer {
@@ -74,8 +74,8 @@ public class AMContainerImpl implements AMContainer {
private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
// TODO Maybe this should be pulled from the TaskAttempt.s
- private final Map<TezTaskAttemptID, TezTaskContext> remoteTaskMap =
- new HashMap<TezTaskAttemptID, TezTaskContext>();
+ private final Map<TezTaskAttemptID, TaskSpec> remoteTaskMap =
+ new HashMap<TezTaskAttemptID, TaskSpec>();
// TODO ?? Convert to list and hash.
@@ -453,10 +453,10 @@ public class AMContainerImpl implements AMContainer {
}
container.pendingAttempt = event.getTaskAttemptId();
if (LOG.isDebugEnabled()) {
- LOG.debug("AssignTA: attempt: " + event.getRemoteTaskContext());
+ LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
}
container.remoteTaskMap
- .put(event.getTaskAttemptId(), event.getRemoteTaskContext());
+ .put(event.getTaskAttemptId(), event.getRemoteTaskSpec());
return container.getState();
}
}
@@ -600,7 +600,7 @@ public class AMContainerImpl implements AMContainer {
AMContainerImpl container, AMContainerEvent cEvent) {
if (LOG.isDebugEnabled()) {
LOG.debug("AssignTAAtIdle: attempt: " +
- ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+ ((AMContainerEventAssignTA) cEvent).getRemoteTaskSpec());
}
return super.transition(container, cEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
index 1189a34..be1c08e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
@@ -18,13 +18,13 @@
package org.apache.tez.dag.app.rm.container;
-import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
public class AMContainerTask {
private final boolean shouldDie;
- private final TezTaskContext tezTask;
+ private final TaskSpec tezTask;
- public AMContainerTask(boolean shouldDie, TezTaskContext tezTask) {
+ public AMContainerTask(boolean shouldDie, TaskSpec tezTask) {
this.shouldDie = shouldDie;
this.tezTask = tezTask;
}
@@ -33,7 +33,7 @@ public class AMContainerTask {
return this.shouldDie;
}
- public TezTaskContext getTask() {
+ public TaskSpec getTask() {
return this.tezTask;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3274a4a..fe97e10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -83,6 +83,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -175,7 +176,7 @@ public class TestTaskAttempt {
new SystemClock(), mock(TaskHeartbeatHandler.class),
mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
1), createFakeContainerContext());
-
+
TaskAttemptImpl spyTa = spy(taImpl);
when(spyTa.resolveHosts(hosts)).thenReturn(
resolved.toArray(new String[3]));
@@ -766,7 +767,7 @@ public class TestTaskAttempt {
}
@Override
- protected TezTaskContext createRemoteTask() {
+ protected TaskSpec createRemoteTaskSpec() {
// FIXME
return null;
}
@@ -786,7 +787,7 @@ public class TestTaskAttempt {
TaskAttemptState state) {
}
}
-
+
private static ContainerContext createFakeContainerContext() {
return new ContainerContext(new HashMap<String, LocalResource>(),
new Credentials(), new HashMap<String, String>(), "");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 238b2bd..7e7a8a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -613,38 +613,38 @@ public class TestVertexImpl {
Assert.assertEquals(2, v3.getOutputVerticesCount());
Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(0)
- .getVertexName())
+ .getSourceVertexName())
|| "vertex2".equals(v3.getInputSpecList().get(0)
- .getVertexName()));
+ .getSourceVertexName()));
Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(1)
- .getVertexName())
+ .getSourceVertexName())
|| "vertex2".equals(v3.getInputSpecList().get(1)
- .getVertexName()));
+ .getSourceVertexName()));
Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(0)
- .getInputClassName())
+ .getInputDescriptor().getClassName())
|| "i3_v2".equals(v3.getInputSpecList().get(0)
- .getInputClassName()));
+ .getInputDescriptor().getClassName()));
Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(1)
- .getInputClassName())
+ .getInputDescriptor().getClassName())
|| "i3_v2".equals(v3.getInputSpecList().get(1)
- .getInputClassName()));
+ .getInputDescriptor().getClassName()));
Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0)
- .getVertexName())
+ .getDestinationVertexName())
|| "vertex5".equals(v3.getOutputSpecList().get(0)
- .getVertexName()));
+ .getDestinationVertexName()));
Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(1)
- .getVertexName())
+ .getDestinationVertexName())
|| "vertex5".equals(v3.getOutputSpecList().get(1)
- .getVertexName()));
+ .getDestinationVertexName()));
Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(0)
- .getOutputClassName())
+ .getOutputDescriptor().getClassName())
|| "o3_v5".equals(v3.getOutputSpecList().get(0)
- .getOutputClassName()));
+ .getOutputDescriptor().getClassName()));
Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(1)
- .getOutputClassName())
+ .getOutputDescriptor().getClassName())
|| "o3_v5".equals(v3.getOutputSpecList().get(1)
- .getOutputClassName()));
+ .getOutputDescriptor().getClassName()));
}
@Test(timeout = 5000)
@@ -654,7 +654,7 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
startVertex(v);
}
-
+
@Test//(timeout = 5000)
public void testVertexSetParallelism() {
VertexImpl v2 = vertices.get("vertex2");
@@ -663,9 +663,9 @@ public class TestVertexImpl {
Map<TezTaskID, Task> tasks = v2.getTasks();
Assert.assertEquals(2, tasks.size());
TezTaskID firstTask = tasks.keySet().iterator().next();
-
+
startVertex(v2);
-
+
byte[] payload = new byte[0];
List<byte[]> taskPayloads = Collections.singletonList(payload);
v2.setParallelism(1, taskPayloads);
@@ -673,7 +673,7 @@ public class TestVertexImpl {
Assert.assertEquals(1, tasks.size());
// the last one is removed
Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
-
+
VertexImpl v1 = vertices.get("vertex1");
TezTaskID t1_v1 = new TezTaskID(v1.getVertexId(), 0);
TezTaskAttemptID ta1_t1_v1 = new TezTaskAttemptID(t1_v1, 0);
@@ -681,7 +681,7 @@ public class TestVertexImpl {
TezDependentTaskCompletionEvent cEvt1 =
new TezDependentTaskCompletionEvent(1, ta1_t1_v1,
Status.SUCCEEDED, "", 3, 0);
- v2.handle(
+ v2.handle(
new VertexEventSourceTaskAttemptCompleted(v2.getVertexId(), cEvt1));
TezTaskID t1_v2 = new TezTaskID(v2.getVertexId(), 0);
@@ -689,7 +689,7 @@ public class TestVertexImpl {
TezDependentTaskCompletionEvent[] events =
v2.getTaskAttemptCompletionEvents(ta1_t1_v2, 0, 100);
Assert.assertEquals(1, events.length);
- TezDependentTaskCompletionEvent clone = events[0];
+ TezDependentTaskCompletionEvent clone = events[0];
// payload must be present in the first event
Assert.assertEquals(payload, clone.getUserPayload());
// event must be a copy
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index e232ec0..408f88a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -57,12 +57,15 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
import org.junit.Test;
import com.google.common.collect.Lists;
public class TestContainerReuse {
-
+
@Test(timeout = 15000l)
public void testDelayedReuseContainerBecomesAvailable() throws IOException, InterruptedException, ExecutionException {
@@ -77,7 +80,7 @@ public class TestContainerReuse {
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = new TezDAGID("0", 0, 0);
TezVertexID vertexID = new TezVertexID(dagID, 1);
-
+
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
String appUrl = "url";
@@ -143,9 +146,9 @@ public class TestContainerReuse {
verify(rmClient, times(0)).releaseAssignedContainer(eq(containerHost1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
-
+
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
-
+
long currentTs = System.currentTimeMillis();
Throwable exception = null;
while (System.currentTimeMillis() < currentTs + 5000l) {
@@ -162,7 +165,7 @@ public class TestContainerReuse {
taskScheduler.close();
taskSchedulerEventHandler.close();
}
-
+
@Test(timeout = 15000l)
public void testDelayedReuseContainerNotAvailable() throws IOException, InterruptedException, ExecutionException {
Configuration conf = new Configuration(new YarnConfiguration());
@@ -176,7 +179,7 @@ public class TestContainerReuse {
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = new TezDAGID("0", 0, 0);
TezVertexID vertexID = new TezVertexID(dagID, 1);
-
+
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
String appUrl = "url";
@@ -202,28 +205,28 @@ public class TestContainerReuse {
TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
-
+
Resource resource = Resource.newInstance(1024, 1);
Priority priority = Priority.newInstance(5);
String [] host1 = {"host1"};
String [] host2 = {"host2"};
-
+
String [] defaultRack = {"/default-rack"};
-
+
TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
TezTaskAttemptID taID21 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
TezTaskAttemptID taID31 = new TezTaskAttemptID(new TezTaskID(vertexID, 3), 1);
TaskAttempt ta11 = mock(TaskAttempt.class);
TaskAttempt ta21 = mock(TaskAttempt.class);
TaskAttempt ta31 = mock(TaskAttempt.class);
-
+
AMSchedulerEventTALaunchRequest lrTa11 = createLaunchRequestEvent(taID11, ta11, resource, host1, defaultRack, priority, conf);
AMSchedulerEventTALaunchRequest lrTa21 = createLaunchRequestEvent(taID21, ta21, resource, host2, defaultRack, priority, conf);
AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent(taID31, ta31, resource, host1, defaultRack, priority, conf);
-
+
taskSchedulerEventHandler.handleEvent(lrTa11);
taskSchedulerEventHandler.handleEvent(lrTa21);
-
+
Container containerHost1 = createContainer(1, host1[0], resource, priority);
Container containerHost2 = createContainer(2, host2[0], resource, priority);
@@ -231,10 +234,10 @@ public class TestContainerReuse {
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
-
+
// Adding the event later so that task1 assigned to containerHost1 is deterministic.
taskSchedulerEventHandler.handleEvent(lrTa31);
-
+
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
@@ -242,7 +245,7 @@ public class TestContainerReuse {
verify(rmClient, times(0)).releaseAssignedContainer(eq(containerHost2.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
-
+
long currentTs = System.currentTimeMillis();
Throwable exception = null;
while (System.currentTimeMillis() < currentTs + 5000l) {
@@ -385,7 +388,7 @@ public class TestContainerReuse {
taskScheduler.close();
taskSchedulerEventHandler.close();
}
-
+
@Test(timeout = 10000l)
public void testReuseNonLocalRequest() throws IOException, InterruptedException, ExecutionException {
Configuration tezConf = new Configuration(new YarnConfiguration());
@@ -444,7 +447,7 @@ public class TestContainerReuse {
TaskAttempt ta12 = mock(TaskAttempt.class);
doReturn(vertexID).when(ta12).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID12, ta12, resource1, emptyHosts, emptyRacks, priority, tezConf);
-
+
// Send launch request for task 1 onle, deterministic assignment to this task.
taskSchedulerEventHandler.handleEvent(lrEvent11);
@@ -457,7 +460,7 @@ public class TestContainerReuse {
// Send launch request for task2 (vertex2)
taskSchedulerEventHandler.handleEvent(lrEvent12);
-
+
// Task assigned to container completed successfully. Container should be
// assigned immediately to ta12, since there's no local requests (instead of
// waiting for the re-use delay)
@@ -478,7 +481,7 @@ public class TestContainerReuse {
taskScheduler.close();
taskSchedulerEventHandler.close();
}
-
+
@Test(timeout = 10000l)
public void testNoReuseAcrossVertices() throws IOException, InterruptedException, ExecutionException {
Configuration tezConf = new Configuration(new YarnConfiguration());
@@ -536,7 +539,7 @@ public class TestContainerReuse {
TaskAttempt ta21 = mock(TaskAttempt.class);
doReturn(vertexID2).when(ta21).getVertexID();
AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID21, ta21, resource1, host1, racks, priority, tezConf);
-
+
// Send launch request for task 1 onle, deterministic assignment to this task.
taskSchedulerEventHandler.handleEvent(lrEvent11);
@@ -549,7 +552,7 @@ public class TestContainerReuse {
// Send launch request for task2 (vertex2)
taskSchedulerEventHandler.handleEvent(lrEvent21);
-
+
// Task assigned to container completed successfully. Container should not be assigned to task21. Released since delay is 0.
taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
drainableAppCallback.drain();
@@ -575,14 +578,17 @@ public class TestContainerReuse {
TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts,
String[] racks, Priority priority, Configuration conf) {
- ContainerContext containerContext =
+ ContainerContext containerContext =
new ContainerContext(new HashMap<String, LocalResource>(),
new Credentials(), new HashMap<String, String>(), "");
- AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability, new TezEngineTaskContext(taID, "user", "jobName", "vertexName",
- new ProcessorDescriptor("processorClassName"),
- Collections.singletonList(new InputSpec("vertexName", 1,
- "inputClassName")), Collections.singletonList(new OutputSpec(
- "vertexName", 1, "outputClassName"))), ta, hosts, racks, priority, containerContext);
+ AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability,
+ new TaskSpec(taID, "user", "vertexName",
+ new ProcessorDescriptor("processorClassName"),
+ Collections.singletonList(new InputSpec("vertexName",
+ new InputDescriptor("inputClassName"), 1)),
+ Collections.singletonList(new OutputSpec(
+ "vertexName", new OutputDescriptor("outputClassName"), 1))),
+ ta, hosts, racks, priority, containerContext);
return lr;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 2d2945e..62b6c28 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -110,7 +110,7 @@ public class TestAMContainer {
wc.verifyNoOutgoingEvents();
assertFalse(pulledTask.shouldDie());
assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
- .getTaskAttemptId());
+ .getTaskAttemptID());
assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -166,7 +166,7 @@ public class TestAMContainer {
wc.verifyNoOutgoingEvents();
assertFalse(pulledTask.shouldDie());
assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
- .getTaskAttemptId());
+ .getTaskAttemptID());
assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/pom.xml b/tez-engine-api/pom.xml
index 107bdc9..b19e96b 100644
--- a/tez-engine-api/pom.xml
+++ b/tez-engine-api/pom.xml
@@ -44,6 +44,10 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -54,6 +58,33 @@
<configuration>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>Events.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
new file mode 100644
index 0000000..182e8dc
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.newapi.Event;
+
+/**
+ * Event used by user code to send information between tasks. An output can
+ * generate an Event of this type to sending information regarding output data
+ * ( such as URI for file-based output data, port info in case of
+ * streaming-based data transfers ) to the Input on the destination vertex.
+ */
+public final class DataMovementEvent extends Event {
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that generated an Event.
+ * For a Processor-generated event, this is ignored.
+ */
+ private final int sourceIndex;
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that is meant to receive
+ * this Event. For a Processor event, this is ignored.
+ */
+ private int targetIndex;
+
+ /**
+ * User Payload for this Event
+ */
+ private final byte[] userPayload;
+
+ /**
+ * Version number to indicate what attempt generated this Event
+ */
+ private int version;
+
+ /**
+ * User Event constructor
+ * @param sourceIndex Index to identify the physical edge of the input/output
+ * that generated the event
+ * @param userPayload User Payload of the User Event
+ */
+ public DataMovementEvent(int sourceIndex,
+ byte[] userPayload) {
+ this.userPayload = userPayload;
+ this.sourceIndex = sourceIndex;
+ }
+
+ @Private
+ public DataMovementEvent(int sourceIndex,
+ int targetIndex,
+ byte[] userPayload) {
+ this.userPayload = userPayload;
+ this.sourceIndex = sourceIndex;
+ this.targetIndex = targetIndex;
+ }
+
+ /**
+ * Constructor for Processor-generated User Events
+ * @param userPayload
+ */
+ public DataMovementEvent(byte[] userPayload) {
+ this(-1, userPayload);
+ }
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+ public int getSourceIndex() {
+ return sourceIndex;
+ }
+
+ public int getTargetIndex() {
+ return targetIndex;
+ }
+
+ @Private
+ void setTargetIndex(int targetIndex) {
+ this.targetIndex = targetIndex;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ @Private
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
index 4528e4d..c5a92b8 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
@@ -24,7 +24,7 @@ import org.apache.tez.engine.newapi.Event;
* Event generated by an Input to indicate error when trying to
* retrieve data.
*/
-public class InputDataErrorEvent extends Event {
+public final class InputDataErrorEvent extends Event {
/**
* Diagnostics/trace of the error that occurred on the Input's edge.
@@ -36,10 +36,17 @@ public class InputDataErrorEvent extends Event {
*/
private final int index;
- protected InputDataErrorEvent(String diagnostics, int index) {
+ /**
+ * Version of the data on which the error occurred.
+ */
+ private final int version;
+
+ public InputDataErrorEvent(String diagnostics, int index,
+ int version) {
super();
this.diagnostics = diagnostics;
this.index = index;
+ this.version = version;
}
public String getDiagnostics() {
@@ -50,4 +57,8 @@ public class InputDataErrorEvent extends Event {
return index;
}
+ public int getVersion() {
+ return version;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
deleted file mode 100644
index 9d8f01a..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-/**
- * Event used by user code to send information between tasks. An output can
- * generate an Event of this type to sending information regarding output data
- * ( such as URI for file-based output data, port info in case of
- * streaming-based data transfers ) to the Input on the destination vertex.
- */
-public final class TaskCommunicationEvent extends Event {
-
- /**
- * Index(i) of the i-th (physical) Input or Output that generated an Event.
- * For a Processor-generated event, this is ignored.
- */
- private final int sourceIndex;
-
- /**
- * Index(i) of the i-th (physical) Input or Output that is meant to receive
- * this Event. For a Processor event, this is ignored.
- */
- private int targetIndex;
-
- /**
- * User Payload for this Event
- */
- private final byte[] userPayload;
-
- /**
- * User Event constructor
- * @param index Index to identify the physical edge of the input/output
- * @param userPayload User Payload of the User Event
- */
- public TaskCommunicationEvent(int index,
- byte[] userPayload) {
- this.userPayload = userPayload;
- this.sourceIndex = index;
- }
-
- /**
- * Constructor for Processor-generated User Events
- * @param userPayload
- */
- public TaskCommunicationEvent(byte[] userPayload) {
- this(-1, userPayload);
- }
-
- public byte[] getUserPayload() {
- return userPayload;
- }
-
- public int getSourceIndex() {
- return sourceIndex;
- }
-
- public int getTargetIndex() {
- return targetIndex;
- }
-
- void setTargetIndex(int targetIndex) {
- this.targetIndex = targetIndex;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
deleted file mode 100644
index fe06bec..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Class that encapsulates all the information to identify the unique
- * object that either generated an Event or is the recipient of an Event.
- */
-public class EventMetaData {
-
- public static enum EventGenerator {
- INPUT,
- PROCESSOR,
- OUTPUT,
- SYSTEM
- }
-
- /**
- * Source Type ( one of Input/Output/Processor ) that generated the Event.
- */
- private final EventGenerator generator;
-
- /**
- * Name of the vertex where the event was generated.
- */
- private final String taskVertexName;
-
- /**
- * Name of the vertex to which the Input or Output is connected to.
- */
- private final String edgeVertexName;
-
- /**
- * Task Attempt ID
- */
- private final TezTaskAttemptID taskAttemptID;
-
- public EventMetaData(EventGenerator generator,
- String taskVertexName, String edgeVertexName,
- TezTaskAttemptID taskAttemptID) {
- this.generator = generator;
- this.taskVertexName = taskVertexName;
- this.edgeVertexName = edgeVertexName;
- this.taskAttemptID = taskAttemptID;
- }
-
- public EventGenerator getEventGenerator() {
- return generator;
- }
-
- public TezTaskAttemptID getTaskAttemptID() {
- return taskAttemptID;
- }
-
- public String getTaskVertexName() {
- return taskVertexName;
- }
-
- public String getEdgeVertexName() {
- return edgeVertexName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
deleted file mode 100644
index cb8bd6d..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.rpc.impl.TaskSpec;
-
-/**
- * Interface to the RPC layer ( umbilical ) between the Tez AM and
- * a Tez Container's JVM.
- */
-public interface TezUmbilical {
-
- /**
- * Heartbeat call back to the AM from the Container JVM
- * @param events Events to be sent to the AM
- * @return Events sent by the AM to the Container JVM which in turn will
- * be handled either by the JVM or routed to the appropriate instances of
- * Input/Processor/Outputs with a particular Task Attempt.
- */
- public Event[] hearbeat(Event[] events);
-
- /**
- * Hook to ask the Tez AM for the next task to be run on the Container
- * @return Next task to be run
- */
- public TaskSpec getNextTask();
-
- /**
- * Hook to query the Tez AM whether a particular Task Attempt can commit its
- * output.
- * @param attemptIDs Attempt IDs of the Tasks that are waiting to commit.
- * @return Map of boolean flags indicating whether the respective task
- * attempts can commit.
- */
- public boolean canTaskCommit(TezTaskAttemptID attemptID);
-
- /**
- * Inform the Tez AM that one or more Task attempts have failed.
- * @param attemptIDs Task Attempt IDs for the failed attempts.
- */
- public void taskFailed(TezTaskAttemptID attemptID);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
deleted file mode 100644
index 200ec21..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.records.TezVertexID;
-
-/**
- * Serializable information of a given Physical Input.
- */
-public interface InputSpec {
-
- /**
- * @return The name of the Source Vertex whose Output is consumed by this
- * Input.
- */
- public String getSourceVertexName();
-
- /**
- * @return The Vertex ID of the Source Vertex whose Output is consumed by this
- * Input.
- */
- public TezVertexID getSourceVertexID();
-
- /**
- * @return {@link InputDescriptor}
- */
- public InputDescriptor getInputDescriptor();
-
- /**
- * @return The no. of physical edges mapping to this Input.
- */
- public int getPhysicalEdgeCount();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
deleted file mode 100644
index 0cf41b6..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.records.TezVertexID;
-
-public interface OutputSpec {
-
- /**
- * @return The name of the Target Vertex whose Input is consumed by this
- * Output.
- */
- public String getDestinationVertexName();
-
- /**
- * @return The Vertex ID of the Target Vertex whose Input is consumed by this
- * Output.
- */
- public TezVertexID getDestinationVertexID();
-
- /**
- * @return {@link OutputDescriptor}
- */
- public OutputDescriptor getOutputDescriptor();
-
- /**
- * @return The no. of physical edges mapping to this Output.
- */
- public int getPhysicalEdgeCount();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
deleted file mode 100644
index 2ba8cf7..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import java.util.List;
-
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Serializable Task information that is sent across the Umbilical from the
- * Tez AM to the Tez Container's JVM.
- */
-public interface TaskSpec {
-
- /**
- * Get the vertex name for the current task.
- * @return the vertex name set by the user.
- */
- public String getVertexName();
-
- /**
- * Get the task attempt id for the current task.
- * @return the {@link TaskAttemptID}
- */
- public TezTaskAttemptID getTaskAttemptID();
-
- /**
- * Get the owner of the job.
- * @return the owner.
- */
- public String getUser();
- /**
- * The Processor definition for the given Task
- * @return {@link ProcessorDescriptor}
- */
- public ProcessorDescriptor getProcessorDescriptor();
-
- /**
- * The List of Inputs for this Task.
- * @return {@link Input}
- */
- public List<InputSpec> getInputs();
-
- /**
- * The List of Outputs for this Task.
- * @return {@link Output}
- */
- public List<OutputSpec> getOutputs();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
deleted file mode 100644
index 19174c8..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-
-public class TezEvent {
-
- private final String eventClassName;
-
- private final Event event;
-
- private EventMetaData sourceInfo;
-
- private EventMetaData targetInfo;
-
- public TezEvent(Event event, EventMetaData sourceInfo) {
- this.event = event;
- this.eventClassName = event.getClass().getName();
- this.setSourceInfo(sourceInfo);
- }
-
- public Event getEvent() {
- return event;
- }
-
- public EventMetaData getSourceInfo() {
- return sourceInfo;
- }
-
- public void setSourceInfo(EventMetaData sourceInfo) {
- this.sourceInfo = sourceInfo;
- }
-
- public EventMetaData getTargetInfo() {
- return targetInfo;
- }
-
- public void setTargetInfo(EventMetaData targetInfo) {
- this.targetInfo = targetInfo;
- }
-
- public String getEventClassName() {
- return eventClassName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/proto/Events.proto b/tez-engine-api/src/main/proto/Events.proto
new file mode 100644
index 0000000..3651c35
--- /dev/null
+++ b/tez-engine-api/src/main/proto/Events.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.engine.api.events";
+option java_outer_classname = "EventProtos";
+option java_generate_equals_and_hash = true;
+
+message DataMovementEventProto {
+ optional int32 source_index = 1;
+ optional int32 target_index = 2;
+ optional bytes user_payload = 3;
+ optional int32 version = 4;
+}
+
+message InputDataErrorEventProto {
+ optional int32 index = 1;
+ optional string diagnostics = 2;
+ optional int32 version = 3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine/pom.xml b/tez-engine/pom.xml
index 4ecf210..ecfdc04 100644
--- a/tez-engine/pom.xml
+++ b/tez-engine/pom.xml
@@ -49,6 +49,10 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -59,6 +63,33 @@
<configuration>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>Events.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
new file mode 100644
index 0000000..3c18d9f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
+
+public class ContainerTask implements Writable {
+
+ TaskSpec taskSpec;
+ boolean shouldDie;
+
+ public ContainerTask() {
+ }
+
+ public ContainerTask(TaskSpec taskSpec, boolean shouldDie) {
+ this.taskSpec = taskSpec;
+ this.shouldDie = shouldDie;
+ }
+
+ public TaskSpec getTaskSpec() {
+ return taskSpec;
+ }
+
+ public boolean shouldDie() {
+ return shouldDie;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(shouldDie);
+ if (taskSpec != null) {
+ out.writeBoolean(true);
+ taskSpec.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ shouldDie = in.readBoolean();
+ boolean taskComing = in.readBoolean();
+ if (taskComing) {
+ taskSpec = new TaskSpec();
+ taskSpec.readFields(in);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "shouldDie: " + shouldDie + ", TaskSpec: "
+ + taskSpec;
+ }
+}