You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:30 UTC
[23/50] [abbrv] TEZ-443. Merge tez-dag-api and tez-engine-api into a
single module - tez-api (part of TEZ-398). (sseth)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
new file mode 100644
index 0000000..9cb602c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+
+
+public class DAG { // FIXME rename to Topology
+ final List<Vertex> vertices;
+ final List<Edge> edges;
+ final String name;
+
+  /**
+   * Creates an empty DAG: no vertices and no edges.
+   *
+   * @param name the name stored for this DAG and returned by getName()
+   */
+  public DAG(String name) {
+    this.name = name;
+    this.edges = new ArrayList<Edge>();
+    this.vertices = new ArrayList<Vertex>();
+  }
+
+ public synchronized DAG addVertex(Vertex vertex) {
+ if (vertices.contains(vertex)) {
+ throw new IllegalArgumentException(
+ "Vertex " + vertex + " already defined!");
+ }
+ vertices.add(vertex);
+ return this;
+ }
+
+  /** Returns an unmodifiable view of the vertices added so far. */
+  @Private
+  public synchronized List<Vertex> getVertices() {
+    List<Vertex> readOnlyView = Collections.unmodifiableList(vertices);
+    return readOnlyView;
+  }
+
+  /**
+   * Adds an edge between two vertices that are already part of this DAG and
+   * records the connection on both endpoint vertices.
+   *
+   * @param edge the edge to add
+   * @return this DAG, so calls can be chained
+   * @throws IllegalArgumentException if either endpoint is not in this DAG, or if
+   *         the edge list already contains the edge
+   */
+  public synchronized DAG addEdge(Edge edge) {
+    final Vertex source = edge.getInputVertex();
+    final Vertex destination = edge.getOutputVertex();
+
+    // Validate everything before touching any state.
+    if (!vertices.contains(source)) {
+      throw new IllegalArgumentException(
+          "Input vertex " + source + " doesn't exist!");
+    }
+    if (!vertices.contains(destination)) {
+      throw new IllegalArgumentException(
+          "Output vertex " + destination + " doesn't exist!");
+    }
+    if (edges.contains(edge)) {
+      throw new IllegalArgumentException("Edge " + edge + " already defined!");
+    }
+
+    // Tell each endpoint about the other one.
+    source.addOutputVertex(destination, edge.getId());
+    destination.addInputVertex(source, edge.getId());
+
+    edges.add(edge);
+    return this;
+  }
+
+  /** Returns the name given to this DAG at construction time. */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Per-vertex bookkeeping used by verify(): traversal state for Tarjan's
+   * algorithm plus an out-degree counter.
+   */
+  private static class AnnotatedVertex {
+    Vertex v;
+
+    // Tarjan's algorithm state; -1 marks a vertex that has not been visited yet.
+    int index = -1;
+    int lowlink = -1;
+    boolean onstack;
+
+    // Number of edges leaving this vertex; filled in by verify().
+    int outDegree = 0;
+
+    private AnnotatedVertex(Vertex v) {
+      this.v = v;
+    }
+  }
+
+ // verify()
+ //
+ // Validates the structure of this DAG. This no-argument overload always runs
+ // the "restricted" checks, i.e. it is equivalent to verify(true).
+ //
+ // Default rules
+ // Illegal:
+ // - a DAG with no vertices
+ // - duplicate vertex names
+ // - cycles
+ //
+ // Ok:
+ // - orphaned vertex. Occurs in map-only jobs.
+ // - islands. Occurs if a job has unrelated workflows.
+ //
+ // Not yet categorized:
+ // - orphaned vertex in a DAG of >1 vertex. Could be an unrelated map-only job.
+ // - v1->v2 via two edges. Perhaps some self-join job would use this?
+ //
+ // "restricted" mode:
+ // In the short term, the supported DAGs are limited. Call with restricted=true for these verifications.
+ // Illegal:
+ // - any edge that is not SCATTER_GATHER / PERSISTED / SEQUENTIAL
+ // - any vertex with more than one output edge
+ // NOTE(review): an earlier version of this note also listed "more than one input
+ // edge" as illegal, but verify(boolean) only checks the out-degree - confirm intent.
+ //
+ // Throws IllegalStateException on the first violated rule.
+ public void verify() throws IllegalStateException {
+ verify(true);
+ }
+
+  /**
+   * Validates this DAG: it must contain at least one vertex, vertex names must be
+   * unique and the edges must not form a cycle. In restricted mode every edge
+   * must additionally be SCATTER_GATHER / PERSISTED / SEQUENTIAL and no vertex
+   * may have more than one outgoing edge.
+   *
+   * @param restricted whether to apply the extra restricted-mode checks
+   * @throws IllegalStateException on the first rule violation found
+   */
+  public void verify(boolean restricted) throws IllegalStateException {
+    if (vertices.isEmpty()) {
+      throw new IllegalStateException("Invalid dag containing 0 vertices");
+    }
+
+    // Group the edges by the vertex they leave from.
+    Map<Vertex, List<Edge>> outgoing = new HashMap<Vertex, List<Edge>>();
+    for (Edge edge : edges) {
+      Vertex from = edge.getInputVertex();
+      if (!outgoing.containsKey(from)) {
+        outgoing.put(from, new ArrayList<Edge>());
+      }
+      outgoing.get(from).add(edge);
+    }
+
+    // Wrap every vertex for cycle detection, rejecting duplicate names on the way.
+    Map<String, AnnotatedVertex> byName = new HashMap<String, AnnotatedVertex>();
+    for (Vertex vertex : vertices) {
+      String vertexName = vertex.getVertexName();
+      if (byName.containsKey(vertexName)) {
+        throw new IllegalStateException("DAG contains multiple vertices"
+            + " with name: " + vertexName);
+      }
+      byName.put(vertexName, new AnnotatedVertex(vertex));
+    }
+
+    detectCycles(outgoing, byName);
+
+    if (!restricted) {
+      return;
+    }
+
+    // Restricted mode: count out-degrees and reject unsupported edge properties.
+    for (Edge edge : edges) {
+      byName.get(edge.getInputVertex().getVertexName()).outDegree++;
+      EdgeProperty property = edge.getEdgeProperty();
+      if (property.getDataMovementType() != DataMovementType.SCATTER_GATHER) {
+        throw new IllegalStateException(
+            "Unsupported connection pattern on edge. " + edge);
+      }
+      if (property.getDataSourceType() != DataSourceType.PERSISTED) {
+        throw new IllegalStateException(
+            "Unsupported source type on edge. " + edge);
+      }
+      if (property.getSchedulingType() != SchedulingType.SEQUENTIAL) {
+        throw new IllegalStateException(
+            "Unsupported scheduling type on edge. " + edge);
+      }
+    }
+    for (AnnotatedVertex annotated : byName.values()) {
+      if (annotated.outDegree > 1) {
+        throw new IllegalStateException("Vertex has outDegree>1: "
+            + annotated.v.getVertexName());
+      }
+    }
+  }
+
+  /**
+   * Detects cycles using an adaptation of Tarjan's strongly connected components
+   * algorithm.
+   * http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+   *
+   * @param edgeMap   outgoing edges keyed by their input vertex
+   * @param vertexMap traversal state for every vertex, keyed by vertex name
+   * @throws IllegalStateException if the edges form a cycle
+   */
+  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+      throws IllegalStateException {
+    // A one-element array is the mutable counter shared by the whole traversal.
+    // A boxed Integer cannot play this role: it is immutable, so incrementing it
+    // inside a callee only rebinds the callee's local and visit indices repeat.
+    int[] nextIndex = new int[] { 0 };
+    Stack<AnnotatedVertex> stack = new Stack<AnnotatedVertex>();
+    for (AnnotatedVertex av : vertexMap.values()) {
+      if (av.index == -1) {
+        assert stack.empty();
+        strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+      }
+    }
+  }
+
+  /**
+   * Recursive step of Tarjan's algorithm: visits {@code av} and everything
+   * reachable from it that has not been visited yet.
+   * http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+   *
+   * @param av        the vertex to visit
+   * @param vertexMap traversal state for every vertex, keyed by vertex name
+   * @param edgeMap   outgoing edges keyed by their input vertex
+   * @param stack     vertices of the components that are still open
+   * @param nextIndex one-element array holding the next unused visit index
+   * @throws IllegalStateException if a cycle (including a self-edge) is found
+   */
+  private void strongConnect(
+      AnnotatedVertex av,
+      Map<String, AnnotatedVertex> vertexMap,
+      Map<Vertex, List<Edge>> edgeMap,
+      Stack<AnnotatedVertex> stack, int[] nextIndex) throws IllegalStateException {
+    av.index = nextIndex[0];
+    av.lowlink = nextIndex[0];
+    nextIndex[0]++;
+    stack.push(av);
+    av.onstack = true;
+
+    List<Edge> outEdges = edgeMap.get(av.v);
+    if (outEdges != null) {
+      for (Edge e : outEdges) {
+        AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
+        if (outVertex == av) {
+          // A self-edge forms a single-vertex component, which the check below
+          // would treat as acyclic, so it has to be reported here.
+          throw new IllegalStateException("DAG contains a cycle: "
+              + av.v.getVertexName() + " <- " + av.v.getVertexName());
+        }
+        if (outVertex.index == -1) {
+          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+          av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
+        } else if (outVertex.onstack) {
+          // strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
+          // update lowlink in case outputVertex should be considered the root of this component.
+          av.lowlink = Math.min(av.lowlink, outVertex.index);
+        }
+      }
+    }
+
+    if (av.lowlink == av.index) {
+      AnnotatedVertex pop = stack.pop();
+      pop.onstack = false;
+      if (pop != av) {
+        // there was something on the stack other than this "av".
+        // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
+        StringBuilder message = new StringBuilder();
+        message.append(av.v.getVertexName()).append(" <- ");
+        for (; pop != av; pop = stack.pop()) {
+          message.append(pop.v.getVertexName()).append(" <- ");
+          pop.onstack = false;
+        }
+        message.append(av.v.getVertexName());
+        throw new IllegalStateException("DAG contains a cycle: " + message);
+      }
+    }
+  }
+
+
+  /**
+   * Verifies this DAG in restricted mode and serializes it into a DAGPlan
+   * protobuf message: one VertexPlan per vertex, one EdgePlan per edge and,
+   * optionally, the supplied configuration as key/value pairs.
+   *
+   * @param dagConf configuration to embed in the plan; skipped when null
+   * @return the built plan
+   * @throws IllegalStateException if verify(true) rejects the DAG
+   * @throws TezUncheckedException if a PATTERN local resource has a null or
+   *         empty pattern
+   */
+  @Private
+  public DAGPlan createDag(Configuration dagConf) {
+    verify(true);
+
+    DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
+
+    dagBuilder.setName(this.name);
+
+    for (Vertex vertex : vertices) {
+      VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
+      vertexBuilder.setName(vertex.getVertexName());
+      vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
+      vertexBuilder.setProcessorDescriptor(DagTypeConverters
+          .convertToDAGPlan(vertex.getProcessorDescriptor()));
+
+      // task config
+      PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
+      Resource resource = vertex.getTaskResource();
+      taskConfigBuilder.setNumTasks(vertex.getParallelism());
+      taskConfigBuilder.setMemoryMb(resource.getMemory());
+      taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
+      taskConfigBuilder.setJavaOpts(vertex.getJavaOpts());
+
+      taskConfigBuilder.setTaskModule(vertex.getVertexName());
+      Map<String, LocalResource> lrs = vertex.getTaskLocalResources();
+      for (Entry<String, LocalResource> entry : lrs.entrySet()) {
+        // Use a fresh builder for every resource. The pattern is only set for
+        // PATTERN resources, so a builder shared across iterations would leak
+        // the pattern of an earlier resource into every resource after it.
+        PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+        String key = entry.getKey();
+        LocalResource lr = entry.getValue();
+        localResourcesBuilder.setName(key);
+        localResourcesBuilder.setUri(
+            DagTypeConverters.convertToDAGPlan(lr.getResource()));
+        localResourcesBuilder.setSize(lr.getSize());
+        localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+        localResourcesBuilder.setType(
+            DagTypeConverters.convertToDAGPlan(lr.getType()));
+        localResourcesBuilder.setVisibility(
+            DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+        if (lr.getType() == LocalResourceType.PATTERN) {
+          if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+            throw new TezUncheckedException("LocalResource type set to pattern"
+                + " but pattern is null or empty");
+          }
+          localResourcesBuilder.setPattern(lr.getPattern());
+        }
+        taskConfigBuilder.addLocalResource(localResourcesBuilder);
+      }
+
+      if (vertex.getTaskEnvironment() != null) {
+        for (String key : vertex.getTaskEnvironment().keySet()) {
+          PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
+          envSettingBuilder.setKey(key);
+          envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
+          taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
+        }
+      }
+
+      if (vertex.getTaskLocationsHint() != null) {
+        if (vertex.getTaskLocationsHint().getTaskLocationHints() != null) {
+          for (TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()) {
+            PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
+
+            if (hint.getDataLocalHosts() != null) {
+              taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts());
+            }
+            if (hint.getRacks() != null) {
+              taskLocationHintBuilder.addAllRack(hint.getRacks());
+            }
+
+            vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
+          }
+        }
+      }
+
+      for (String inEdgeId : vertex.getInputEdgeIds()) {
+        vertexBuilder.addInEdgeId(inEdgeId);
+      }
+
+      for (String outEdgeId : vertex.getOutputEdgeIds()) {
+        vertexBuilder.addOutEdgeId(outEdgeId);
+      }
+
+      vertexBuilder.setTaskConfig(taskConfigBuilder);
+      dagBuilder.addVertex(vertexBuilder);
+    }
+
+    for (Edge edge : edges) {
+      EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
+      edgeBuilder.setId(edge.getId());
+      edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName());
+      edgeBuilder.setOutputVertexName(edge.getOutputVertex().getVertexName());
+      edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType()));
+      edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType()));
+      edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));
+      edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
+      edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
+      dagBuilder.addEdge(edgeBuilder);
+    }
+
+    if (dagConf != null) {
+      Iterator<Entry<String, String>> iter = dagConf.iterator();
+      ConfigurationProto.Builder confProtoBuilder =
+          ConfigurationProto.newBuilder();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+      dagBuilder.setDagKeyValues(confProtoBuilder);
+    }
+
+    return dagBuilder.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
new file mode 100644
index 0000000..1fd78f1
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+import com.google.protobuf.ByteString;
+
+
+public class DagTypeConverters {
+
+  /**
+   * Maps a YARN LocalResourceVisibility to its DAG plan equivalent.
+   *
+   * @param visibility the YARN visibility to convert
+   * @return the PlanLocalResourceVisibility constant of the same name
+   * @throws IllegalArgumentException if the value has no plan counterpart
+   */
+  public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){
+    switch (visibility) {
+      case PUBLIC:
+        return PlanLocalResourceVisibility.PUBLIC;
+      case PRIVATE:
+        return PlanLocalResourceVisibility.PRIVATE;
+      case APPLICATION:
+        return PlanLocalResourceVisibility.APPLICATION;
+      default:
+        // IllegalArgumentException (still a RuntimeException) is what the
+        // resource-type converters of this class throw for unknown values.
+        throw new IllegalArgumentException("unknown 'visibility': " + visibility);
+    }
+  }
+
+  /**
+   * Maps a DAG plan resource visibility back to the YARN LocalResourceVisibility.
+   *
+   * @param visibility the plan visibility to convert
+   * @return the LocalResourceVisibility constant of the same name
+   * @throws IllegalArgumentException if the value has no YARN counterpart
+   */
+  public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
+    switch (visibility) {
+      case PUBLIC:
+        return LocalResourceVisibility.PUBLIC;
+      case PRIVATE:
+        return LocalResourceVisibility.PRIVATE;
+      case APPLICATION:
+        return LocalResourceVisibility.APPLICATION;
+      default:
+        // IllegalArgumentException (still a RuntimeException) is what the
+        // resource-type converters of this class throw for unknown values.
+        throw new IllegalArgumentException("unknown 'visibility': " + visibility);
+    }
+  }
+
+  /**
+   * Maps an EdgeProperty.DataSourceType to its DAG plan equivalent.
+   *
+   * @param sourceType the data source type to convert
+   * @return the PlanEdgeDataSourceType constant of the same name
+   * @throws IllegalArgumentException if the value has no plan counterpart
+   */
+  public static PlanEdgeDataSourceType convertToDAGPlan(DataSourceType sourceType){
+    switch (sourceType) {
+      case PERSISTED:
+        return PlanEdgeDataSourceType.PERSISTED;
+      case PERSISTED_RELIABLE:
+        return PlanEdgeDataSourceType.PERSISTED_RELIABLE;
+      case EPHEMERAL:
+        return PlanEdgeDataSourceType.EPHEMERAL;
+      default:
+        // IllegalArgumentException (still a RuntimeException) is what the
+        // resource-type converters of this class throw for unknown values.
+        throw new IllegalArgumentException("unknown 'dataSourceType': " + sourceType);
+    }
+  }
+
+  /**
+   * Maps a DAG plan data source type back to EdgeProperty.DataSourceType.
+   *
+   * @param sourceType the plan data source type to convert
+   * @return the DataSourceType constant of the same name
+   * @throws IllegalArgumentException if the value has no API counterpart
+   */
+  public static DataSourceType convertFromDAGPlan(PlanEdgeDataSourceType sourceType){
+    switch (sourceType) {
+      case PERSISTED:
+        return DataSourceType.PERSISTED;
+      case PERSISTED_RELIABLE:
+        return DataSourceType.PERSISTED_RELIABLE;
+      case EPHEMERAL:
+        return DataSourceType.EPHEMERAL;
+      default:
+        // IllegalArgumentException (still a RuntimeException) is what the
+        // resource-type converters of this class throw for unknown values.
+        throw new IllegalArgumentException("unknown 'dataSourceType': " + sourceType);
+    }
+  }
+
+  /**
+   * Maps an EdgeProperty.DataMovementType to its DAG plan equivalent.
+   *
+   * @param type the data movement type to convert
+   * @return the PlanEdgeDataMovementType constant of the same name
+   * @throws IllegalArgumentException if the value has no plan counterpart
+   */
+  public static PlanEdgeDataMovementType convertToDAGPlan(DataMovementType type){
+    switch (type) {
+      case ONE_TO_ONE:
+        return PlanEdgeDataMovementType.ONE_TO_ONE;
+      case BROADCAST:
+        return PlanEdgeDataMovementType.BROADCAST;
+      case SCATTER_GATHER:
+        return PlanEdgeDataMovementType.SCATTER_GATHER;
+      default:
+        // Same exception type as the reverse (plan -> API) conversion.
+        throw new IllegalArgumentException("unknown 'dataMovementType': " + type);
+    }
+  }
+
+  /**
+   * Maps a DAG plan data movement type back to EdgeProperty.DataMovementType.
+   *
+   * @param type the plan data movement type to convert
+   * @return the DataMovementType constant of the same name
+   * @throws IllegalArgumentException if the value has no API counterpart
+   */
+  public static DataMovementType convertFromDAGPlan(PlanEdgeDataMovementType type){
+    final DataMovementType converted;
+    switch (type) {
+      case ONE_TO_ONE:
+        converted = DataMovementType.ONE_TO_ONE;
+        break;
+      case BROADCAST:
+        converted = DataMovementType.BROADCAST;
+        break;
+      case SCATTER_GATHER:
+        converted = DataMovementType.SCATTER_GATHER;
+        break;
+      default:
+        throw new IllegalArgumentException("unknown 'dataMovementType': " + type);
+    }
+    return converted;
+  }
+
+  /**
+   * Maps an EdgeProperty.SchedulingType to its DAG plan equivalent.
+   *
+   * @param type the scheduling type to convert
+   * @return the PlanEdgeSchedulingType constant of the same name
+   * @throws IllegalArgumentException if the value has no plan counterpart
+   */
+  public static PlanEdgeSchedulingType convertToDAGPlan(SchedulingType type){
+    switch (type) {
+      case SEQUENTIAL:
+        return PlanEdgeSchedulingType.SEQUENTIAL;
+      case CONCURRENT:
+        return PlanEdgeSchedulingType.CONCURRENT;
+      default:
+        // Same exception type as the reverse (plan -> API) conversion.
+        throw new IllegalArgumentException("unknown 'SchedulingType': " + type);
+    }
+  }
+
+  /**
+   * Maps a DAG plan scheduling type back to EdgeProperty.SchedulingType.
+   *
+   * @param type the plan scheduling type to convert
+   * @return the SchedulingType constant of the same name
+   * @throws IllegalArgumentException if the value has no API counterpart
+   */
+  public static SchedulingType convertFromDAGPlan(PlanEdgeSchedulingType type){
+    final SchedulingType converted;
+    switch (type) {
+      case SEQUENTIAL:
+        converted = SchedulingType.SEQUENTIAL;
+        break;
+      case CONCURRENT:
+        converted = SchedulingType.CONCURRENT;
+        break;
+      default:
+        throw new IllegalArgumentException("unknown 'SchedulingType': " + type);
+    }
+    return converted;
+  }
+
+  /**
+   * Maps a YARN LocalResourceType to its DAG plan equivalent.
+   *
+   * @param type the YARN resource type to convert
+   * @return the PlanLocalResourceType constant of the same name
+   * @throws IllegalArgumentException if the value has no plan counterpart
+   */
+  public static PlanLocalResourceType convertToDAGPlan(LocalResourceType type) {
+    final PlanLocalResourceType converted;
+    switch (type) {
+      case ARCHIVE:
+        converted = PlanLocalResourceType.ARCHIVE;
+        break;
+      case FILE:
+        converted = PlanLocalResourceType.FILE;
+        break;
+      case PATTERN:
+        converted = PlanLocalResourceType.PATTERN;
+        break;
+      default:
+        throw new IllegalArgumentException("unknown 'type': " + type);
+    }
+    return converted;
+  }
+
+  /**
+   * Maps a DAG plan resource type back to the YARN LocalResourceType.
+   *
+   * @param type the plan resource type to convert
+   * @return the LocalResourceType constant of the same name
+   * @throws IllegalArgumentException if the value has no YARN counterpart
+   */
+  public static LocalResourceType convertFromDAGPlan(PlanLocalResourceType type) {
+    final LocalResourceType converted;
+    switch (type) {
+      case ARCHIVE:
+        converted = LocalResourceType.ARCHIVE;
+        break;
+      case FILE:
+        converted = LocalResourceType.FILE;
+        break;
+      case PATTERN:
+        converted = LocalResourceType.PATTERN;
+        break;
+      default:
+        throw new IllegalArgumentException("unknown 'type': " + type);
+    }
+    return converted;
+  }
+
+  /**
+   * Rebuilds a VertexLocationHint from the per-task location hints of a plan.
+   * The task count of the result is the number of hints supplied.
+   *
+   * @param locationHints the plan hints, one per task
+   * @return a VertexLocationHint holding one TaskLocationHint per plan hint
+   */
+  public static VertexLocationHint convertFromDAGPlan(
+      List<PlanTaskLocationHint> locationHints) {
+    List<TaskLocationHint> taskHints = new ArrayList<TaskLocationHint>();
+    for (PlanTaskLocationHint planHint : locationHints) {
+      // Copy hosts and racks into fresh sets for each task hint.
+      HashSet<String> hosts = new HashSet<String>(planHint.getHostList());
+      HashSet<String> racks = new HashSet<String>(planHint.getRackList());
+      taskHints.add(new TaskLocationHint(hosts, racks));
+    }
+    int taskCount = taskHints.size();
+    return new VertexLocationHint(taskCount, taskHints);
+  }
+
+ // notes re HDFS URL handling:
+ // Resource URLs in the protobuf message are strings of the form hdfs://host:port/path
+ // org.apache.hadoop.fs.Path is actually a URI type that allows any scheme
+ // org.apache.hadoop.yarn.api.records.URL is a URL type used by YARN.
+ // java.net.URL cannot be used out of the box as it rejects unknown schemes such as HDFS.
+
+ public static String convertToDAGPlan(URL resource) {
+ // see above notes on HDFS URL handling
+ String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort()
+ + resource.getFile();
+ return out;
+ }
+
+  /**
+   * Rebuilds YARN LocalResource records from the local resources of a plan.
+   *
+   * @param localResourcesList the plan resources to convert
+   * @return the converted resources keyed by resource name
+   */
+  public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan(
+      List<PlanLocalResource> localResourcesList) {
+    Map<String, LocalResource> resourcesByName = new HashMap<String, LocalResource>();
+    for (PlanLocalResource planResource : localResourcesList) {
+      LocalResource resource = new LocalResourcePBImpl();
+
+      // NOTE: optional protobuf fields hand back a default value (e.g. "") when
+      // they are unset, so check for presence before copying the pattern.
+      if (planResource.hasPattern()) {
+        resource.setPattern(planResource.getPattern());
+      }
+      // see the notes on HDFS URL handling earlier in this class
+      Path resourcePath = new Path(planResource.getUri());
+      resource.setResource(ConverterUtils.getYarnUrlFromPath(resourcePath));
+      resource.setSize(planResource.getSize());
+      resource.setTimestamp(planResource.getTimeStamp());
+      resource.setType(convertFromDAGPlan(planResource.getType()));
+      resource.setVisibility(convertFromDAGPlan(planResource.getVisibility()));
+      resourcesByName.put(planResource.getName(), resource);
+    }
+    return resourcesByName;
+  }
+
+  /**
+   * Collects the environment key/value pairs of a plan into a map. When a key
+   * occurs more than once, the last occurrence wins.
+   *
+   * @param environmentSettingList the plan's environment settings
+   * @return a new map from setting key to setting value
+   */
+  public static Map<String, String> createEnvironmentMapFromDAGPlan(
+      List<PlanKeyValuePair> environmentSettingList) {
+    Map<String, String> environment = new HashMap<String, String>();
+    for (PlanKeyValuePair pair : environmentSettingList) {
+      String key = pair.getKey();
+      String value = pair.getValue();
+      environment.put(key, value);
+    }
+    return environment;
+  }
+
+  /**
+   * Indexes the edges of a plan by their id. When two edges share an id, the
+   * later one replaces the earlier one.
+   *
+   * @param edgeList the plan's edges
+   * @return a new map from edge id to EdgePlan
+   */
+  public static Map<String, EdgePlan> createEdgePlanMapFromDAGPlan(List<EdgePlan> edgeList){
+    Map<String, EdgePlan> edgesById = new HashMap<String, EdgePlan>();
+    for (EdgePlan edgePlan : edgeList) {
+      String edgeId = edgePlan.getId();
+      edgesById.put(edgeId, edgePlan);
+    }
+    return edgesById;
+  }
+
+ public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
+ return new EdgeProperty(
+ convertFromDAGPlan(edge.getDataMovementType()),
+ convertFromDAGPlan(edge.getDataSourceType()),
+ convertFromDAGPlan(edge.getSchedulingType()),
+ convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
+ convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
+ );
+ }
+
+ public static Resource createResourceRequestFromTaskConfig(
+ PlanTaskConfiguration taskConfig) {
+ return Resource.newInstance(taskConfig.getMemoryMb(), taskConfig.getVirtualCores());
+ }
+
+  /**
+   * Flattens the key/value pairs of a ConfigurationProto into a map. When a key
+   * occurs more than once, the last occurrence wins.
+   *
+   * @param confProto the serialized configuration
+   * @return a new map from configuration key to value
+   */
+  public static Map<String, String> convertConfFromProto(
+      ConfigurationProto confProto) {
+    Map<String, String> conf = new HashMap<String, String>();
+    for (PlanKeyValuePair pair : confProto.getConfKeyValuesList()) {
+      conf.put(pair.getKey(), pair.getValue());
+    }
+    return conf;
+  }
+
+  /**
+   * Serializes an entity descriptor (class name plus optional user payload).
+   *
+   * @param descriptor the descriptor to serialize
+   * @return the proto form; the payload field is left unset when the
+   *         descriptor's payload is null
+   */
+  public static TezEntityDescriptorProto convertToDAGPlan(
+      TezEntityDescriptor descriptor) {
+    TezEntityDescriptorProto.Builder protoBuilder =
+        TezEntityDescriptorProto.newBuilder();
+    protoBuilder.setClassName(descriptor.getClassName());
+    if (descriptor.getUserPayload() == null) {
+      return protoBuilder.build();
+    }
+    protoBuilder.setUserPayload(ByteString.copyFrom(descriptor.getUserPayload()));
+    return protoBuilder.build();
+  }
+
+  /**
+   * Rebuilds an InputDescriptor from its proto form.
+   *
+   * @param proto the serialized descriptor
+   * @return a descriptor with the proto's class name; its payload is null when
+   *         the proto carries none
+   */
+  public static InputDescriptor convertInputDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    InputDescriptor descriptor = new InputDescriptor(proto.getClassName());
+    byte[] payload =
+        proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null;
+    return descriptor.setUserPayload(payload);
+  }
+
+  /**
+   * Rebuilds an OutputDescriptor from its proto form.
+   *
+   * @param proto the serialized descriptor
+   * @return a descriptor with the proto's class name; its payload is null when
+   *         the proto carries none
+   */
+  public static OutputDescriptor convertOutputDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    OutputDescriptor descriptor = new OutputDescriptor(proto.getClassName());
+    byte[] payload =
+        proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null;
+    return descriptor.setUserPayload(payload);
+  }
+
+  /**
+   * Rebuilds a ProcessorDescriptor from its proto form.
+   *
+   * @param proto the serialized descriptor
+   * @return a descriptor with the proto's class name; its payload is null when
+   *         the proto carries none
+   */
+  public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    ProcessorDescriptor descriptor = new ProcessorDescriptor(proto.getClassName());
+    byte[] payload =
+        proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null;
+    return descriptor.setUserPayload(payload);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
new file mode 100644
index 0000000..a893bc3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+public class Edge{
+
+ private final Vertex inputVertex;
+ private final Vertex outputVertex;
+ private final EdgeProperty edgeProperty;
+
+ public Edge(Vertex inputVertex,
+ Vertex outputVertex,
+ EdgeProperty edgeProperty) {
+ this.inputVertex = inputVertex;
+ this.outputVertex = outputVertex;
+ this.edgeProperty = edgeProperty;
+ }
+
+ // RENAME to source and destination
+ public Vertex getInputVertex() {
+ return inputVertex;
+ }
+
+ public Vertex getOutputVertex() {
+ return outputVertex;
+ }
+
+ public EdgeProperty getEdgeProperty() {
+ return edgeProperty;
+ }
+
+ /*
+ * Used to identify the edge in the configuration
+ */
+ public String getId() {
+ return String.valueOf(this.hashCode());
+ }
+
+ @Override
+ public String toString() {
+ return inputVertex + " -> " + outputVertex + " (" + edgeProperty + ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
new file mode 100644
index 0000000..326d3d0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+public class EdgeProperty {
+
+ /**
+ * Defines the manner of data movement between source and destination tasks.
+ * Determines which destination tasks have access to data produced on this
+ * edge by a source task. A destination task may choose to read any portion of
+ * the data available to it.
+ */
+ public enum DataMovementType {
+ /**
+ * Output on this edge produced by the i-th source task is available to the
+ * i-th destination task.
+ */
+ ONE_TO_ONE,
+ /**
+ * Output on this edge produced by any source task is available to all
+ * destination tasks.
+ */
+ BROADCAST,
+ /**
+ * The i-th output on this edge produced by all source tasks is available to
+ * the same destination task. Source tasks scatter their outputs and they
+ * are gathered by designated destination tasks.
+ */
+ SCATTER_GATHER
+ }
+
+ /**
+ * Determines the lifetime of the data produced on this edge by a source task.
+ */
+ public enum DataSourceType {
+ /**
+ * Data produced by the source is persisted and available even when the
+ * task is not running. The data may become unavailable and may cause the
+ * source task to be re-executed.
+ */
+ PERSISTED,
+ /**
+ * Source data is stored reliably and will always be available
+ */
+ PERSISTED_RELIABLE,
+ /**
+ * Data produced by the source task is available only while the source task
+ * is running. This requires the destination task to run concurrently with
+ * the source task.
+ */
+ EPHEMERAL
+ }
+
+ /**
+ * Determines when the destination task is eligible to run, once the source
+ * task is eligible to run.
+ */
+ public enum SchedulingType {
+ /**
+ * Destination task is eligible to run after one or more of its source tasks
+ * have started or completed.
+ */
+ SEQUENTIAL,
+ /**
+ * Destination task must run concurrently with the source task
+ */
+ CONCURRENT
+ }
+
+ DataMovementType dataMovementType;
+ DataSourceType dataSourceType;
+ SchedulingType schedulingType;
+ InputDescriptor inputDescriptor;
+ OutputDescriptor outputDescriptor;
+
+ /**
+ * @param dataMovementType
+ * @param dataSourceType
+ * @param edgeSource
+ * The {@link OutputDescriptor} that generates data on the edge.
+ * @param edgeDestination
+ * The {@link InputDescriptor} which will consume data from the edge.
+ */
+ public EdgeProperty(DataMovementType dataMovementType,
+ DataSourceType dataSourceType,
+ SchedulingType schedulingType,
+ OutputDescriptor edgeSource,
+ InputDescriptor edgeDestination) {
+ this.dataMovementType = dataMovementType;
+ this.dataSourceType = dataSourceType;
+ this.schedulingType = schedulingType;
+ this.inputDescriptor = edgeDestination;
+ this.outputDescriptor = edgeSource;
+ }
+
+ public DataMovementType getDataMovementType() {
+ return dataMovementType;
+ }
+
+ public DataSourceType getDataSourceType() {
+ return dataSourceType;
+ }
+
+ public SchedulingType getSchedulingType() {
+ return schedulingType;
+ }
+
+ /**
+ * Returns the {@link InputDescriptor} which will consume data from the edge.
+ *
+ * @return
+ */
+ public InputDescriptor getEdgeDestination() {
+ return inputDescriptor;
+ }
+
+ /**
+ * Returns the {@link OutputDescriptor} which produces data on the edge.
+ *
+ * @return
+ */
+ public OutputDescriptor getEdgeSource() {
+ return outputDescriptor;
+ }
+
+ @Override
+ public String toString() {
+ return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
+ + " >> " + dataSourceType + " >> " + outputDescriptor.getClassName() + " }";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
new file mode 100644
index 0000000..dea9001
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+ /**
+  * {@link TezEntityDescriptor} for an input. It narrows the return type of
+  * {@link #setUserPayload(byte[])} so chained calls keep the concrete type.
+  */
+ public class InputDescriptor extends TezEntityDescriptor {
+
+   /**
+    * @param inputClassName
+    *          class name handed unchanged to the superclass constructor.
+    */
+   public InputDescriptor(String inputClassName) {
+     super(inputClassName);
+   }
+
+   /**
+    * Stores the payload on this descriptor.
+    *
+    * @param userPayload
+    *          bytes to store; kept by reference.
+    * @return this descriptor, typed as {@link InputDescriptor}.
+    */
+   @Override
+   public InputDescriptor setUserPayload(byte[] userPayload) {
+     super.setUserPayload(userPayload);
+     return this;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
new file mode 100644
index 0000000..16fb9b1
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+ /**
+  * {@link TezEntityDescriptor} for an output. It narrows the return type of
+  * {@link #setUserPayload(byte[])} so chained calls keep the concrete type.
+  */
+ public class OutputDescriptor extends TezEntityDescriptor {
+
+   /**
+    * @param outputClassName
+    *          class name handed unchanged to the superclass constructor.
+    */
+   public OutputDescriptor(String outputClassName) {
+     super(outputClassName);
+   }
+
+   /**
+    * Stores the payload on this descriptor.
+    *
+    * @param userPayload
+    *          bytes to store; kept by reference.
+    * @return this descriptor, typed as {@link OutputDescriptor}.
+    */
+   @Override
+   public OutputDescriptor setUserPayload(byte[] userPayload) {
+     super.setUserPayload(userPayload);
+     return this;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
new file mode 100644
index 0000000..092147d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+ /**
+  * {@link TezEntityDescriptor} for a processor. It narrows the return type of
+  * {@link #setUserPayload(byte[])} so chained calls keep the concrete type.
+  */
+ public class ProcessorDescriptor extends TezEntityDescriptor {
+
+   /**
+    * @param processorClassName
+    *          class name handed unchanged to the superclass constructor.
+    */
+   public ProcessorDescriptor(String processorClassName) {
+     super(processorClassName);
+   }
+
+   /**
+    * Stores the payload on this descriptor.
+    *
+    * @param userPayload
+    *          bytes to store; kept by reference.
+    * @return this descriptor, typed as {@link ProcessorDescriptor}.
+    */
+   // @Override lets the compiler verify that this still overrides the base
+   // class method, matching InputDescriptor and OutputDescriptor.
+   @Override
+   public ProcessorDescriptor setUserPayload(byte[] userPayload) {
+     this.userPayload = userPayload;
+     return this;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
new file mode 100644
index 0000000..7447974
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+ /**
+  * {@link Configuration} subclass that adds <code>tez-site.xml</code> as a
+  * default resource and declares the configuration keys, their default values
+  * and the fixed file / logger names used by Tez.
+  *
+  * Keys are assembled from {@link #TEZ_PREFIX}, {@link #TEZ_AM_PREFIX} and
+  * {@link #TEZ_TASK_PREFIX}. The key strings are part of the external contract
+  * and must not be altered, even where the resulting name looks irregular.
+  */
+ public class TezConfiguration extends Configuration {
+
+   public static final String TEZ_SITE_XML = "tez-site.xml";
+
+   static {
+     // Runs once when the class is loaded.
+     addDefaultResource(TEZ_SITE_XML);
+   }
+
+   public TezConfiguration() {
+     super();
+   }
+
+   /**
+    * @param conf configuration handed to the {@link Configuration} constructor.
+    */
+   public TezConfiguration(Configuration conf) {
+     super(conf);
+   }
+
+   public static final String TEZ_PREFIX = "tez.";
+   public static final String TEZ_AM_PREFIX = TEZ_PREFIX + "am.";
+   public static final String TEZ_TASK_PREFIX = TEZ_PREFIX + "task.";
+
+   // Note: built from TEZ_PREFIX, so the key is "tez.staging-dir" (no "am." part).
+   public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
+   public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/tez/staging";
+
+   // TODO Should not be required once all tokens are handled via AppSubmissionContext
+   public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
+   public static final String APPLICATION_TOKENS_FILE = "appTokens";
+   public static final String TEZ_APPLICATION_MASTER_CLASS =
+       "org.apache.tez.dag.app.DAGAppMaster";
+
+   /** Root Logging level passed to the Tez app master.*/
+   public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX + "log.level";
+   public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO";
+
+   public static final String TEZ_AM_JAVA_OPTS = TEZ_AM_PREFIX
+       + "java.opts";
+   public static final String DEFAULT_TEZ_AM_JAVA_OPTS = " -Xmx1024m ";
+
+   // Note: the suffix repeats "am.", so the key is
+   // "tez.am.am.complete.cancel.delegation.tokens"; left as-is for compatibility.
+   public static final String TEZ_AM_CANCEL_DELEGATION_TOKEN = TEZ_AM_PREFIX +
+       "am.complete.cancel.delegation.tokens";
+   public static final boolean TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT = true;
+
+   public static final String TEZ_AM_TASK_LISTENER_THREAD_COUNT =
+       TEZ_AM_PREFIX + "task.listener.thread-count";
+   public static final int TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
+
+   public static final String TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT =
+       TEZ_AM_PREFIX + "container.listener.thread-count";
+   public static final int TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT = 30;
+
+   // TODO Some of the DAG properties are job specific and not AM specific. Rename accordingly.
+   // TODO Are any of these node blacklisting properties required. (other than for MR compat)
+   public static final String TEZ_AM_MAX_TASK_FAILURES_PER_NODE = TEZ_AM_PREFIX
+       + "maxtaskfailures.per.node";
+   public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 3;
+
+   public static final String TEZ_AM_MAX_TASK_ATTEMPTS =
+       TEZ_AM_PREFIX + "max.task.attempts";
+   public static final int TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT = 4;
+
+   public static final String TEZ_AM_NODE_BLACKLISTING_ENABLED = TEZ_AM_PREFIX
+       + "node-blacklisting.enabled";
+   public static final boolean TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT = true;
+   public static final String TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_AM_PREFIX
+       + "node-blacklisting.ignore-threshold-node-percent";
+   public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
+
+   /** Number of threads to handle job client RPC requests.*/
+   public static final String TEZ_AM_CLIENT_THREAD_COUNT =
+       TEZ_AM_PREFIX + "client.am.thread-count";
+   public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1;
+   /**
+    * Range of ports that the AM can use when binding. Leave blank
+    * if you want all possible ports.
+    */
+   public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
+       TEZ_AM_PREFIX + "client.am.port-range";
+
+
+   public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
+       + "resource.memory.mb";
+   public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1536;
+
+   public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX
+       + "resource.cpu.vcores";
+   public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
+
+   public static final String
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = TEZ_AM_PREFIX
+       + "shuffle-vertex-manager.min-src-fraction";
+   public static final float
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
+
+   public static final String
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = TEZ_AM_PREFIX
+       + "shuffle-vertex-manager.max-src-fraction";
+   public static final float
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
+
+   public static final String
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = TEZ_AM_PREFIX +
+       "shuffle-vertex-manager.enable.auto-parallel";
+   public static final boolean
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
+
+   public static final String
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = TEZ_AM_PREFIX +
+       "shuffle-vertex-manager.desired-task-input-size";
+   // 104857600, i.e. 100 * 1024 * 1024; the long operand comes first so the
+   // whole product is evaluated in long arithmetic.
+   public static final long
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT =
+       100L * 1024 * 1024;
+
+   public static final String
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = TEZ_AM_PREFIX +
+       "shuffle-vertex-manager.min-task-parallelism";
+   public static final int
+       TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
+
+   public static final String
+       TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX
+       + "slowstart-dag-scheduler.min-resource-fraction";
+   public static final float
+       TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION_DEFAULT = 0.5f;
+
+   public static final String TEZ_AM_AGGRESSIVE_SCHEDULING = TEZ_AM_PREFIX +
+       "aggressive.scheduling";
+   // Declared final like every other default so it cannot be reassigned at runtime.
+   public static final boolean TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT = false;
+
+   /**
+    * The complete path to the serialized dag plan file
+    * <code>TEZ_AM_PLAN_PB_BINARY</code>. Used to make the plan available to
+    * individual tasks if needed. This will typically be a path in the job submit
+    * directory.
+    *
+    * NOTE(review): no constant named <code>TEZ_AM_PLAN_PB_BINARY</code> is
+    * declared in this class; {@link #TEZ_PB_PLAN_BINARY_NAME} is presumably the
+    * file meant here -- confirm.
+    */
+   public static final String TEZ_AM_PLAN_REMOTE_PATH = TEZ_AM_PREFIX
+       + "dag-am-plan.remote.path";
+
+   public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX
+       + "am-rm.heartbeat.interval-ms.max";
+   public static final int TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT = 1000;
+
+   public static final String TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX = TEZ_TASK_PREFIX
+       + "get-task.sleep.interval-ms.max";
+   public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 500;
+
+   // Note: the key ends in ".max" although the constant name does not.
+   public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
+       + "am.heartbeat.interval-ms.max";
+   public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
+
+   // Note: the key ends in ".max" although the constant name does not.
+   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+       + "max-events-per-heartbeat.max";
+   public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100;
+
+   /**
+    * Configuration to specify whether container should be reused.
+    */
+   public static final String TEZ_AM_CONTAINER_REUSE_ENABLED = TEZ_AM_PREFIX
+       + "container.reuse.enabled";
+   public static final boolean TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT = true;
+
+   /**
+    * Whether to reuse containers for rack local tasks. Active only if reuse is
+    * enabled.
+    */
+   public static final String TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED = TEZ_AM_PREFIX
+       + "container.reuse.rack-fallback.enabled";
+   public static final boolean TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT = true;
+
+   /**
+    * Whether to reuse containers for non-local tasks. Active only if reuse is
+    * enabled.
+    */
+   public static final String TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED = TEZ_AM_PREFIX
+       + "container.reuse.non-local-fallback.enabled";
+   public static final boolean TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT = false;
+
+   public static final String TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS = TEZ_AM_PREFIX
+       + "container.reuse.delay-allocation-millis";
+   // Upper-case L suffix: a lower-case "l" is easily misread as the digit 1.
+   public static final long TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS_DEFAULT = 3000L;
+
+   public static final String TEZ_PB_BINARY_CONF_NAME = "tez-conf.pb";
+   public static final String TEZ_PB_PLAN_BINARY_NAME = "tez-dag.pb";
+   public static final String TEZ_PB_PLAN_TEXT_NAME = "tez-dag.pb.txt";
+
+   /*
+    * Logger properties
+    */
+   public static final String TEZ_CONTAINER_LOG4J_PROPERTIES_FILE = "tez-container-log4j.properties";
+   public static final String TEZ_CONTAINER_LOGGER_NAME = "CLA";
+   public static final String TEZ_ROOT_LOGGER_NAME = "tez.root.logger";
+   public static final String TEZ_CONTAINER_LOG_FILE_NAME = "syslog";
+   public static final String TEZ_CONTAINER_ERR_FILE_NAME = "stderr";
+   public static final String TEZ_CONTAINER_OUT_FILE_NAME = "stdout";
+
+
+   public static final String TEZ_LIB_URIS =
+       TEZ_PREFIX + "lib.uris";
+
+   public static final String TEZ_APPLICATION_TYPE = "TEZ-MR*";
+
+   public static final String LOCAL_FRAMEWORK_NAME = "local-tez";
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
new file mode 100644
index 0000000..5463d65
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Specifies all constant values in Tez
+ */
+public class TezConstants {
+
+ // Env variable names
+ public static final String TEZ_AM_IS_SESSION_ENV = "TEZ_AM_IS_SESSION";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
new file mode 100644
index 0000000..9d4b2c4
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+ /**
+  * Base class for descriptors that pair a class name with a
+  * <code>byte[]</code> user payload.
+  */
+ public abstract class TezEntityDescriptor {
+
+   // Visible to subclasses. The array is stored and handed back by reference;
+   // no defensive copy is made.
+   protected byte[] userPayload;
+   // Set once in the constructor and never reassigned, hence final.
+   private final String className;
+
+   /**
+    * @param className
+    *          class name later returned by {@link #getClassName()}; stored as
+    *          given, without validation.
+    */
+   public TezEntityDescriptor(String className) {
+     this.className = className;
+   }
+
+   /**
+    * @return the payload array last stored, or <code>null</code> if none has
+    *         been stored. The same array instance is returned, not a copy.
+    */
+   public byte[] getUserPayload() {
+     return this.userPayload;
+   }
+
+   /**
+    * Stores the payload on this descriptor.
+    *
+    * @param userPayload
+    *          bytes to store; kept by reference.
+    * @return this descriptor, so calls can be chained.
+    */
+   public TezEntityDescriptor setUserPayload(byte[] userPayload) {
+     this.userPayload = userPayload;
+     return this;
+   }
+
+   /**
+    * @return the class name given to the constructor.
+    */
+   public String getClassName() {
+     return this.className;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java
new file mode 100644
index 0000000..e3b14e7
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+/**
+ * Base TezException
+ */
+public class TezException extends Exception {
+ private static final long serialVersionUID = 6337442733802964447L;
+ public TezException(Throwable cause) { super(cause); }
+ public TezException(String message) { super(message); }
+ public TezException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java
new file mode 100644
index 0000000..f55f6dd
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+/**
+ * Base Tez Unchecked Exception
+ */
+public class TezUncheckedException extends RuntimeException {
+
+ private static final long serialVersionUID = -4956339297375386184L;
+
+ public TezUncheckedException(Throwable cause) { super(cause); }
+ public TezUncheckedException(String message) { super(message); }
+ public TezUncheckedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
new file mode 100644
index 0000000..900822b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+
+ /**
+  * Describes one vertex: its name, processor descriptor, parallelism and
+  * per-task resource, optional per-task settings (location hints, local
+  * resources, environment, java opts), and the vertices / edge ids that have
+  * been attached to it.
+  */
+ public class Vertex { // FIXME rename to Task
+
+   private final String vertexName;
+   private final ProcessorDescriptor processorDescriptor;
+
+   private final int parallelism;
+   private VertexLocationHint taskLocationsHint;
+   private final Resource taskResource;
+   private Map<String, LocalResource> taskLocalResources;
+   private Map<String, String> taskEnvironment;
+
+   // Vertex lists and edge-id lists are appended to together, so entry i of a
+   // vertex list pairs with entry i of the matching edge-id list.
+   private final List<Vertex> inputVertices = new ArrayList<Vertex>();
+   private final List<Vertex> outputVertices = new ArrayList<Vertex>();
+   private final List<String> inputEdgeIds = new ArrayList<String>();
+   private final List<String> outputEdgeIds = new ArrayList<String>();
+   private String javaOpts = "";
+
+
+   /**
+    * @param vertexName
+    *          name of the vertex; not validated.
+    * @param processorDescriptor
+    *          processor descriptor; not validated.
+    * @param parallelism
+    *          must not be 0. Only zero is rejected; the sign is not checked.
+    * @param taskResource
+    *          must not be null.
+    * @throws IllegalArgumentException
+    *           if parallelism is 0 or taskResource is null.
+    */
+   public Vertex(String vertexName,
+       ProcessorDescriptor processorDescriptor,
+       int parallelism,
+       Resource taskResource) {
+     // Argument checks first; nothing is observable if either one throws.
+     if (parallelism == 0) {
+       throw new IllegalArgumentException("Parallelism cannot be 0");
+     }
+     if (taskResource == null) {
+       throw new IllegalArgumentException("Resource cannot be null");
+     }
+     this.vertexName = vertexName;
+     this.processorDescriptor = processorDescriptor;
+     this.parallelism = parallelism;
+     this.taskResource = taskResource;
+   }
+
+   public String getVertexName() { // FIXME rename to getName()
+     return this.vertexName;
+   }
+
+   public ProcessorDescriptor getProcessorDescriptor() {
+     return processorDescriptor;
+   }
+
+   public int getParallelism() {
+     return this.parallelism;
+   }
+
+   public Resource getTaskResource() {
+     return this.taskResource;
+   }
+
+   /**
+    * Records location hints for the tasks of this vertex. A null argument
+    * leaves any previously stored hint untouched.
+    *
+    * @param locations
+    *          one hint per task, or null for a no-op.
+    * @return this vertex, so calls can be chained.
+    */
+   public Vertex setTaskLocationsHint(List<TaskLocationHint> locations) {
+     if (locations != null) {
+       // The size check is an assert, so it only fires when the JVM runs with
+       // assertions enabled.
+       assert locations.size() == parallelism;
+       this.taskLocationsHint = new VertexLocationHint(parallelism, locations);
+     }
+     return this;
+   }
+
+   // used internally to create parallelism location resource file
+   VertexLocationHint getTaskLocationsHint() {
+     return this.taskLocationsHint;
+   }
+
+   /**
+    * @param localResources
+    *          map to store; kept by reference.
+    * @return this vertex, so calls can be chained.
+    */
+   public Vertex setTaskLocalResources(Map<String, LocalResource> localResources) {
+     taskLocalResources = localResources;
+     return this;
+   }
+
+   public Map<String, LocalResource> getTaskLocalResources() {
+     return this.taskLocalResources;
+   }
+
+   /**
+    * @param environment
+    *          map to store; kept by reference.
+    * @return this vertex, so calls can be chained.
+    */
+   public Vertex setTaskEnvironment(Map<String, String> environment) {
+     taskEnvironment = environment;
+     return this;
+   }
+
+   public Map<String, String> getTaskEnvironment() {
+     return this.taskEnvironment;
+   }
+
+   /**
+    * @param javaOpts
+    *          options string to store; the initial value is the empty string.
+    * @return this vertex, so calls can be chained.
+    */
+   public Vertex setJavaOpts(String javaOpts) {
+     this.javaOpts = javaOpts;
+     return this;
+   }
+
+   public String getJavaOpts() {
+     return this.javaOpts;
+   }
+
+   /**
+    * Formats the vertex as its name and processor class name inside square
+    * brackets.
+    */
+   @Override
+   public String toString() {
+     StringBuilder text = new StringBuilder("[");
+     text.append(vertexName).append(" : ");
+     text.append(processorDescriptor.getClassName()).append("]");
+     return text.toString();
+   }
+
+   // Appends the vertex and its edge id together so both lists stay aligned.
+   void addInputVertex(Vertex inputVertex, String edgeId) {
+     this.inputVertices.add(inputVertex);
+     this.inputEdgeIds.add(edgeId);
+   }
+
+   // Appends the vertex and its edge id together so both lists stay aligned.
+   void addOutputVertex(Vertex outputVertex, String edgeId) {
+     this.outputVertices.add(outputVertex);
+     this.outputEdgeIds.add(edgeId);
+   }
+
+   // The four accessors below return the live internal lists, not copies.
+   List<Vertex> getInputVertices() {
+     return this.inputVertices;
+   }
+
+   List<Vertex> getOutputVertices() {
+     return this.outputVertices;
+   }
+
+   List<String> getInputEdgeIds() {
+     return this.inputEdgeIds;
+   }
+
+   List<String> getOutputEdgeIds() {
+     return this.outputEdgeIds;
+   }
+
+   // FIXME how do we support profiling? Can't profile all tasks.
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
new file mode 100644
index 0000000..4f19314
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+ /**
+  * Location hints for a vertex: a task count plus an optional list with one
+  * {@link TaskLocationHint} entry per element.
+  */
+ public class VertexLocationHint {
+
+   private final int numTasks;
+   private final List<TaskLocationHint> taskLocationHints;
+
+   /**
+    * @param numTasks
+    *          task count; not checked against the list size.
+    * @param taskLocationHints
+    *          hints to expose through an unmodifiable view (the list itself is
+    *          not copied), or null.
+    */
+   public VertexLocationHint(int numTasks,
+       List<TaskLocationHint> taskLocationHints) {
+     this.numTasks = numTasks;
+     this.taskLocationHints = (taskLocationHints == null)
+         ? null
+         : Collections.unmodifiableList(taskLocationHints);
+   }
+
+   public int getNumTasks() {
+     return this.numTasks;
+   }
+
+   /**
+    * @return an unmodifiable view of the hints, or null if none were supplied.
+    */
+   public List<TaskLocationHint> getTaskLocationHints() {
+     return this.taskLocationHints;
+   }
+
+   @Override
+   public int hashCode() {
+     final int prime = 7883;
+     // Seed of 1 folded with the task count.
+     int hash = prime + numTasks;
+     if (taskLocationHints != null) {
+       hash = prime * hash + taskLocationHints.hashCode();
+     }
+     return hash;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (obj == this) {
+       return true;
+     }
+     if (obj == null || obj.getClass() != getClass()) {
+       return false;
+     }
+     VertexLocationHint that = (VertexLocationHint) obj;
+     if (that.numTasks != numTasks) {
+       return false;
+     }
+     // Both null, or both non-null and equal.
+     return (taskLocationHints == null)
+         ? that.taskLocationHints == null
+         : taskLocationHints.equals(that.taskLocationHints);
+   }
+
+   /**
+    * Location hint for a single task: optional host names and rack names.
+    */
+   public static class TaskLocationHint {
+
+     // Host names if any to be used
+     private final Set<String> hosts;
+     // Rack names if any to be used
+     private final Set<String> racks;
+
+     /**
+      * @param hosts
+      *          host names, exposed through an unmodifiable view, or null.
+      * @param racks
+      *          rack names, exposed through an unmodifiable view, or null.
+      */
+     public TaskLocationHint(Set<String> hosts, Set<String> racks) {
+       this.hosts = (hosts == null) ? null : Collections.unmodifiableSet(hosts);
+       this.racks = (racks == null) ? null : Collections.unmodifiableSet(racks);
+     }
+
+     public Set<String> getDataLocalHosts() {
+       return this.hosts;
+     }
+
+     public Set<String> getRacks() {
+       return this.racks;
+     }
+
+     @Override
+     public int hashCode() {
+       final int prime = 9397;
+       int hash = 1;
+       // A missing set contributes a bare "+ prime" instead of the
+       // multiply-and-add step.
+       if (hosts != null) {
+         hash = prime * hash + hosts.hashCode();
+       } else {
+         hash += prime;
+       }
+       if (racks != null) {
+         hash = prime * hash + racks.hashCode();
+       } else {
+         hash += prime;
+       }
+       return hash;
+     }
+
+     @Override
+     public boolean equals(Object obj) {
+       if (obj == this) {
+         return true;
+       }
+       if (obj == null || obj.getClass() != getClass()) {
+         return false;
+       }
+       TaskLocationHint that = (TaskLocationHint) obj;
+       boolean sameHosts = (hosts == null)
+           ? that.hosts == null
+           : hosts.equals(that.hosts);
+       if (!sameHosts) {
+         return false;
+       }
+       return (racks == null)
+           ? that.racks == null
+           : racks.equals(that.racks);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
new file mode 100644
index 0000000..9062e8e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.tez.dag.api.TezException;
+
+/*
+ * Interface class for monitoring the <code>DAG</code> running in a Tez DAG
+ * Application Master.
+ */
+public interface DAGClient extends Closeable {
+
+ /**
+ * Get the YARN ApplicationId for the app running the DAG
+ * @return <code>ApplicationId</code>
+ */
+ public ApplicationId getApplicationId();
+
+ @Private
+ /**
+ * Get the YARN ApplicationReport for the app running the DAG. For performance
+ * reasons this may be stale copy and should be used to access static info. It
+ * may be null.
+ * @return <code>ApplicationReport</code> or null
+ */
+ public ApplicationReport getApplicationReport();
+
+ /**
+ * Get the status of the specified DAG
+ */
+ public DAGStatus getDAGStatus() throws IOException, TezException;
+
+ /**
+ * Get the status of a Vertex of a DAG
+ */
+ public VertexStatus getVertexStatus(String vertexName)
+ throws IOException, TezException;
+
+ /**
+ * Kill a running DAG
+ *
+ */
+ public void tryKillDAG() throws TezException, IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
new file mode 100644
index 0000000..d61173d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -0,0 +1,130 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder;
+import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class DAGStatus {
+
+ public enum State {
+ SUBMITTED,
+ INITING,
+ RUNNING,
+ SUCCEEDED,
+ KILLED,
+ FAILED,
+ ERROR,
+ };
+
+ DAGStatusProtoOrBuilder proxy = null;
+ Progress progress = null;
+ Map<String, Progress> vertexProgress = null;
+
+ public DAGStatus(DAGStatusProtoOrBuilder proxy) {
+ this.proxy = proxy;
+ }
+
+ public State getState() {
+ switch(proxy.getState()) {
+ case DAG_SUBMITTED:
+ return DAGStatus.State.SUBMITTED;
+ // For simplicity, initing/terminating states are presented as running
+ case DAG_INITING:
+ case DAG_TERMINATING:
+ case DAG_RUNNING:
+ return DAGStatus.State.RUNNING;
+ case DAG_SUCCEEDED:
+ return DAGStatus.State.SUCCEEDED;
+ case DAG_FAILED:
+ return DAGStatus.State.FAILED;
+ case DAG_KILLED:
+ return DAGStatus.State.KILLED;
+ case DAG_ERROR:
+ return DAGStatus.State.ERROR;
+ default:
+ throw new TezUncheckedException("Unsupported value for DAGStatus.State : " +
+ proxy.getState());
+ }
+ }
+
+ public boolean isCompleted() {
+ State state = getState();
+ return (state == State.SUCCEEDED ||
+ state == State.FAILED ||
+ state == State.KILLED ||
+ state == State.ERROR);
+ }
+
+ public List<String> getDiagnostics() {
+ return proxy.getDiagnosticsList();
+ }
+
+ /**
+ * Gets overall progress value of the DAG.
+ *
+ * @return Progress of the DAG. Maybe null when the DAG is not running. Maybe
+ * null when the DAG is running and the application master cannot be
+ * reached - e.g. when the execution platform has restarted the
+ * application master.
+ * @see Progress
+ */
+ public Progress getDAGProgress() {
+ if(progress == null && proxy.hasDAGProgress()) {
+ progress = new Progress(proxy.getDAGProgress());
+ }
+ return progress;
+ }
+
+ /**
+ * Get the progress of a vertex in the DAG
+ *
+ * @return Progress of the vertex. May be null when the DAG is not running.
+ * Maybe null when the DAG is running and the application master
+ * cannot be reached - e.g. when the execution platform has restarted
+ * the application master.
+ * @see Progress
+ */
+ public Map<String, Progress> getVertexProgress() {
+ if(vertexProgress == null) {
+ if(proxy.getVertexProgressList() != null) {
+ List<StringProgressPairProto> kvList = proxy.getVertexProgressList();
+ vertexProgress = new HashMap<String, Progress>(kvList.size());
+ for(StringProgressPairProto kv : kvList){
+ vertexProgress.put(kv.getKey(), new Progress(kv.getProgress()));
+ }
+ }
+ }
+ return vertexProgress;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("status=" + getState()
+ + ", progress=" + getDAGProgress());
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
new file mode 100644
index 0000000..9577320
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProtoOrBuilder;
+
+/**
+ * Task counters read from a <code>ProgressProtoOrBuilder</code>. Each getter
+ * forwards directly to the proxy.
+ */
+public class Progress {
+
+  ProgressProtoOrBuilder proxy = null;
+
+  /**
+   * @param proxy the protobuf message (or builder) the counters are read from
+   */
+  Progress(ProgressProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+
+  /** @return the total task count reported by the proxy */
+  public int getTotalTaskCount() {
+    return proxy.getTotalTaskCount();
+  }
+
+  /** @return the succeeded task count reported by the proxy */
+  public int getSucceededTaskCount() {
+    return proxy.getSucceededTaskCount();
+  }
+
+  /** @return the running task count reported by the proxy */
+  public int getRunningTaskCount() {
+    return proxy.getRunningTaskCount();
+  }
+
+  /** @return the failed task count reported by the proxy */
+  public int getFailedTaskCount() {
+    return proxy.getFailedTaskCount();
+  }
+
+  /** @return the killed task count reported by the proxy */
+  public int getKilledTaskCount() {
+    return proxy.getKilledTaskCount();
+  }
+
+  /**
+   * @return the five counters on one line, each preceded by its label
+   */
+  @Override
+  public String toString() {
+    return "TotalTasks: " + getTotalTaskCount()
+        + " Succeeded: " + getSucceededTaskCount()
+        + " Running: " + getRunningTaskCount()
+        + " Failed: " + getFailedTaskCount()
+        + " Killed: " + getKilledTaskCount();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
new file mode 100644
index 0000000..ce5dbe0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -0,0 +1,78 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.util.List;
+
+import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProtoOrBuilder;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+/**
+ * View of a vertex's status backed by a
+ * <code>VertexStatusProtoOrBuilder</code>. The Progress object is created on
+ * first access and then cached.
+ */
+public class VertexStatus {
+
+  /** Vertex states exposed to clients. */
+  public enum State {
+    INITED,
+    RUNNING,
+    SUCCEEDED,
+    KILLED,
+    FAILED,
+    ERROR,
+    TERMINATING,
+  }
+
+  VertexStatusProtoOrBuilder proxy = null;
+  Progress progress = null;
+
+  /**
+   * @param proxy the protobuf message (or builder) every getter reads from
+   */
+  public VertexStatus(VertexStatusProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+
+  /**
+   * Translates the protobuf state held by the proxy into a {@link State}.
+   *
+   * @return the client-facing state of the vertex
+   * @throws TezUncheckedException if the protobuf state has no mapping
+   */
+  public State getState() {
+    final State mapped;
+    switch (proxy.getState()) {
+      case VERTEX_INITED:
+        mapped = State.INITED;
+        break;
+      case VERTEX_RUNNING:
+        mapped = State.RUNNING;
+        break;
+      case VERTEX_SUCCEEDED:
+        mapped = State.SUCCEEDED;
+        break;
+      case VERTEX_FAILED:
+        mapped = State.FAILED;
+        break;
+      case VERTEX_KILLED:
+        mapped = State.KILLED;
+        break;
+      case VERTEX_ERROR:
+        mapped = State.ERROR;
+        break;
+      case VERTEX_TERMINATING:
+        mapped = State.TERMINATING;
+        break;
+      default:
+        throw new TezUncheckedException("Unsupported value for VertexStatus.State : " +
+            proxy.getState());
+    }
+    return mapped;
+  }
+
+  /**
+   * @return the diagnostics list exactly as held by the proxy
+   */
+  public List<String> getDiagnostics() {
+    return proxy.getDiagnosticsList();
+  }
+
+  /**
+   * @return Progress of the vertex, or null when the proxy carries no
+   *         progress
+   * @see Progress
+   */
+  public Progress getProgress() {
+    if (progress != null) {
+      return progress;
+    }
+    if (proxy.hasProgress()) {
+      progress = new Progress(proxy.getProgress());
+    }
+    return progress;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
new file mode 100644
index 0000000..a1ee18f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.rpc;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.DAGClientAMProtocol;
+
+/**
+ * Marker interface that attaches Hadoop RPC protocol metadata (name and
+ * version, via <code>@ProtocolInfo</code>) to
+ * <code>DAGClientAMProtocol.BlockingInterface</code>. It declares no methods
+ * of its own.
+ *
+ * NOTE(review): the registered protocolName ends in
+ * <code>DAGClientAMProtocolPB</code>, which differs from this interface's
+ * simple name - presumably intentional; confirm before renaming either.
+ */
+@ProtocolInfo(
+ protocolName = "org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolPB",
+ protocolVersion = 1)
+public interface DAGClientAMProtocolBlockingPB
+ extends DAGClientAMProtocol.BlockingInterface {
+
+}