You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:25 UTC
[18/50] [abbrv] TEZ-443. Merge tez-dag-api and tez-engine-api into a
single module - tez-api (part of TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
deleted file mode 100644
index dae5625..0000000
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api.client.rpc;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.VertexStatus;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
-
-import com.google.protobuf.ServiceException;
-
-public class DAGClientRPCImpl implements DAGClient {
- private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
-
- private final ApplicationId appId;
- private final String dagId;
- private final TezConfiguration conf;
- private ApplicationReport appReport;
- private YarnClient yarnClient;
- private DAGClientAMProtocolBlockingPB proxy = null;
-
- public DAGClientRPCImpl(ApplicationId appId, String dagId,
- TezConfiguration conf) {
- this.appId = appId;
- this.dagId = dagId;
- this.conf = conf;
- yarnClient = new YarnClientImpl();
- yarnClient.init(new YarnConfiguration(conf));
- yarnClient.start();
- appReport = null;
- }
-
- @Override
- public ApplicationId getApplicationId() {
- return appId;
- }
-
- @Override
- public DAGStatus getDAGStatus() throws IOException, TezException {
- if(createAMProxyIfNeeded()) {
- try {
- return getDAGStatusViaAM();
- } catch (TezException e) {
- resetProxy(e); // create proxy again
- }
- }
-
- // Later maybe from History
- return getDAGStatusViaRM();
- }
-
- @Override
- public VertexStatus getVertexStatus(String vertexName)
- throws IOException, TezException {
- if(createAMProxyIfNeeded()) {
- try {
- return getVertexStatusViaAM(vertexName);
- } catch (TezException e) {
- resetProxy(e); // create proxy again
- }
- }
-
- // need AM for this. Later maybe from History
- return null;
- }
-
- @Override
- public void tryKillDAG() throws TezException, IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
- }
- if(createAMProxyIfNeeded()) {
- TryKillDAGRequestProto requestProto =
- TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
- try {
- proxy.tryKillDAG(null, requestProto);
- } catch (ServiceException e) {
- resetProxy(e);
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- if (this.proxy != null) {
- RPC.stopProxy(this.proxy);
- }
- if(yarnClient != null) {
- yarnClient.stop();
- }
- }
-
- @Override
- public ApplicationReport getApplicationReport() {
- return appReport;
- }
-
- void resetProxy(Exception e) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId +
- " due to exception :", e);
- }
- proxy = null;
- }
-
- DAGStatus getDAGStatusViaAM() throws IOException, TezException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
- }
- GetDAGStatusRequestProto requestProto =
- GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
- try {
- return new DAGStatus(
- proxy.getDAGStatus(null, requestProto).getDagStatus());
- } catch (ServiceException e) {
- // TEZ-151 retrieve wrapped TezException
- throw new TezException(e);
- }
- }
-
-
-
- DAGStatus getDAGStatusViaRM() throws TezException, IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
- }
- ApplicationReport appReport;
- try {
- appReport = yarnClient.getApplicationReport(appId);
- } catch (YarnException e) {
- throw new TezException(e);
- }
-
- if(appReport == null) {
- throw new TezException("Unknown/Invalid appId: " + appId);
- }
-
- DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
- DAGStatus dagStatus = new DAGStatus(builder);
- DAGStatusStateProto dagState = null;
- switch (appReport.getYarnApplicationState()) {
- case NEW:
- case NEW_SAVING:
- case SUBMITTED:
- case ACCEPTED:
- dagState = DAGStatusStateProto.DAG_SUBMITTED;
- break;
- case RUNNING:
- dagState = DAGStatusStateProto.DAG_RUNNING;
- break;
- case FAILED:
- dagState = DAGStatusStateProto.DAG_FAILED;
- break;
- case KILLED:
- dagState = DAGStatusStateProto.DAG_KILLED;
- break;
- case FINISHED:
- switch(appReport.getFinalApplicationStatus()) {
- case UNDEFINED:
- case FAILED:
- dagState = DAGStatusStateProto.DAG_FAILED;
- break;
- case KILLED:
- dagState = DAGStatusStateProto.DAG_KILLED;
- break;
- case SUCCEEDED:
- dagState = DAGStatusStateProto.DAG_SUCCEEDED;
- break;
- default:
- throw new TezUncheckedException("Encountered unknown final application"
- + " status from YARN"
- + ", appState=" + appReport.getYarnApplicationState()
- + ", finalStatus=" + appReport.getFinalApplicationStatus());
- }
- break;
- default:
- throw new TezUncheckedException("Encountered unknown application state"
- + " from YARN, appState=" + appReport.getYarnApplicationState());
- }
-
- builder.setState(dagState);
- if(appReport.getDiagnostics() != null) {
- builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
- }
-
- return dagStatus;
- }
-
- VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
- + " vertex: " + vertexName);
- }
- GetVertexStatusRequestProto requestProto =
- GetVertexStatusRequestProto.newBuilder().
- setDagId(dagId).setVertexName(vertexName).build();
-
- try {
- return new VertexStatus(
- proxy.getVertexStatus(null, requestProto).getVertexStatus());
- } catch (ServiceException e) {
- // TEZ-151 retrieve wrapped TezException
- throw new TezException(e);
- }
- }
-
- ApplicationReport getAppReport() throws IOException, TezException {
- try {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("App: " + appId + " in state: "
- + appReport.getYarnApplicationState());
- }
- return appReport;
- } catch (YarnException e) {
- throw new TezException(e);
- }
- }
-
- boolean createAMProxyIfNeeded() throws IOException, TezException {
- if(proxy != null) {
- // if proxy exist optimistically use it assuming there is no retry
- return true;
- }
- appReport = getAppReport();
-
- if(appReport == null) {
- return false;
- }
- YarnApplicationState appState = appReport.getYarnApplicationState();
- if(appState != YarnApplicationState.RUNNING) {
- return false;
- }
-
- // YARN-808. Cannot ascertain if AM is ready until we connect to it.
- // workaround check the default string set by YARN
- if(appReport.getHost() == null || appReport.getHost().equals("N/A") ||
- appReport.getRpcPort() == 0){
- // attempt not running
- return false;
- }
-
- InetSocketAddress addr = new InetSocketAddress(appReport.getHost(),
- appReport.getRpcPort());
-
- RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
- ProtobufRpcEngine.class);
- proxy = (DAGClientAMProtocolBlockingPB) RPC.getProxy(
- DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGApiRecords.proto b/tez-dag-api/src/main/proto/DAGApiRecords.proto
deleted file mode 100644
index 4385749..0000000
--- a/tez-dag-api/src/main/proto/DAGApiRecords.proto
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tez.dag.api.records";
-option java_outer_classname = "DAGProtos";
-option java_generate_equals_and_hash = true;
-
-// DAG plan messages
-
-// Many of these types have a dual in the Tez-api. To reduce confusion, these types have prefix or suffix
-// of "Plan" to indicate they are to be used in the dag-plan.
-// The big types use a suffix: JobPlan, VertexPlan, EdgePlan
-// --> these get more direct use in the runtime and the naming is natural.
-// The enums and utility types use prefix: PlanVertexType, PlanEdgeConnectionPaatern, etc
-// --> there is not great naming choice for these that avoids ambiguity, but this one seems acceptable.
-
-enum PlanVertexType {
- INPUT = 0;
- NORMAL = 1;
- OUTPUT = 2;
-}
-
-enum PlanEdgeDataMovementType {
- ONE_TO_ONE = 0;
- BROADCAST = 1;
- SCATTER_GATHER = 2;
-}
-
-enum PlanEdgeDataSourceType {
- PERSISTED = 0;
- PERSISTED_RELIABLE = 1;
- EPHEMERAL = 2;
-}
-
-enum PlanEdgeSchedulingType {
- SEQUENTIAL = 0;
- CONCURRENT = 1;
-}
-
-message PlanKeyValuePair {
- required string key = 1;
- required string value = 2;
-}
-
-enum PlanLocalResourceType {
- FILE = 0;
- ARCHIVE = 1;
- PATTERN = 2;
-}
-
-enum PlanLocalResourceVisibility {
- PUBLIC = 0;
- PRIVATE = 1;
- APPLICATION = 2;
-}
-
-message PlanLocalResource {
- required string name = 1;
- required string uri = 2;
- required int64 size = 3;
- required int64 timeStamp = 4;
- required PlanLocalResourceType type = 5;
- required PlanLocalResourceVisibility visibility = 6;
- optional string pattern = 7; // only used if type=PATTERN
-}
-
-// Each taskLocationHint represents a single split in in the input.
-// It is the list of [{rack,machines}] that host a replica of each particular split.
-// For now it is represented as pair-of-arrays rather than array-of-pairs.
-message PlanTaskLocationHint {
- repeated string rack = 1;
- repeated string host = 2;
-}
-
-message PlanTaskConfiguration {
- required int32 numTasks = 1;
- required int32 memoryMb = 2;
- required int32 virtualCores = 3;
- required string javaOpts = 4;
- required string taskModule = 5;
- repeated PlanLocalResource localResource = 6;
- repeated PlanKeyValuePair environmentSetting = 8;
-}
-
-message TezEntityDescriptorProto {
- optional string class_name = 1;
- optional bytes user_payload = 2;
-}
-
-message VertexPlan {
- required string name = 1;
- required PlanVertexType type = 2;
- optional TezEntityDescriptorProto processor_descriptor = 3;
- required PlanTaskConfiguration taskConfig = 4;
- repeated PlanTaskLocationHint taskLocationHint = 7;
- repeated string inEdgeId = 8;
- repeated string outEdgeId = 9;
-}
-
-message EdgePlan {
- required string id = 1;
- required string inputVertexName = 2;
- required string outputVertexName = 3;
- required PlanEdgeDataMovementType dataMovementType = 4;
- required PlanEdgeDataSourceType dataSourceType = 5;
- required PlanEdgeSchedulingType schedulingType = 6;
- optional TezEntityDescriptorProto edge_source = 7;
- optional TezEntityDescriptorProto edge_destination = 8;
-}
-
-message ConfigurationProto {
- repeated PlanKeyValuePair confKeyValues = 1;
-}
-
-message DAGPlan {
- required string name = 1;
- repeated VertexPlan vertex = 2;
- repeated EdgePlan edge = 3;
- optional ConfigurationProto dagKeyValues = 4;
-}
-
-// DAG monitoring messages
-message ProgressProto {
- optional int32 totalTaskCount = 1;
- optional int32 succeededTaskCount = 2;
- optional int32 runningTaskCount = 3;
- optional int32 failedTaskCount = 4;
- optional int32 killedTaskCount = 5;
-}
-
-enum VertexStatusStateProto {
- VERTEX_INITED = 0;
- VERTEX_RUNNING = 1;
- VERTEX_SUCCEEDED = 2;
- VERTEX_KILLED = 3;
- VERTEX_FAILED = 4;
- VERTEX_ERROR = 5;
- VERTEX_TERMINATING = 6;
-}
-
-message VertexStatusProto {
- optional VertexStatusStateProto state = 1;
- repeated string diagnostics = 2;
- optional ProgressProto progress = 3;
-}
-
-enum DAGStatusStateProto {
- DAG_SUBMITTED = 0;
- DAG_INITING = 1;
- DAG_RUNNING = 2;
- DAG_SUCCEEDED = 3;
- DAG_KILLED = 4;
- DAG_FAILED = 5;
- DAG_ERROR = 6;
- DAG_TERMINATING = 7;
-}
-
-message StringProgressPairProto {
- required string key = 1;
- required ProgressProto progress = 2;
-}
-
-message DAGStatusProto {
- optional DAGStatusStateProto state = 1;
- repeated string diagnostics = 2;
- optional ProgressProto DAGProgress = 3;
- repeated StringProgressPairProto vertexProgress = 4;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
deleted file mode 100644
index 6fcd1f8..0000000
--- a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tez.dag.api.client.rpc";
-option java_outer_classname = "DAGClientAMProtocolRPC";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-//import "DAGClientAMProtocolRecords.proto";
-
-import "DAGApiRecords.proto";
-
-message GetAllDAGsRequestProto {
-}
-
-message GetAllDAGsResponseProto {
- repeated string dagId = 1;
-}
-
-message GetDAGStatusRequestProto {
- optional string dagId = 1;
-}
-
-message GetDAGStatusResponseProto {
- optional DAGStatusProto dagStatus = 1;
-}
-
-message GetVertexStatusRequestProto {
- optional string dagId = 1;
- optional string vertexName = 2;
-}
-
-message GetVertexStatusResponseProto {
- optional VertexStatusProto vertexStatus = 1;
-}
-
-message TryKillDAGRequestProto {
- optional string dagId = 1;
-}
-
-message TryKillDAGResponseProto {
- //nothing yet
-}
-
-message SubmitDAGRequestProto {
- optional DAGPlan d_a_g_plan = 1;
-}
-
-message SubmitDAGResponseProto {
- optional string dagId = 1;
-}
-
-message ShutdownSessionRequestProto {
-}
-
-message ShutdownSessionResponseProto {
-}
-
-service DAGClientAMProtocol {
- rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
- rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
- rpc getVertexStatus (GetVertexStatusRequestProto) returns (GetVertexStatusResponseProto);
- rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
- rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
- rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
deleted file mode 100644
index 53ec357..0000000
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
-import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
-import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
-import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-// based on TestDAGLocationHint
-public class TestDAGPlan {
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder(); //TODO: doesn't seem to be deleting this folder automatically as expected.
-
- @Test
- public void testBasicJobPlanSerde() throws IOException {
-
- DAGPlan job = DAGPlan.newBuilder()
- .setName("test")
- .addVertex(
- VertexPlan.newBuilder()
- .setName("vertex1")
- .setType(PlanVertexType.NORMAL)
- .addTaskLocationHint(PlanTaskLocationHint.newBuilder().addHost("machineName").addRack("rack1").build())
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(2)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x.y")
- .build())
- .build())
- .build();
- File file = tempFolder.newFile("jobPlan");
- FileOutputStream outStream = null;
- try {
- outStream = new FileOutputStream(file);
- job.writeTo(outStream);
- }
- finally {
- if(outStream != null){
- outStream.close();
- }
- }
-
- DAGPlan inJob;
- FileInputStream inputStream;
- try {
- inputStream = new FileInputStream(file);
- inJob = DAGPlan.newBuilder().mergeFrom(inputStream).build();
- }
- finally {
- outStream.close();
- }
-
- Assert.assertEquals(job, inJob);
- }
-
- @Test
- public void testUserPayloadSerde() {
- DAG dag = new DAG("testDag");
- ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1").
- setUserPayload("processor1Bytes".getBytes());
- ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2").
- setUserPayload("processor2Bytes".getBytes());
- Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
- Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
- v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
- .setTaskLocalResources(new HashMap<String, LocalResource>());
- v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
- .setTaskLocalResources(new HashMap<String, LocalResource>());
-
- InputDescriptor inputDescriptor = new InputDescriptor("input").
- setUserPayload("inputBytes".getBytes());
- OutputDescriptor outputDescriptor = new OutputDescriptor("output").
- setUserPayload("outputBytes".getBytes());
- Edge edge = new Edge(v1, v2, new EdgeProperty(
- DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor));
-
- dag.addVertex(v1).addVertex(v2).addEdge(edge);
-
- DAGPlan dagProto = dag.createDag(new TezConfiguration());
-
- assertEquals(2, dagProto.getVertexCount());
- assertEquals(1, dagProto.getEdgeCount());
-
- VertexPlan v1Proto = dagProto.getVertex(0);
- VertexPlan v2Proto = dagProto.getVertex(1);
- EdgePlan edgeProto = dagProto.getEdge(0);
-
- assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor()
- .getUserPayload().toByteArray()));
- assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName());
-
- assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor()
- .getUserPayload().toByteArray()));
- assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName());
-
- assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
- .getUserPayload().toByteArray()));
- assertEquals("input", edgeProto.getEdgeDestination().getClassName());
-
- assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
- .getUserPayload().toByteArray()));
- assertEquals("output", edgeProto.getEdgeSource().getClassName());
-
- EdgeProperty edgeProperty = DagTypeConverters
- .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList().get(0));
-
- byte[] ib = edgeProperty.getEdgeDestination().getUserPayload();
- assertEquals("inputBytes", new String(ib));
- assertEquals("input", edgeProperty.getEdgeDestination().getClassName());
-
- byte[] ob = edgeProperty.getEdgeSource().getUserPayload();
- assertEquals("outputBytes", new String(ob));
- assertEquals("output", edgeProperty.getEdgeSource().getClassName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
deleted file mode 100644
index b33f3a6..0000000
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestDAGVerify {
-
- private final String dummyProcessorClassName = TestDAGVerify.class.getName();
- private final String dummyInputClassName = TestDAGVerify.class.getName();
- private final String dummyOutputClassName = TestDAGVerify.class.getName();
- private final int dummyTaskCount = 2;
- private final Resource dummyTaskResource = Resource.newInstance(1, 1);
-
- // v1
- // |
- // v2
- @Test
- public void testVerify1() {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor(dummyProcessorClassName),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor(dummyOutputClassName),
- new InputDescriptor(dummyInputClassName)));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addEdge(e1);
- dag.verify();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testVerify2() {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor(dummyProcessorClassName),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.ONE_TO_ONE,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor(dummyOutputClassName),
- new InputDescriptor(dummyInputClassName)));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addEdge(e1);
- dag.verify();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testVerify3() {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor(dummyProcessorClassName),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.EPHEMERAL, SchedulingType.SEQUENTIAL,
- new OutputDescriptor(dummyOutputClassName),
- new InputDescriptor(dummyInputClassName)));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addEdge(e1);
- dag.verify();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testVerify4() {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor(dummyProcessorClassName),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.EPHEMERAL, SchedulingType.CONCURRENT,
- new OutputDescriptor(dummyOutputClassName),
- new InputDescriptor(dummyInputClassName)));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addEdge(e1);
- dag.verify();
- }
-
- // v1 <----
- // | ^
- // v2 ^
- // | | ^
- // v3 v4
- @Test
- public void testCycle1() {
- IllegalStateException ex=null;
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v3 = new Vertex("v3",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v4 = new Vertex("v4",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e2 = new Edge(v2, v3,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e3 = new Edge(v2, v4,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e4 = new Edge(v4, v1,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addVertex(v3);
- dag.addVertex(v4);
- dag.addEdge(e1);
- dag.addEdge(e2);
- dag.addEdge(e3);
- dag.addEdge(e4);
- try{
- dag.verify();
- }
- catch (IllegalStateException e){
- ex = e;
- }
- Assert.assertNotNull(ex);
- System.out.println(ex.getMessage());
- Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
- }
-
- // v1
- // |
- // -> v2
- // ^ | |
- // v3 v4
- @Test
- public void testCycle2() {
- IllegalStateException ex=null;
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v3 = new Vertex("v3",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v4 = new Vertex("v4",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e2 = new Edge(v2, v3,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e3 = new Edge(v2, v4,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e4 = new Edge(v3, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addVertex(v3);
- dag.addVertex(v4);
- dag.addEdge(e1);
- dag.addEdge(e2);
- dag.addEdge(e3);
- dag.addEdge(e4);
- try{
- dag.verify();
- }
- catch (IllegalStateException e){
- ex = e;
- }
- Assert.assertNotNull(ex);
- System.out.println(ex.getMessage());
- Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
- }
-
- @Test
- public void repeatedVertexName() {
- IllegalStateException ex=null;
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v1repeat = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v1repeat);
- try {
- dag.verify();
- }
- catch (IllegalStateException e){
- ex = e;
- }
- Assert.assertNotNull(ex);
- System.out.println(ex.getMessage());
- Assert.assertTrue(ex.getMessage().startsWith("DAG contains multiple vertices with name"));
- }
-
- // v1 v2
- // | |
- // v3
- @Test
- public void BinaryInputDisallowed() {
- IllegalStateException ex=null;
- try {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v3 = new Vertex("v3",
- new ProcessorDescriptor("ReduceProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v3,
- new EdgeProperty(DataMovementType.ONE_TO_ONE,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e2 = new Edge(v2, v3,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addVertex(v3);
- dag.addEdge(e1);
- dag.addEdge(e2);
- dag.verify();
- }
- catch (IllegalStateException e){
- ex = e;
- }
- Assert.assertNotNull(ex);
- System.out.println(ex.getMessage());
- Assert.assertTrue(ex.getMessage().startsWith(
- "Unsupported connection pattern on edge"));
- }
-
- // v1 v2
- // | |
- // v3
- @Test
- public void BinaryInputAllowed() {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v3 = new Vertex("v3",
- new ProcessorDescriptor("ReduceProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v3,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e2 = new Edge(v2, v3,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addVertex(v3);
- dag.addEdge(e1);
- dag.addEdge(e2);
- dag.verify();
- }
-
- // v1
- // | |
- // v2 v3
- @Test
- public void BinaryOutput() {
- IllegalStateException ex=null;
- try {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v2 = new Vertex("v2",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Vertex v3 = new Vertex("v3",
- new ProcessorDescriptor("MapProcessor"),
- dummyTaskCount, dummyTaskResource);
- Edge e1 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- Edge e2 = new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
- new OutputDescriptor("dummy output class"),
- new InputDescriptor("dummy input class")));
- DAG dag = new DAG("testDag");
- dag.addVertex(v1);
- dag.addVertex(v2);
- dag.addVertex(v3);
- dag.addEdge(e1);
- dag.addEdge(e2);
- dag.verify();
- }
- catch (IllegalStateException e){
- ex = e;
- }
- Assert.assertNotNull(ex);
- System.out.println(ex.getMessage());
- Assert.assertTrue(ex.getMessage().startsWith("Vertex has outDegree>1"));
- }
-
- @Test
- public void testDagWithNoVertices() {
- IllegalStateException ex=null;
- try {
- DAG dag = new DAG("testDag");
- dag.verify();
- }
- catch (IllegalStateException e){
- ex = e;
- }
- Assert.assertNotNull(ex);
- System.out.println(ex.getMessage());
- Assert.assertTrue(ex.getMessage()
- .startsWith("Invalid dag containing 0 vertices"));
- }
-
- @SuppressWarnings("unused")
- @Test
- public void testInvalidVertexConstruction() {
- try {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- 0, dummyTaskResource);
- Assert.fail("Expected exception for 0 parallelism");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage().startsWith("Parallelism cannot be 0"));
- }
- try {
- Vertex v1 = new Vertex("v1",
- new ProcessorDescriptor("MapProcessor"),
- 1, null);
- Assert.fail("Expected exception for 0 parallelism");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue(e.getMessage().startsWith("Resource cannot be null"));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index 875a196..bc6aeef 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -29,6 +29,10 @@
<dependencies>
<dependency>
<groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
<artifactId>tez-common</artifactId>
</dependency>
<dependency>
@@ -55,14 +59,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-dag-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-engine-api</artifactId>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 36486c9..7a143a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -44,7 +44,6 @@ import org.apache.tez.common.records.ProceedToCompletionResponse;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerImpl;
@@ -56,9 +55,6 @@ import org.apache.tez.engine.api.impl.TezEvent;
import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.records.OutputContext;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
@SuppressWarnings("unchecked")
public class TaskAttemptListenerImpTezDag extends AbstractService implements
@@ -176,29 +172,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
- int fromEventIdx, int maxEvents,
- TezTaskAttemptID taskAttemptID) {
-
- LOG.info("Dependency Completion Events request from " + taskAttemptID
- + ". fromEventID " + fromEventIdx + " maxEvents " + maxEvents);
-
- // TODO: shouldReset is never used. See TT. Ask for Removal.
- boolean shouldReset = false;
- TezDependentTaskCompletionEvent[] events =
- context.getCurrentDAG().
- getVertex(taskAttemptID.getTaskID().getVertexID()).
- getTaskAttemptCompletionEvents(taskAttemptID, fromEventIdx, maxEvents);
-
- taskHeartbeatHandler.progressing(taskAttemptID);
- pingContainerHeartbeatHandler(taskAttemptID);
-
- // No filters for now. Only required events stored in a vertex.
-
- return new TezTaskDependencyCompletionEventsUpdate(events,shouldReset);
- }
-
- @Override
public ContainerTask getTask(ContainerContext containerContext)
throws IOException {
@@ -370,17 +343,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void outputReady(TezTaskAttemptID taskAttemptId,
- OutputContext outputContext) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("AttemptId: " + taskAttemptId + " reported output context: "
- + outputContext);
- }
- context.getEventHandler().handle(
- new TaskAttemptEventOutputConsumable(taskAttemptId, outputContext));
- }
-
- @Override
public ProceedToCompletionResponse
proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
index c476966..2779faf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
@@ -20,9 +20,9 @@ package org.apache.tez.dag.app.dag;
import java.util.List;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
public abstract class EdgeManager {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java
deleted file mode 100644
index f10209f..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.records.OutputContext;
-
-public class TaskAttemptEventOutputConsumable extends TaskAttemptEvent {
-
- private final OutputContext outputContext;
-
- public TaskAttemptEventOutputConsumable(TezTaskAttemptID id,
- OutputContext outputContext) {
- super(id, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE);
- this.outputContext = outputContext;
- }
-
- public OutputContext getOutputContext() {
- return this.outputContext;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 71f17ac..b05a6f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -22,9 +22,9 @@ import java.util.List;
import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
public class BroadcastEdgeManager extends EdgeManager {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 3605857..060a112 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -31,14 +31,14 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
import org.apache.tez.engine.api.impl.EventMetaData;
import org.apache.tez.engine.api.impl.InputSpec;
import org.apache.tez.engine.api.impl.OutputSpec;
import org.apache.tez.engine.api.impl.TezEvent;
import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
public class Edge {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 7c4743e..a916ad2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -22,9 +22,9 @@ import java.util.List;
import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
public class OneToOneEdgeManager extends EdgeManager {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index 380b6b6..1d4df5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -22,9 +22,9 @@ import java.util.List;
import org.apache.tez.dag.app.dag.EdgeManager;
import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
public class ScatterGatherEdgeManager extends EdgeManager {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index b854a43..a0ed329 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -39,9 +39,9 @@ import org.apache.tez.dag.app.dag.VertexScheduler;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1ec1225..74005b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -106,14 +106,14 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
import org.apache.tez.engine.api.impl.EventMetaData;
import org.apache.tez.engine.api.impl.InputSpec;
import org.apache.tez.engine.api.impl.OutputSpec;
import org.apache.tez.engine.api.impl.TezEvent;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java b/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
new file mode 100644
index 0000000..7a4dd13
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * This is used to track task completion events on
+ * job tracker.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+// TODO TEZAM3 This needs to be more generic. Maybe some kind of a serialized
+// blob - which can be interpretted by the Input plugin.
+public class TezDependentTaskCompletionEvent implements Writable {
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ // TODO EVENTUALLY - Remove TIPFAILED state ?
+ static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
+
+ private int eventId;
+ private int taskRunTime; // using int since runtime is the time difference
+ private TezTaskAttemptID taskAttemptId;
+ private long dataSize;
+ Status status;
+ byte[] userPayload;
+ // TODO TEZAM2 Get rid of the isMap field. Job specific type information can be determined from TaskAttemptId.getTaskType
+// boolean isMap = false;
+ public static final TezDependentTaskCompletionEvent[] EMPTY_ARRAY =
+ new TezDependentTaskCompletionEvent[0];
+
+ public TezDependentTaskCompletionEvent() {
+ taskAttemptId = new TezTaskAttemptID();
+ }
+
+ /**
+ * Constructor. eventId should be created externally and incremented
+ * per event for each job.
+ * @param eventId event id, event id should be unique and assigned in
+ * incrementally, starting from 0.
+ * @param taskAttemptId task id
+ * @param status task's status
+ * @param taskTrackerHttp task tracker's host:port for http.
+ */
+ public TezDependentTaskCompletionEvent(int eventId,
+ TezTaskAttemptID taskAttemptId,
+// boolean isMap,
+ Status status,
+ int runTime,
+ long dataSize){
+
+ this.taskAttemptId = taskAttemptId;
+// this.isMap = isMap;
+ this.eventId = eventId;
+ this.status =status;
+ this.taskRunTime = runTime;
+ this.dataSize = dataSize;
+ }
+
+ public TezDependentTaskCompletionEvent clone() {
+ TezDependentTaskCompletionEvent clone = new TezDependentTaskCompletionEvent(
+ this.eventId, this.taskAttemptId, this.status,
+ this.taskRunTime, this.dataSize);
+
+ return clone;
+ }
+
+ /**
+ * Returns event Id.
+ * @return event id
+ */
+ public int getEventId() {
+ return eventId;
+ }
+
+ /**
+ * Returns task id.
+ * @return task id
+ */
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptId;
+ }
+
+ /**
+ * Returns enum Status.SUCESS or Status.FAILURE.
+ * @return task tracker status
+ */
+ public Status getStatus() {
+ return status;
+ }
+
+ /**
+ * Returns time (in millisec) the task took to complete.
+ */
+ public int getTaskRunTime() {
+ return taskRunTime;
+ }
+
+ /**
+ * Return size of output produced by the task
+ */
+ public long getDataSize() {
+ return dataSize;
+ }
+
+ /**
+ * @return user payload. Maybe null
+ */
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+ /**
+ * Set the task completion time
+ * @param taskCompletionTime time (in millisec) the task took to complete
+ */
+ protected void setTaskRunTime(int taskCompletionTime) {
+ this.taskRunTime = taskCompletionTime;
+ }
+
+ /**
+ * set event Id. should be assigned incrementally starting from 0.
+ * @param eventId
+ */
+ public void setEventId(int eventId) {
+ this.eventId = eventId;
+ }
+
+ /**
+ * Sets task id.
+ * @param taskId
+ */
+ public void setTaskAttemptID(TezTaskAttemptID taskId) {
+ this.taskAttemptId = taskId;
+ }
+
+ /**
+ * Set task status.
+ * @param status
+ */
+ public void setTaskStatus(Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Set the user payload
+ * @param userPayload
+ */
+ public void setUserPayload(byte[] userPayload) {
+ this.userPayload = userPayload;
+ }
+
+ @Override
+ public String toString(){
+ StringBuffer buf = new StringBuffer();
+ buf.append("Task Id : ");
+ buf.append(taskAttemptId);
+ buf.append(", Status : ");
+ buf.append(status.name());
+ return buf.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ // not counting userPayload as that is a piggyback mechanism
+ if(o == null)
+ return false;
+ if(o.getClass().equals(this.getClass())) {
+ TezDependentTaskCompletionEvent event = (TezDependentTaskCompletionEvent) o;
+ return this.eventId == event.getEventId()
+ && this.status.equals(event.getStatus())
+ && this.taskAttemptId.equals(event.getTaskAttemptID())
+ && this.taskRunTime == event.getTaskRunTime()
+ && this.dataSize == event.getDataSize();
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskAttemptId.write(out);
+// out.writeBoolean(isMap);
+ WritableUtils.writeEnum(out, status);
+ WritableUtils.writeVInt(out, taskRunTime);
+ WritableUtils.writeVInt(out, eventId);
+ WritableUtils.writeCompressedByteArray(out, userPayload);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskAttemptId.readFields(in);
+// isMap = in.readBoolean();
+ status = WritableUtils.readEnum(in, Status.class);
+ taskRunTime = WritableUtils.readVInt(in);
+ eventId = WritableUtils.readVInt(in);
+ userPayload = WritableUtils.readCompressedByteArray(in);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java b/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
new file mode 100644
index 0000000..13c9088
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class TezTaskDependencyCompletionEventsUpdate implements Writable {
+ TezDependentTaskCompletionEvent[] events;
+ boolean reset;
+
+ public TezTaskDependencyCompletionEventsUpdate() { }
+
+ public TezTaskDependencyCompletionEventsUpdate(
+ TezDependentTaskCompletionEvent[] events, boolean reset) {
+ this.events = events;
+ this.reset = reset;
+ }
+
+ public boolean shouldReset() {
+ return reset;
+ }
+
+ public TezDependentTaskCompletionEvent[] getDependentTaskCompletionEvents() {
+ return events;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(reset);
+ out.writeInt(events.length);
+ for (TezDependentTaskCompletionEvent event : events) {
+ event.write(out);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ reset = in.readBoolean();
+ events = new TezDependentTaskCompletionEvent[in.readInt()];
+ for (int i = 0; i < events.length; ++i) {
+ events[i] = new TezDependentTaskCompletionEvent();
+ events[i].readFields(in);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index d4eae9d..c2457e1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.TezTaskContext;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
@@ -66,6 +65,7 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.impl.TaskSpec;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.TokenCache;
import org.junit.Test;
@@ -109,7 +109,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.RUNNING);
wc.verifyNoOutgoingEvents();
assertFalse(pulledTask.shouldDie());
- assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+ assertEquals(wc.taskSpec.getTaskAttemptID(), pulledTask.getTask()
.getTaskAttemptID());
assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -165,7 +165,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.RUNNING);
wc.verifyNoOutgoingEvents();
assertFalse(pulledTask.shouldDie());
- assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+ assertEquals(wc.taskSpec.getTaskAttemptID(), pulledTask.getTask()
.getTaskAttemptID());
assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -824,7 +824,7 @@ public class TestAMContainer {
TezTaskID taskID;
TezTaskAttemptID taskAttemptID;
- TezTaskContext tezTaskContext;
+ TaskSpec taskSpec;
public AMContainerImpl amContainer;
@@ -859,8 +859,8 @@ public class TestAMContainer {
taskID = new TezTaskID(vertexID, 1);
taskAttemptID = new TezTaskAttemptID(taskID, 1);
- tezTaskContext = mock(TezTaskContext.class);
- doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId();
+ taskSpec = mock(TaskSpec.class);
+ doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
amContainer = new AMContainerImpl(container, chh, tal,
appContext);
@@ -904,7 +904,7 @@ public class TestAMContainer {
public void assignTaskAttempt(TezTaskAttemptID taID) {
reset(eventHandler);
amContainer.handle(new AMContainerEventAssignTA(containerID, taID,
- tezTaskContext));
+ taskSpec));
}
public AMContainerTask pullTaskToRun() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dist/src/main/assembly/tez-dist-full.xml
----------------------------------------------------------------------
diff --git a/tez-dist/src/main/assembly/tez-dist-full.xml b/tez-dist/src/main/assembly/tez-dist-full.xml
index 3176dd1..383eb43 100644
--- a/tez-dist/src/main/assembly/tez-dist-full.xml
+++ b/tez-dist/src/main/assembly/tez-dist-full.xml
@@ -24,8 +24,7 @@
<moduleSet>
<useAllReactorProjects>true</useAllReactorProjects>
<includes>
- <include>org.apache.tez:tez-dag-api</include>
- <include>org.apache.tez:tez-engine-api</include>
+ <include>org.apache.tez:tez-api</include>
</includes>
<binaries>
<outputDirectory>/</outputDirectory>
@@ -40,8 +39,7 @@
<outputDirectory>/lib</outputDirectory>
<!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
<excludes>
- <exclude>org.apache.tez:tez-dag-api</exclude>
- <exclude>org.apache.tez:tez-engine-api</exclude>
+ <exclude>org.apache.tez:tez-api</exclude>
<exclude>*:*:test-jar</exclude>
<exclude>org.apache.hadoop:hadoop-common</exclude>
<exclude>org.apache.hadoop:hadoop-auth</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dist/src/main/assembly/tez-dist.xml
----------------------------------------------------------------------
diff --git a/tez-dist/src/main/assembly/tez-dist.xml b/tez-dist/src/main/assembly/tez-dist.xml
index 01d0d40..aefb8a2 100644
--- a/tez-dist/src/main/assembly/tez-dist.xml
+++ b/tez-dist/src/main/assembly/tez-dist.xml
@@ -24,8 +24,7 @@
<moduleSet>
<useAllReactorProjects>true</useAllReactorProjects>
<includes>
- <include>org.apache.tez:tez-dag-api</include>
- <include>org.apache.tez:tez-engine-api</include>
+ <include>org.apache.tez:tez-api</include>
</includes>
<binaries>
<outputDirectory>/</outputDirectory>
@@ -42,8 +41,7 @@
<useTransitiveFiltering>true</useTransitiveFiltering>
<!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
<excludes>
- <exclude>org.apache.tez:tez-dag-api</exclude>
- <exclude>org.apache.tez:tez-engine-api</exclude>
+ <exclude>org.apache.tez:tez-api</exclude>
<exclude>*:*:test-jar</exclude>
<exclude>org.apache.hadoop:hadoop-common</exclude>
<exclude>org.apache.hadoop:hadoop-auth</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/findbugs-exclude.xml b/tez-engine-api/findbugs-exclude.xml
deleted file mode 100644
index 5b11308..0000000
--- a/tez-engine-api/findbugs-exclude.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/pom.xml b/tez-engine-api/pom.xml
deleted file mode 100644
index b19e96b..0000000
--- a/tez-engine-api/pom.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez</artifactId>
- <version>0.2.0-SNAPSHOT</version>
- </parent>
- <artifactId>tez-engine-api</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.inject</groupId>
- <artifactId>guice</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <configuration>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-maven-plugins</artifactId>
- <executions>
- <execution>
- <id>compile-protoc</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>protoc</goal>
- </goals>
- <configuration>
- <protocVersion>${protobuf.version}</protocVersion>
- <protocCommand>${protoc.path}</protocCommand>
- <imports>
- <param>${basedir}/src/main/proto</param>
- </imports>
- <source>
- <directory>${basedir}/src/main/proto</directory>
- <includes>
- <include>Events.proto</include>
- </includes>
- </source>
- <output>${project.build.directory}/generated-sources/java</output>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
deleted file mode 100644
index 64c3834..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link Input} represents a pipe through which an <em>tez</em> task
- * can get input key/value pairs.
- */
-public interface Input {
-
- /**
- * Initialize <code>Input</code>.
- *
- * @param conf job configuration
- * @param master master process controlling the task
- * @throws IOException
- * @throws InterruptedException
- */
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException;
-
- /**
- * Check if there is another key/value pair.
- *
- * @return true if a key/value pair was read
- * @throws IOException
- * @throws InterruptedException
- */
- public boolean hasNext() throws IOException, InterruptedException;
-
- /**
- * Get the next key.
- *
- * @return the current key or null if there is no current key
- * @throws IOException
- * @throws InterruptedException
- */
- public Object getNextKey() throws IOException, InterruptedException;
-
- /**
- * Get the next values.
- *
- * @return the object that was read
- * @throws IOException
- * @throws InterruptedException
- */
- public Iterable<Object> getNextValues()
- throws IOException, InterruptedException;
-
- /**
- * The current progress of the {@link Input} through its data.
- *
- * @return a number between 0.0 and 1.0 that is the fraction of the data read
- * @throws IOException
- * @throws InterruptedException
- */
- public float getProgress() throws IOException, InterruptedException;
-
- /**
- * Close this <code>Input</code> for future operations.
- */
- public void close() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
deleted file mode 100644
index f3add9a..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-
-/**
- * {@link Master} represents the master controlling the {@link Task}.
- */
-@ProtocolInfo(protocolName = "Master", protocolVersion = 1)
-public interface Master extends VersionedProtocol {
-
- // TODO TEZAM3 This likely needs to change to be a little more generic.
- // Many output / input relationships cannot be captured via this. The current
- // form works primarily works for the existing MR
-
- TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
- int fromEventIdx, int maxEventsToFetch,
- TezTaskAttemptID taskAttemptId);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
deleted file mode 100644
index daa80d0..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.engine.records.OutputContext;
-
-/**
- * {@link Output} represents a pipe through which an <em>tez</em> task
- * can send out outputs.
- */
-public interface Output {
-
- /**
- * Initialize <code>Output</code>.
- *
- * @param conf job configuration
- * @param master master process controlling the task
- * @throws IOException
- * @throws InterruptedException
- */
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException;
-
- /**
- * Writes a key/value pair.
- *
- * @param key the key to write.
- * @param value the value to write.
- * @throws IOException
- */
- public void write(Object key, Object value
- ) throws IOException, InterruptedException;
-
- /**
- * Returns the OutputContext for the particular <code>Output</code>.
- *
- * @return the OutputContext for this Output if it exists, otherwise null.
- */
- public OutputContext getOutputContext();
-
- /**
- * Close this <code>Output</code> for future operations.
- *
- * @throws IOException
- */
- public void close() throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
deleted file mode 100644
index 550ee73..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link Processor} represents the <em>tez</em> entity responsible for
- * consuming {@link Input} and producing {@link Output}.
- */
-public interface Processor {
-
- /**
- * Initialize the <code>Processor</code>.
- *
- * @param conf job-configuration
- * @param master master process controlling the task
- * @throws IOException
- * @throws InterruptedException
- */
- public void initialize(Configuration conf, Master master)
- throws IOException, InterruptedException;
-
- /**
- * Process input data from <code>input</code> and
- * send it to <code>output</code>.
- *
- * @param in input
- * @param out output
- * @param master master process controlling the task
- * @throws IOException
- * @throws InterruptedException
- */
- public void process(Input[] in, Output[] out)
- throws IOException, InterruptedException;
-
- /**
- * Close the {@link Processor}.
- *
- * @throws IOException
- * @throws InterruptedException
- */
- public void close() throws IOException, InterruptedException;
-
-}