You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:29 UTC
[22/50] [abbrv] TEZ-443. Merge tez-dag-api and tez-engine-api into a
single module - tez-api (part of TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
new file mode 100644
index 0000000..dae5625
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.rpc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * {@link DAGClient} implementation that queries a DAG's ApplicationMaster (AM)
+ * over protobuf RPC. DAG status falls back to the YARN application report when
+ * the AM cannot be used; vertex status has no such fallback.
+ */
+public class DAGClientRPCImpl implements DAGClient {
+  private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
+
+  private final ApplicationId appId;
+  private final String dagId;
+  private final TezConfiguration conf;
+  // Report fetched by the latest attempt to create the AM proxy; null before that.
+  private ApplicationReport appReport;
+  private YarnClient yarnClient;
+  // RPC proxy to the AM, created on demand; null while no proxy is available.
+  private DAGClientAMProtocolBlockingPB proxy = null;
+
+  /**
+   * Creates the client and initializes and starts an internal
+   * {@link YarnClient} built from the given configuration.
+   *
+   * @param appId
+   *          id of the YARN application running the DAG
+   * @param dagId
+   *          id of the DAG, sent with every AM request
+   * @param conf
+   *          configuration used for the YARN client and the AM RPC proxy
+   */
+  public DAGClientRPCImpl(ApplicationId appId, String dagId,
+      TezConfiguration conf) {
+    this.appId = appId;
+    this.dagId = dagId;
+    this.conf = conf;
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(new YarnConfiguration(conf));
+    yarnClient.start();
+    appReport = null;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return appId;
+  }
+
+  /**
+   * Returns the DAG status from the AM when a proxy is available, otherwise
+   * (or when the AM call fails) a status derived from the YARN application
+   * report.
+   */
+  @Override
+  public DAGStatus getDAGStatus() throws IOException, TezException {
+    if (createAMProxyIfNeeded()) {
+      try {
+        return getDAGStatusViaAM();
+      } catch (TezException e) {
+        resetProxy(e); // create proxy again
+      }
+    }
+
+    // Later maybe from History
+    return getDAGStatusViaRM();
+  }
+
+  /**
+   * Returns the status of the named vertex from the AM.
+   *
+   * @param vertexName
+   *          name of the vertex to query
+   * @return the vertex status, or <code>null</code> when no AM proxy is
+   *         available or the AM call fails
+   */
+  @Override
+  public VertexStatus getVertexStatus(String vertexName)
+      throws IOException, TezException {
+    if (createAMProxyIfNeeded()) {
+      try {
+        return getVertexStatusViaAM(vertexName);
+      } catch (TezException e) {
+        resetProxy(e); // create proxy again
+      }
+    }
+
+    // need AM for this. Later maybe from History
+    return null;
+  }
+
+  /**
+   * Asks the AM to kill the DAG. This is best effort: nothing is sent when no
+   * AM proxy is available, and an RPC failure only resets the proxy.
+   */
+  @Override
+  public void tryKillDAG() throws TezException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
+    }
+    if (!createAMProxyIfNeeded()) {
+      return;
+    }
+    TryKillDAGRequestProto requestProto =
+        TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+    try {
+      proxy.tryKillDAG(null, requestProto);
+    } catch (ServiceException e) {
+      resetProxy(e);
+    }
+  }
+
+  /**
+   * Stops the AM proxy (if any) and the YARN client. The YARN client is
+   * stopped even when stopping the proxy throws.
+   */
+  @Override
+  public void close() throws IOException {
+    // Drop the reference first so a failed or repeated close never re-stops it.
+    DAGClientAMProtocolBlockingPB amProxy = this.proxy;
+    this.proxy = null;
+    try {
+      if (amProxy != null) {
+        RPC.stopProxy(amProxy);
+      }
+    } finally {
+      if (yarnClient != null) {
+        yarnClient.stop();
+      }
+    }
+  }
+
+  /**
+   * Returns the application report fetched by the latest attempt to create the
+   * AM proxy; <code>null</code> if no such attempt has fetched one.
+   */
+  @Override
+  public ApplicationReport getApplicationReport() {
+    return appReport;
+  }
+
+  /**
+   * Forgets the current AM proxy so that the next call creates a new one.
+   *
+   * @param e
+   *          the failure that triggered the reset; logged at debug level
+   */
+  void resetProxy(Exception e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId +
+          " due to exception :", e);
+    }
+    proxy = null;
+  }
+
+  /**
+   * Fetches the DAG status through the AM proxy, which must already exist.
+   *
+   * @throws TezException
+   *           wrapping any {@link ServiceException} raised by the RPC
+   */
+  DAGStatus getDAGStatusViaAM() throws IOException, TezException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
+    }
+    GetDAGStatusRequestProto requestProto =
+        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
+    try {
+      return new DAGStatus(
+          proxy.getDAGStatus(null, requestProto).getDagStatus());
+    } catch (ServiceException e) {
+      // TEZ-151 retrieve wrapped TezException
+      throw new TezException(e);
+    }
+  }
+
+  /**
+   * Builds a DAG status from the YARN application report by mapping the
+   * application state (and, for finished applications, the final status) to a
+   * DAG state and copying the diagnostics string when present.
+   *
+   * @throws TezException
+   *           if YARN reports an error or returns no report for the app id
+   * @throws TezUncheckedException
+   *           if YARN returns a state this method does not know
+   */
+  DAGStatus getDAGStatusViaRM() throws TezException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("GetDAGStatus via RM for app: " + appId + " dag:" + dagId);
+    }
+    // Local report only; the cached appReport field is intentionally untouched.
+    ApplicationReport report;
+    try {
+      report = yarnClient.getApplicationReport(appId);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+
+    if (report == null) {
+      throw new TezException("Unknown/Invalid appId: " + appId);
+    }
+
+    DAGStatusStateProto dagState = null;
+    switch (report.getYarnApplicationState()) {
+    case NEW:
+    case NEW_SAVING:
+    case SUBMITTED:
+    case ACCEPTED:
+      dagState = DAGStatusStateProto.DAG_SUBMITTED;
+      break;
+    case RUNNING:
+      dagState = DAGStatusStateProto.DAG_RUNNING;
+      break;
+    case FAILED:
+      dagState = DAGStatusStateProto.DAG_FAILED;
+      break;
+    case KILLED:
+      dagState = DAGStatusStateProto.DAG_KILLED;
+      break;
+    case FINISHED:
+      switch (report.getFinalApplicationStatus()) {
+      case UNDEFINED:
+      case FAILED:
+        dagState = DAGStatusStateProto.DAG_FAILED;
+        break;
+      case KILLED:
+        dagState = DAGStatusStateProto.DAG_KILLED;
+        break;
+      case SUCCEEDED:
+        dagState = DAGStatusStateProto.DAG_SUCCEEDED;
+        break;
+      default:
+        throw new TezUncheckedException("Encountered unknown final application"
+            + " status from YARN"
+            + ", appState=" + report.getYarnApplicationState()
+            + ", finalStatus=" + report.getFinalApplicationStatus());
+      }
+      break;
+    default:
+      throw new TezUncheckedException("Encountered unknown application state"
+          + " from YARN, appState=" + report.getYarnApplicationState());
+    }
+
+    DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
+    builder.setState(dagState);
+    if (report.getDiagnostics() != null) {
+      builder.addAllDiagnostics(Collections.singleton(report.getDiagnostics()));
+    }
+
+    // Wrap the builder only once it is fully populated.
+    return new DAGStatus(builder);
+  }
+
+  /**
+   * Fetches the status of the named vertex through the AM proxy, which must
+   * already exist.
+   *
+   * @throws TezException
+   *           wrapping any {@link ServiceException} raised by the RPC
+   */
+  VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
+          + " vertex: " + vertexName);
+    }
+    GetVertexStatusRequestProto requestProto =
+        GetVertexStatusRequestProto.newBuilder().
+            setDagId(dagId).setVertexName(vertexName).build();
+
+    try {
+      return new VertexStatus(
+          proxy.getVertexStatus(null, requestProto).getVertexStatus());
+    } catch (ServiceException e) {
+      // TEZ-151 retrieve wrapped TezException
+      throw new TezException(e);
+    }
+  }
+
+  /**
+   * Fetches the current application report from YARN.
+   *
+   * @return the report as returned by the YARN client; callers check it for
+   *         <code>null</code>
+   * @throws TezException
+   *           wrapping any {@link YarnException} raised by the YARN client
+   */
+  ApplicationReport getAppReport() throws IOException, TezException {
+    try {
+      ApplicationReport report = yarnClient.getApplicationReport(appId);
+      if (LOG.isDebugEnabled()) {
+        // Guard against a missing report so debug logging cannot cause an NPE.
+        LOG.debug("App: " + appId + " in state: "
+            + (report == null ? null : report.getYarnApplicationState()));
+      }
+      return report;
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+  }
+
+  /**
+   * Ensures an AM proxy exists, creating one from a fresh application report
+   * when necessary.
+   *
+   * @return <code>true</code> if a proxy is available; <code>false</code> if
+   *         the application has no report, is not RUNNING, or has not
+   *         published a usable host and RPC port yet
+   */
+  boolean createAMProxyIfNeeded() throws IOException, TezException {
+    if (proxy != null) {
+      // if proxy exist optimistically use it assuming there is no retry
+      return true;
+    }
+    appReport = getAppReport();
+
+    if (appReport == null) {
+      return false;
+    }
+    YarnApplicationState appState = appReport.getYarnApplicationState();
+    if (appState != YarnApplicationState.RUNNING) {
+      return false;
+    }
+
+    // YARN-808. Cannot ascertain if AM is ready until we connect to it.
+    // workaround check the default string set by YARN
+    String host = appReport.getHost();
+    int rpcPort = appReport.getRpcPort();
+    if (host == null || "N/A".equals(host) || rpcPort == 0) {
+      // attempt not running
+      return false;
+    }
+
+    InetSocketAddress addr = new InetSocketAddress(host, rpcPort);
+
+    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
+        ProtobufRpcEngine.class);
+    proxy = (DAGClientAMProtocolBlockingPB) RPC.getProxy(
+        DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+    return true;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Event.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Event.java b/tez-api/src/main/java/org/apache/tez/engine/api/Event.java
new file mode 100644
index 0000000..80da655
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Event.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * Base class for all events generated within the Tez execution engine.
+ * Used as the primary mode of communication between the AM, Inputs, Processors
+ * and Outputs.
+ *
+ * <p>The class itself declares no members.</p>
+ */
+public abstract class Event {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Input.java b/tez-api/src/main/java/org/apache/tez/engine/api/Input.java
new file mode 100644
index 0000000..e333075
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Input.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.util.List;
+
+/**
+ * Represents an input through which a TezProcessor receives data on an edge.
+ *
+ * <p><code>Input</code> classes must have a 0 argument public constructor for Tez
+ * to construct the <code>Input</code>. Tez will take care of initializing and
+ * closing the Input after a {@link Processor} completes.</p>
+ */
+public interface Input {
+
+ /**
+ * Initializes the <code>Input</code>.
+ *
+ * @param inputContext
+ * the {@link TezInputContext}
+ * @return a list of {@link Event}s; presumably events produced during
+ * initialization - TODO confirm the exact contract
+ * @throws Exception
+ * if an error occurs
+ */
+ public List<Event> initialize(TezInputContext inputContext)
+ throws Exception;
+
+ /**
+ * Gets an instance of the {@link Reader} for this <code>Input</code>
+ *
+ * @return the {@link Reader}
+ * @throws Exception
+ * if an error occurs
+ */
+ public Reader getReader() throws Exception;
+
+ /**
+ * Handles user and system generated {@link Event}s, which typically carry
+ * information such as an output being available on the previous vertex.
+ *
+ * @param inputEvents
+ * the list of {@link Event}s
+ */
+ public void handleEvents(List<Event> inputEvents);
+
+ /**
+ * Closes the <code>Input</code>
+ *
+ * @return a list of {@link Event}s; presumably events produced while
+ * closing - TODO confirm the exact contract
+ * @throws Exception
+ * if an error occurs
+ */
+ public List<Event> close() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java
new file mode 100644
index 0000000..90be09e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.util.Map;
+
+/**
+ * Represents a processor which consumes {@link LogicalInput}s and produces
+ * {@link LogicalOutput}s.
+ */
+public interface LogicalIOProcessor extends Processor {
+
+ /**
+ * Runs the {@link LogicalIOProcessor}
+ *
+ * @param inputs
+ * a map of the source vertex name to {@link LogicalInput} - one per
+ * incoming edge.
+ * @param outputs
+ * a map of the destination vertex name to {@link LogicalOutput} -
+ * one per outgoing edge
+ * @throws Exception
+ * if an error occurs (TODO: document the specific failure conditions)
+ */
+ public void run(Map<String, LogicalInput> inputs,
+ Map<String, LogicalOutput> outputs) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java
new file mode 100644
index 0000000..4a47ccf
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * An {@link Input} which handles all incoming physical connections on an
+ * edge. A {@link LogicalIOProcessor} sees a single Logical Input per incoming
+ * edge.
+ */
+public interface LogicalInput extends Input {
+
+ /**
+ * Sets the number of physical inputs that this <code>LogicalInput</code> will
+ * receive. This will be called by the Tez framework before initializing the
+ * <code>LogicalInput</code>
+ *
+ * @param numInputs
+ * the number of physical inputs.
+ */
+ public void setNumPhysicalInputs(int numInputs);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java
new file mode 100644
index 0000000..4626fbd
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * An {@link Output} which handles all outgoing physical connections on an
+ * edge. A {@link LogicalIOProcessor} sees a single Logical Output per outgoing
+ * edge.
+ */
+public interface LogicalOutput extends Output {
+ /**
+ * Sets the number of physical outputs that this <code>LogicalOutput</code>
+ * will handle. This will be called by the Tez framework before initializing
+ * the <code>LogicalOutput</code>
+ *
+ * @param numOutputs
+ * the number of physical outputs
+ */
+ public void setNumPhysicalOutputs(int numOutputs);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Output.java b/tez-api/src/main/java/org/apache/tez/engine/api/Output.java
new file mode 100644
index 0000000..ec679ed
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Output.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.util.List;
+
+/**
+ * Represents an Output through which a TezProcessor writes information on an
+ * edge.
+ *
+ * <p><code>Output</code> implementations must have a 0 argument public constructor
+ * for Tez to construct the <code>Output</code>. Tez will take care of
+ * initializing and closing the Output after a {@link Processor} completes.</p>
+ */
+public interface Output {
+
+ /**
+ * Initializes the <code>Output</code>
+ *
+ * @param outputContext
+ * the {@link TezOutputContext}
+ * @return a list of {@link Event}s; presumably events produced during
+ * initialization - TODO confirm the exact contract
+ * @throws Exception
+ * if an error occurs
+ */
+ public List<Event> initialize(TezOutputContext outputContext)
+ throws Exception;
+
+ /**
+ * Gets an instance of the {@link Writer} in an <code>Output</code>
+ *
+ * @return the {@link Writer}
+ * @throws Exception
+ * if an error occurs
+ */
+ public Writer getWriter() throws Exception;
+
+ /**
+ * Handles user and system generated {@link Event}s, which typically carry
+ * information such as a downstream vertex being ready to consume input.
+ *
+ * @param outputEvents
+ * the list of {@link Event}s
+ */
+ public void handleEvents(List<Event> outputEvents);
+
+ /**
+ * Closes the <code>Output</code>
+ *
+ * @return a list of {@link Event}s; presumably events produced while
+ * closing - TODO confirm the exact contract
+ * @throws Exception
+ * if an error occurs
+ */
+ public List<Event> close() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java b/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java
new file mode 100644
index 0000000..05e6b84
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Processor} represents the <em>Tez</em> entity responsible for
+ * consuming {@link Input} and producing {@link Output}.
+ */
+public interface Processor {
+
+ /**
+ * Initializes the <code>Processor</code>
+ *
+ * @param processorContext
+ * the {@link TezProcessorContext}
+ * @throws Exception
+ * if an error occurs
+ */
+ public void initialize(TezProcessorContext processorContext)
+ throws Exception;
+
+ /**
+ * Handles user and system generated {@link Event}s.
+ *
+ * @param processorEvents
+ * the list of {@link Event}s
+ */
+ public void handleEvents(List<Event> processorEvents);
+
+ /**
+ * Closes the <code>Processor</code>
+ *
+ * @throws Exception
+ * if an error occurs
+ */
+ public void close() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java b/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java
new file mode 100644
index 0000000..502c5f2
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * A <code>Reader</code> represents the data being read in an {@link Input}.
+ *
+ * <p>The interface itself declares no methods.</p>
+ */
+public interface Reader {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java
new file mode 100644
index 0000000..ddf1ff8
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * Context handle for the Input to initialize itself.
+ */
+public interface TezInputContext extends TezTaskContext {
+
+ /**
+ * Gets the name of the source vertex that generated the data for this Input.
+ *
+ * @return Name of the Source Vertex
+ */
+ public String getSourceVertexName();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java
new file mode 100644
index 0000000..791a0f0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * Context handle for the Output to initialize itself.
+ */
+public interface TezOutputContext extends TezTaskContext {
+
+ /**
+ * Gets the name of the destination vertex that is the recipient of this
+ * Output's data.
+ *
+ * @return Name of the Destination Vertex
+ */
+ public String getDestinationVertexName();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java
new file mode 100644
index 0000000..2bbbe81
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+/**
+ * Context handle for the Processor to initialize itself.
+ */
+public interface TezProcessorContext extends TezTaskContext {
+
+ /**
+ * Sets the overall progress of this Task Attempt.
+ *
+ * @param progress Progress in the range [0.0f, 1.0f]
+ */
+ public void setProgress(float progress);
+
+ /**
+ * Checks whether this attempt can commit its output.
+ *
+ * @return true if commit allowed
+ * @throws IOException
+ * if the check fails (TODO: document the specific failure conditions)
+ */
+ public boolean canCommit() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java
new file mode 100644
index 0000000..706e646
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * Base interface for Context classes used to initialize the Input, Output
+ * and Processor instances.
+ */
+public interface TezTaskContext {
+
+ // TODO NEWTEZ
+ // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+ // on the ApplicationMaster when a thundering herd of reducers fetch events
+ // This should not be necessary after HADOOP-8942
+
+ /**
+ * Get the {@link ApplicationId} for the running app
+ * @return the {@link ApplicationId}
+ */
+ public ApplicationId getApplicationId();
+
+ /**
+ * Get the current DAG Attempt Number
+ * @return DAG Attempt Number
+ */
+ public int getDAGAttemptNumber();
+
+ /**
+ * Get the index of this Task
+ * @return Task Index
+ */
+ public int getTaskIndex();
+
+ /**
+ * Get the current Task Attempt Number
+ * @return Task Attempt Number
+ */
+ public int getTaskAttemptNumber();
+
+ /**
+ * Get the name of the DAG
+ * @return the DAG name
+ */
+ public String getDAGName();
+
+ /**
+ * Get the name of the Vertex in which the task is running
+ * @return Vertex Name
+ */
+ public String getTaskVertexName();
+
+ public TezCounters getCounters();
+
+ /**
+ * Send Events to the AM and/or dependent Vertices
+ * @param events Events to be sent
+ */
+ public void sendEvents(List<Event> events);
+
+ /**
+ * Get the User Payload for the Input/Output/Processor
+ * @return User Payload
+ */
+ public byte[] getUserPayload();
+
+ /**
+ * Get the work directories for the Input/Output/Processor
+ * @return an array of work dirs
+ */
+ public String[] getWorkDirs();
+
+ /**
+ * Returns an identifier which is unique to the specific Input, Processor or
+ * Output
+ *
+ * @return the unique identifier
+ */
+ public String getUniqueIdentifier();
+
+ /**
+ * Report a fatal error to the framework. This will cause the entire task to
+ * fail and should not be used for reporting temporary or recoverable errors.
+ * @param exception an exception representing the error
+ * @param message a message associated with the error
+ */
+ public void fatalError(Throwable exception, String message);
+
+ /**
+ * Returns meta-data for the specified service. As an example, when the MR
+ * ShuffleHandler is used - this would return the jobToken serialized as bytes
+ *
+ * @param serviceName
+ * the name of the service for which meta-data is required
+ * @return a ByteBuffer representing the meta-data
+ */
+ public ByteBuffer getServiceConsumerMetaData(String serviceName);
+
+ /**
+ * Returns provider meta-data for the specified service. As an example, when
+ * the MR ShuffleHandler is used - this would return the shuffle port
+ * serialized as bytes
+ *
+ * @param serviceName
+ * the name of the service for which provider meta-data is required
+ * @return a ByteBuffer representing the meta-data
+ */
+ public ByteBuffer getServiceProviderMetaData(String serviceName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java b/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java
new file mode 100644
index 0000000..c9503a3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * A <code>Writer</code> represents the data being written by an {@link Output}
+ */
+public interface Writer {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java
new file mode 100644
index 0000000..3f35555
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event used by user code to send information between tasks. An output can
+ * generate an Event of this type to send information regarding output data
+ * ( such as URI for file-based output data, port info in case of
+ * streaming-based data transfers ) to the Input on the destination vertex.
+ */
+public final class DataMovementEvent extends Event {
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that generated an Event.
+ * For a Processor-generated event, this is ignored.
+ */
+ private final int sourceIndex;
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that is meant to receive
+ * this Event. For a Processor event, this is ignored.
+ */
+ private int targetIndex;
+
+ /**
+ * User Payload for this Event
+ */
+ private final byte[] userPayload;
+
+ /**
+ * Version number to indicate what attempt generated this Event
+ */
+ private int version;
+
+ /**
+ * User Event constructor
+ * @param sourceIndex Index to identify the physical edge of the input/output
+ * that generated the event
+ * @param userPayload User Payload of the User Event
+ */
+ public DataMovementEvent(int sourceIndex,
+ byte[] userPayload) {
+ this.userPayload = userPayload;
+ this.sourceIndex = sourceIndex;
+ }
+
+ @Private
+ public DataMovementEvent(int sourceIndex,
+ int targetIndex,
+ byte[] userPayload) {
+ this.userPayload = userPayload;
+ this.sourceIndex = sourceIndex;
+ this.targetIndex = targetIndex;
+ }
+
+ /**
+ * Constructor for Processor-generated User Events; the source index is -1
+ * @param userPayload User Payload of the User Event
+ */
+ public DataMovementEvent(byte[] userPayload) {
+ this(-1, userPayload);
+ }
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+ public int getSourceIndex() {
+ return sourceIndex;
+ }
+
+ public int getTargetIndex() {
+ return targetIndex;
+ }
+
+ @Private
+ public void setTargetIndex(int targetIndex) {
+ this.targetIndex = targetIndex;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ @Private
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java
new file mode 100644
index 0000000..57de09b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event sent from the AM to an Input to indicate that one of its sources has
+ * failed - effectively the input is no longer available from the particular
+ * source.
+ * Users are not expected to send this event.
+ */
+public class InputFailedEvent extends Event{
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that generated the data.
+ * For a Processor-generated event, this is ignored.
+ */
+ private final int sourceIndex;
+
+ /**
+ * Index(i) of the i-th (physical) Input or Output that is meant to receive
+ * this Event. For a Processor event, this is ignored.
+ */
+ private int targetIndex;
+
+ /**
+ * Version number to indicate what attempt generated this Event
+ */
+ private int version;
+
+ /**
+ * Constructor for an InputFailedEvent. The target index and version are not
+ * set by this constructor.
+ * @param sourceIndex Index to identify the physical edge of the input/output
+ * that generated the data
+ */
+ public InputFailedEvent(int sourceIndex) {
+ this.sourceIndex = sourceIndex;
+ }
+
+ @Private
+ public InputFailedEvent(int sourceIndex,
+ int targetIndex,
+ int version) {
+ this.sourceIndex = sourceIndex;
+ this.targetIndex = targetIndex;
+ this.version = version;
+ }
+
+ public int getSourceIndex() {
+ return sourceIndex;
+ }
+
+ public int getTargetIndex() {
+ return targetIndex;
+ }
+
+ @Private
+ public void setTargetIndex(int targetIndex) {
+ this.targetIndex = targetIndex;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ @Private
+ public void setVersion(int version) {
+ this.version = version;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java
new file mode 100644
index 0000000..3656d45
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event used to send user specific data from the user
+ * code in the AM to the task input
+ */
+public class InputInformationEvent extends Event {
+
+ /**
+ * User Payload for this Event
+ */
+ private final byte[] userPayload;
+ public InputInformationEvent(byte[] userPayload) {
+ this.userPayload = userPayload;
+ }
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java
new file mode 100644
index 0000000..fa49b79
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event generated by an Input to indicate error when trying to retrieve data.
+ * This is not necessarily a fatal event - it's an indication to the AM to retry
+ * source data generation.
+ */
+public final class InputReadErrorEvent extends Event {
+
+ /**
+ * Diagnostics/trace of the error that occurred on the Input's edge.
+ */
+ private final String diagnostics;
+
+ /**
+ * Index of the physical edge on which the error occurred.
+ */
+ private final int index;
+
+ /**
+ * Version of the data on which the error occurred.
+ */
+ private final int version;
+
+ public InputReadErrorEvent(String diagnostics, int index,
+ int version) {
+ super();
+ this.diagnostics = diagnostics;
+ this.index = index;
+ this.version = version;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
new file mode 100644
index 0000000..7099299
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+/**
+ * Defines the valid lifecycle and scope of Objects stored in ObjectRegistry.
+ * Objects are guaranteed to not be valid outside of their defined life-cycle
+ * period. Objects are not guaranteed to be retained through the defined period
+ * as they may be evicted for various reasons.
+ */
+public enum ObjectLifeCycle {
+ /** Objects are valid for the lifetime of the Tez JVM/Session
+ */
+ SESSION,
+ /** Objects are valid for the lifetime of the DAG.
+ */
+ DAG,
+ /** Objects are valid for the lifetime of the Vertex.
+ */
+ VERTEX,
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
new file mode 100644
index 0000000..a27903d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+/**
+ * Preliminary version of a simple shared object cache to re-use
+ * objects across multiple tasks within the same container/JVM.
+ */
+public interface ObjectRegistry {
+
+ /**
+ * Insert or update object into the registry. This will remove an object
+ * associated with the same key with a different life-cycle as there is only
+ * one instance of an Object stored for a given key irrespective of the
+ * life-cycle attached to the Object.
+ * @param lifeCycle What life-cycle is the Object valid for
+ * @param key Key to identify the Object
+ * @param value Object to be inserted
+ * @return Previous Object associated with the key attached if present
+ * else null. Could return the same object if the object was associated with
+ * the same key for a different life-cycle.
+ */
+ public Object add(ObjectLifeCycle lifeCycle, String key, Object value);
+
+ /**
+ * Return the object associated with the provided key
+ * @param key Key to find object
+ * @return Object if found else null
+ */
+ public Object get(String key);
+
+ /**
+ * Delete the object associated with the provided key. No life-cycle is
+ * required since only one Object is stored for a given key.
+ * @param key Key to find object
+ * @return True if an object was found and removed
+ */
+ public boolean delete(String key);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
new file mode 100644
index 0000000..94352b3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+import com.google.inject.Inject;
+
+public class ObjectRegistryFactory {
+
+ @Inject
+ private static ObjectRegistry objectRegistry;
+
+ public static ObjectRegistry getObjectRegistry() {
+ return objectRegistry;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
new file mode 100644
index 0000000..4385749
--- /dev/null
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.api.records";
+option java_outer_classname = "DAGProtos";
+option java_generate_equals_and_hash = true;
+
+// DAG plan messages
+
+// Many of these types have a dual in the Tez-api. To reduce confusion, these types have prefix or suffix
+// of "Plan" to indicate they are to be used in the dag-plan.
+// The big types use a suffix: JobPlan, VertexPlan, EdgePlan
+// --> these get more direct use in the runtime and the naming is natural.
+// The enums and utility types use prefix: PlanVertexType, PlanEdgeDataMovementType, etc
+// --> there is no great naming choice for these that avoids ambiguity, but this one seems acceptable.
+
+enum PlanVertexType {
+ INPUT = 0;
+ NORMAL = 1;
+ OUTPUT = 2;
+}
+
+enum PlanEdgeDataMovementType {
+ ONE_TO_ONE = 0;
+ BROADCAST = 1;
+ SCATTER_GATHER = 2;
+}
+
+enum PlanEdgeDataSourceType {
+ PERSISTED = 0;
+ PERSISTED_RELIABLE = 1;
+ EPHEMERAL = 2;
+}
+
+enum PlanEdgeSchedulingType {
+ SEQUENTIAL = 0;
+ CONCURRENT = 1;
+}
+
+message PlanKeyValuePair {
+ required string key = 1;
+ required string value = 2;
+}
+
+enum PlanLocalResourceType {
+ FILE = 0;
+ ARCHIVE = 1;
+ PATTERN = 2;
+}
+
+enum PlanLocalResourceVisibility {
+ PUBLIC = 0;
+ PRIVATE = 1;
+ APPLICATION = 2;
+}
+
+message PlanLocalResource {
+ required string name = 1;
+ required string uri = 2;
+ required int64 size = 3;
+ required int64 timeStamp = 4;
+ required PlanLocalResourceType type = 5;
+ required PlanLocalResourceVisibility visibility = 6;
+ optional string pattern = 7; // only used if type=PATTERN
+}
+
+// Each taskLocationHint represents a single split in the input.
+// It is the list of [{rack,machines}] that host a replica of each particular split.
+// For now it is represented as pair-of-arrays rather than array-of-pairs.
+message PlanTaskLocationHint {
+ repeated string rack = 1;
+ repeated string host = 2;
+}
+
+message PlanTaskConfiguration {
+ required int32 numTasks = 1;
+ required int32 memoryMb = 2;
+ required int32 virtualCores = 3;
+ required string javaOpts = 4;
+ required string taskModule = 5;
+ repeated PlanLocalResource localResource = 6;
+ repeated PlanKeyValuePair environmentSetting = 8;
+}
+
+message TezEntityDescriptorProto {
+ optional string class_name = 1;
+ optional bytes user_payload = 2;
+}
+
+message VertexPlan {
+ required string name = 1;
+ required PlanVertexType type = 2;
+ optional TezEntityDescriptorProto processor_descriptor = 3;
+ required PlanTaskConfiguration taskConfig = 4;
+ repeated PlanTaskLocationHint taskLocationHint = 7;
+ repeated string inEdgeId = 8;
+ repeated string outEdgeId = 9;
+}
+
+message EdgePlan {
+ required string id = 1;
+ required string inputVertexName = 2;
+ required string outputVertexName = 3;
+ required PlanEdgeDataMovementType dataMovementType = 4;
+ required PlanEdgeDataSourceType dataSourceType = 5;
+ required PlanEdgeSchedulingType schedulingType = 6;
+ optional TezEntityDescriptorProto edge_source = 7;
+ optional TezEntityDescriptorProto edge_destination = 8;
+}
+
+message ConfigurationProto {
+ repeated PlanKeyValuePair confKeyValues = 1;
+}
+
+message DAGPlan {
+ required string name = 1;
+ repeated VertexPlan vertex = 2;
+ repeated EdgePlan edge = 3;
+ optional ConfigurationProto dagKeyValues = 4;
+}
+
+// DAG monitoring messages
+message ProgressProto {
+ optional int32 totalTaskCount = 1;
+ optional int32 succeededTaskCount = 2;
+ optional int32 runningTaskCount = 3;
+ optional int32 failedTaskCount = 4;
+ optional int32 killedTaskCount = 5;
+}
+
+enum VertexStatusStateProto {
+ VERTEX_INITED = 0;
+ VERTEX_RUNNING = 1;
+ VERTEX_SUCCEEDED = 2;
+ VERTEX_KILLED = 3;
+ VERTEX_FAILED = 4;
+ VERTEX_ERROR = 5;
+ VERTEX_TERMINATING = 6;
+}
+
+message VertexStatusProto {
+ optional VertexStatusStateProto state = 1;
+ repeated string diagnostics = 2;
+ optional ProgressProto progress = 3;
+}
+
+enum DAGStatusStateProto {
+ DAG_SUBMITTED = 0;
+ DAG_INITING = 1;
+ DAG_RUNNING = 2;
+ DAG_SUCCEEDED = 3;
+ DAG_KILLED = 4;
+ DAG_FAILED = 5;
+ DAG_ERROR = 6;
+ DAG_TERMINATING = 7;
+}
+
+message StringProgressPairProto {
+ required string key = 1;
+ required ProgressProto progress = 2;
+}
+
+message DAGStatusProto {
+ optional DAGStatusStateProto state = 1;
+ repeated string diagnostics = 2;
+ optional ProgressProto DAGProgress = 3;
+ repeated StringProgressPairProto vertexProgress = 4;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
new file mode 100644
index 0000000..6fcd1f8
--- /dev/null
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.api.client.rpc";
+option java_outer_classname = "DAGClientAMProtocolRPC";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+//import "DAGClientAMProtocolRecords.proto";
+
+import "DAGApiRecords.proto";
+
+message GetAllDAGsRequestProto {
+}
+
+message GetAllDAGsResponseProto {
+ repeated string dagId = 1;
+}
+
+message GetDAGStatusRequestProto {
+ optional string dagId = 1;
+}
+
+message GetDAGStatusResponseProto {
+ optional DAGStatusProto dagStatus = 1;
+}
+
+message GetVertexStatusRequestProto {
+ optional string dagId = 1;
+ optional string vertexName = 2;
+}
+
+message GetVertexStatusResponseProto {
+ optional VertexStatusProto vertexStatus = 1;
+}
+
+message TryKillDAGRequestProto {
+ optional string dagId = 1;
+}
+
+message TryKillDAGResponseProto {
+ //nothing yet
+}
+
+message SubmitDAGRequestProto {
+ optional DAGPlan d_a_g_plan = 1;
+}
+
+message SubmitDAGResponseProto {
+ optional string dagId = 1;
+}
+
+message ShutdownSessionRequestProto {
+}
+
+message ShutdownSessionResponseProto {
+}
+
+service DAGClientAMProtocol {
+ rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
+ rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
+ rpc getVertexStatus (GetVertexStatusRequestProto) returns (GetVertexStatusResponseProto);
+ rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
+ rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
+ rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
new file mode 100644
index 0000000..21cacf6
--- /dev/null
+++ b/tez-api/src/main/proto/Events.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Generated Java code is placed in package org.apache.tez.engine.api.events
+// inside the outer class EventProtos, with equals()/hashCode() generated.
+option java_package = "org.apache.tez.engine.api.events";
+option java_outer_classname = "EventProtos";
+option java_generate_equals_and_hash = true;
+
+// Data movement event: source and target indices, an opaque byte payload
+// and a version number.
+message DataMovementEventProto {
+ optional int32 source_index = 1;
+ optional int32 target_index = 2;
+ optional bytes user_payload = 3;
+ optional int32 version = 4;
+}
+
+// Input read error event: an index, a diagnostics string and a version number.
+message InputReadErrorEventProto {
+ optional int32 index = 1;
+ optional string diagnostics = 2;
+ optional int32 version = 3;
+}
+
+// Input failed event: source and target indices and a version number.
+// Field number 3 is not used here; version sits at 4, the same number it has
+// in DataMovementEventProto. Field numbers are part of the wire format, so do
+// not renumber.
+message InputFailedEventProto {
+ optional int32 source_index = 1;
+ optional int32 target_index = 2;
+ optional int32 version = 4;
+}
+
+// Input information event: carries only an opaque byte payload.
+message InputInformationEventProto {
+ optional bytes user_payload = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
new file mode 100644
index 0000000..53ec357
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+// based on TestDAGLocationHint
+/**
+ * Serialization tests for {@link DAGPlan}: a round trip of a hand-built plan
+ * through a file, and a check that descriptor class names and user payloads
+ * set on a {@link DAG} appear in the plan returned by {@code createDag}.
+ */
+public class TestDAGPlan {
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(); //TODO: doesn't seem to be deleting this folder automatically as expected.
+
+  /**
+   * Builds a single-vertex plan, writes it to a temporary file, reads it back
+   * and expects the result to equal the plan that was written.
+   *
+   * @throws IOException if the temporary file cannot be created, written or read
+   */
+  @Test
+  public void testBasicJobPlanSerde() throws IOException {
+    DAGPlan job = DAGPlan.newBuilder()
+        .setName("test")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addTaskLocationHint(PlanTaskLocationHint.newBuilder().addHost("machineName").addRack("rack1").build())
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                        .setNumTasks(2)
+                        .setVirtualCores(4)
+                        .setMemoryMb(1024)
+                        .setJavaOpts("")
+                        .setTaskModule("x.y")
+                        .build())
+                .build())
+        .build();
+    File file = tempFolder.newFile("jobPlan");
+
+    writePlan(job, file);
+    DAGPlan inJob = readPlan(file);
+
+    assertEquals(job, inJob);
+  }
+
+  /**
+   * Writes {@code plan} to {@code file}; the output stream is closed even if
+   * the write fails.
+   */
+  private static void writePlan(DAGPlan plan, File file) throws IOException {
+    FileOutputStream outStream = new FileOutputStream(file);
+    try {
+      plan.writeTo(outStream);
+    } finally {
+      outStream.close();
+    }
+  }
+
+  /**
+   * Reads a plan back from {@code file}. The input stream itself is closed in
+   * the finally block; the previous inline code closed the already-closed
+   * output stream there instead and never closed the input stream.
+   */
+  private static DAGPlan readPlan(File file) throws IOException {
+    FileInputStream inputStream = new FileInputStream(file);
+    try {
+      return DAGPlan.newBuilder().mergeFrom(inputStream).build();
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  /**
+   * Builds a two-vertex DAG joined by one edge, sets a distinct user payload
+   * on each processor, input and output descriptor, and checks that every
+   * class name and payload can be read back both from the generated
+   * {@link DAGPlan} and from the {@link EdgeProperty} rebuilt from that plan.
+   */
+  @Test
+  public void testUserPayloadSerde() {
+    DAG dag = new DAG("testDag");
+    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1").
+        setUserPayload("processor1Bytes".getBytes());
+    ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2").
+        setUserPayload("processor2Bytes".getBytes());
+    Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
+    Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
+    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
+    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
+
+    InputDescriptor inputDescriptor = new InputDescriptor("input").
+        setUserPayload("inputBytes".getBytes());
+    OutputDescriptor outputDescriptor = new OutputDescriptor("output").
+        setUserPayload("outputBytes".getBytes());
+    Edge edge = new Edge(v1, v2, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor));
+
+    dag.addVertex(v1).addVertex(v2).addEdge(edge);
+
+    DAGPlan dagProto = dag.createDag(new TezConfiguration());
+
+    assertEquals(2, dagProto.getVertexCount());
+    assertEquals(1, dagProto.getEdgeCount());
+
+    VertexPlan v1Proto = dagProto.getVertex(0);
+    VertexPlan v2Proto = dagProto.getVertex(1);
+    EdgePlan edgeProto = dagProto.getEdge(0);
+
+    // Processor descriptors as stored in the vertex plans.
+    assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName());
+
+    assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName());
+
+    // Input/output descriptors as stored in the edge plan.
+    assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
+        .getUserPayload().toByteArray()));
+    assertEquals("input", edgeProto.getEdgeDestination().getClassName());
+
+    assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
+        .getUserPayload().toByteArray()));
+    assertEquals("output", edgeProto.getEdgeSource().getClassName());
+
+    // The same descriptors after converting the edge plan back to an EdgeProperty.
+    EdgeProperty edgeProperty = DagTypeConverters
+        .createEdgePropertyMapFromDAGPlan(edgeProto);
+
+    byte[] ib = edgeProperty.getEdgeDestination().getUserPayload();
+    assertEquals("inputBytes", new String(ib));
+    assertEquals("input", edgeProperty.getEdgeDestination().getClassName());
+
+    byte[] ob = edgeProperty.getEdgeSource().getUserPayload();
+    assertEquals("outputBytes", new String(ob));
+    assertEquals("output", edgeProperty.getEdgeSource().getClassName());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
new file mode 100644
index 0000000..b33f3a6
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@code DAG.verify()} and for the argument checks performed by the
+ * {@link Vertex} constructor.
+ */
+public class TestDAGVerify {
+
+  private final String dummyProcessorClassName = TestDAGVerify.class.getName();
+  private final String dummyInputClassName = TestDAGVerify.class.getName();
+  private final String dummyOutputClassName = TestDAGVerify.class.getName();
+  private final int dummyTaskCount = 2;
+  private final Resource dummyTaskResource = Resource.newInstance(1, 1);
+
+  /** Creates a vertex that uses the shared dummy task count and resource. */
+  private Vertex newVertex(String name, String processorClassName) {
+    return new Vertex(name, new ProcessorDescriptor(processorClassName),
+        dummyTaskCount, dummyTaskResource);
+  }
+
+  /** Creates a vertex whose processor class name is "MapProcessor". */
+  private Vertex newMapVertex(String name) {
+    return newVertex(name, "MapProcessor");
+  }
+
+  /**
+   * Creates an edge whose output and input descriptors use
+   * {@link #dummyOutputClassName} and {@link #dummyInputClassName}.
+   */
+  private Edge newEdge(Vertex source, Vertex target,
+      DataMovementType movementType, DataSourceType sourceType,
+      SchedulingType schedulingType) {
+    return new Edge(source, target,
+        new EdgeProperty(movementType, sourceType, schedulingType,
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+  }
+
+  /**
+   * Creates a PERSISTED, SEQUENTIAL edge with the given movement type whose
+   * descriptors use the literal names "dummy output class" and
+   * "dummy input class".
+   */
+  private static Edge newDummyEdge(Vertex source, Vertex target,
+      DataMovementType movementType) {
+    return new Edge(source, target,
+        new EdgeProperty(movementType,
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+  }
+
+  /**
+   * Builds the DAG v1 -> v2 with a single edge of the given properties and
+   * calls {@code verify()} on it.
+   */
+  private void verifyTwoVertexDag(DataMovementType movementType,
+      DataSourceType sourceType, SchedulingType schedulingType) {
+    Vertex v1 = newVertex("v1", dummyProcessorClassName);
+    Vertex v2 = newVertex("v2", "MapProcessor");
+    Edge e1 = newEdge(v1, v2, movementType, sourceType, schedulingType);
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+  }
+
+  /**
+   * Asserts that an exception was captured and that its message starts with
+   * {@code expectedMessagePrefix}; the message is also printed to stdout.
+   */
+  private static void assertFailure(IllegalStateException ex,
+      String expectedMessagePrefix) {
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage().startsWith(expectedMessagePrefix));
+  }
+
+  // v1
+  // |
+  // v2
+  /** SCATTER_GATHER / PERSISTED / SEQUENTIAL edge: verify() must not throw. */
+  @Test
+  public void testVerify1() {
+    verifyTwoVertexDag(DataMovementType.SCATTER_GATHER,
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL);
+  }
+
+  /** ONE_TO_ONE / PERSISTED / SEQUENTIAL edge: expects IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testVerify2() {
+    verifyTwoVertexDag(DataMovementType.ONE_TO_ONE,
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL);
+  }
+
+  /** SCATTER_GATHER / EPHEMERAL / SEQUENTIAL edge: expects IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testVerify3() {
+    verifyTwoVertexDag(DataMovementType.SCATTER_GATHER,
+        DataSourceType.EPHEMERAL, SchedulingType.SEQUENTIAL);
+  }
+
+  /** SCATTER_GATHER / EPHEMERAL / CONCURRENT edge: expects IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testVerify4() {
+    verifyTwoVertexDag(DataMovementType.SCATTER_GATHER,
+        DataSourceType.EPHEMERAL, SchedulingType.CONCURRENT);
+  }
+
+  // v1 <----
+  // | ^
+  // v2 ^
+  // | | ^
+  // v3 v4
+  /** Edges v1->v2, v2->v3, v2->v4 and v4->v1: verify() must report a cycle. */
+  @Test
+  public void testCycle1() {
+    IllegalStateException ex = null;
+    Vertex v1 = newMapVertex("v1");
+    Vertex v2 = newMapVertex("v2");
+    Vertex v3 = newMapVertex("v3");
+    Vertex v4 = newMapVertex("v4");
+    Edge e1 = newDummyEdge(v1, v2, DataMovementType.SCATTER_GATHER);
+    Edge e2 = newDummyEdge(v2, v3, DataMovementType.SCATTER_GATHER);
+    Edge e3 = newDummyEdge(v2, v4, DataMovementType.SCATTER_GATHER);
+    Edge e4 = newDummyEdge(v4, v1, DataMovementType.SCATTER_GATHER);
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.addEdge(e3);
+    dag.addEdge(e4);
+    try {
+      dag.verify();
+    } catch (IllegalStateException e) {
+      ex = e;
+    }
+    assertFailure(ex, "DAG contains a cycle");
+  }
+
+  // v1
+  // |
+  // -> v2
+  // ^ | |
+  // v3 v4
+  /** Edges v1->v2, v2->v3, v2->v4 and v3->v2: verify() must report a cycle. */
+  @Test
+  public void testCycle2() {
+    IllegalStateException ex = null;
+    Vertex v1 = newMapVertex("v1");
+    Vertex v2 = newMapVertex("v2");
+    Vertex v3 = newMapVertex("v3");
+    Vertex v4 = newMapVertex("v4");
+    Edge e1 = newDummyEdge(v1, v2, DataMovementType.SCATTER_GATHER);
+    Edge e2 = newDummyEdge(v2, v3, DataMovementType.SCATTER_GATHER);
+    Edge e3 = newDummyEdge(v2, v4, DataMovementType.SCATTER_GATHER);
+    Edge e4 = newDummyEdge(v3, v2, DataMovementType.SCATTER_GATHER);
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.addEdge(e3);
+    dag.addEdge(e4);
+    try {
+      dag.verify();
+    } catch (IllegalStateException e) {
+      ex = e;
+    }
+    assertFailure(ex, "DAG contains a cycle");
+  }
+
+  /** Two vertices both named "v1": verify() must report the duplicate name. */
+  @Test
+  public void repeatedVertexName() {
+    IllegalStateException ex = null;
+    Vertex v1 = newMapVertex("v1");
+    Vertex v1repeat = newMapVertex("v1");
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v1repeat);
+    try {
+      dag.verify();
+    } catch (IllegalStateException e) {
+      ex = e;
+    }
+    assertFailure(ex, "DAG contains multiple vertices with name");
+  }
+
+  // v1 v2
+  // | |
+  // v3
+  /**
+   * v3 receives a ONE_TO_ONE edge from v1 and a SCATTER_GATHER edge from v2;
+   * an IllegalStateException about an unsupported connection pattern is
+   * expected from somewhere between construction and verify().
+   */
+  @Test
+  public void BinaryInputDisallowed() {
+    IllegalStateException ex = null;
+    try {
+      Vertex v1 = newMapVertex("v1");
+      Vertex v2 = newMapVertex("v2");
+      Vertex v3 = newVertex("v3", "ReduceProcessor");
+      Edge e1 = newDummyEdge(v1, v3, DataMovementType.ONE_TO_ONE);
+      Edge e2 = newDummyEdge(v2, v3, DataMovementType.SCATTER_GATHER);
+      DAG dag = new DAG("testDag");
+      dag.addVertex(v1);
+      dag.addVertex(v2);
+      dag.addVertex(v3);
+      dag.addEdge(e1);
+      dag.addEdge(e2);
+      dag.verify();
+    } catch (IllegalStateException e) {
+      ex = e;
+    }
+    assertFailure(ex, "Unsupported connection pattern on edge");
+  }
+
+  // v1 v2
+  // | |
+  // v3
+  /** v3 receives SCATTER_GATHER edges from both v1 and v2: verify() must not throw. */
+  @Test
+  public void BinaryInputAllowed() {
+    Vertex v1 = newMapVertex("v1");
+    Vertex v2 = newMapVertex("v2");
+    Vertex v3 = newVertex("v3", "ReduceProcessor");
+    Edge e1 = newDummyEdge(v1, v3, DataMovementType.SCATTER_GATHER);
+    Edge e2 = newDummyEdge(v2, v3, DataMovementType.SCATTER_GATHER);
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.verify();
+  }
+
+  // v1
+  // | |
+  // v2 v3
+  /**
+   * Two edges leave v1; an IllegalStateException about outDegree>1 is
+   * expected from somewhere between construction and verify().
+   */
+  @Test
+  public void BinaryOutput() {
+    IllegalStateException ex = null;
+    try {
+      Vertex v1 = newMapVertex("v1");
+      Vertex v2 = newMapVertex("v2");
+      Vertex v3 = newMapVertex("v3");
+      Edge e1 = newDummyEdge(v1, v2, DataMovementType.SCATTER_GATHER);
+      // NOTE(review): the diagram above shows v1 feeding both v2 and v3, but
+      // this second edge also targets v2 and v3 is left unconnected - confirm
+      // whether it was meant to be v1 -> v3.
+      Edge e2 = newDummyEdge(v1, v2, DataMovementType.SCATTER_GATHER);
+      DAG dag = new DAG("testDag");
+      dag.addVertex(v1);
+      dag.addVertex(v2);
+      dag.addVertex(v3);
+      dag.addEdge(e1);
+      dag.addEdge(e2);
+      dag.verify();
+    } catch (IllegalStateException e) {
+      ex = e;
+    }
+    assertFailure(ex, "Vertex has outDegree>1");
+  }
+
+  /** A DAG without any vertex must be rejected. */
+  @Test
+  public void testDagWithNoVertices() {
+    IllegalStateException ex = null;
+    try {
+      DAG dag = new DAG("testDag");
+      dag.verify();
+    } catch (IllegalStateException e) {
+      ex = e;
+    }
+    assertFailure(ex, "Invalid dag containing 0 vertices");
+  }
+
+  /**
+   * The {@link Vertex} constructor must throw IllegalArgumentException for a
+   * parallelism of 0 and for a null resource.
+   */
+  @Test
+  public void testInvalidVertexConstruction() {
+    try {
+      new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor"),
+          0, dummyTaskResource);
+      Assert.fail("Expected exception for 0 parallelism");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Parallelism cannot be 0"));
+    }
+    try {
+      new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor"),
+          1, null);
+      // This case passes a null resource, so the failure text says so.
+      Assert.fail("Expected exception for null resource");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Resource cannot be null"));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/pom.xml
----------------------------------------------------------------------
diff --git a/tez-common/pom.xml b/tez-common/pom.xml
index 87f18d6..b4882bb 100644
--- a/tez-common/pom.xml
+++ b/tez-common/pom.xml
@@ -39,7 +39,7 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
- <artifactId>tez-dag-api</artifactId>
+ <artifactId>tez-api</artifactId>
</dependency>
</dependencies>