You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:32 UTC

[25/50] [abbrv] git commit: TEZ-443. Merge tez-dag-api and tez-engine-api into a single module - tez-api (part of TEZ-398). (sseth)

TEZ-443.  Merge tez-dag-api and tez-engine-api into a single module -
tez-api (part of TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d316f723
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d316f723
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d316f723

Branch: refs/heads/master
Commit: d316f723508c77eb90936a9477812195714b59a2
Parents: b4950f9
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 23 10:44:42 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 23 10:44:42 2013 -0700

----------------------------------------------------------------------
 pom.xml                                         |  10 +-
 tez-api/findbugs-exclude.xml                    |  16 +
 tez-api/pom.xml                                 |  90 +++
 .../org/apache/tez/client/AMConfiguration.java  | 100 ++++
 .../java/org/apache/tez/client/TezClient.java   | 144 +++++
 .../org/apache/tez/client/TezClientUtils.java   | 560 +++++++++++++++++++
 .../java/org/apache/tez/client/TezSession.java  | 184 ++++++
 .../tez/client/TezSessionConfiguration.java     |  57 ++
 .../org/apache/tez/common/TezJobConfig.java     | 314 +++++++++++
 .../tez/common/counters/AbstractCounter.java    |  52 ++
 .../common/counters/AbstractCounterGroup.java   | 208 +++++++
 .../tez/common/counters/AbstractCounters.java   | 385 +++++++++++++
 .../tez/common/counters/CounterGroup.java       |  32 ++
 .../tez/common/counters/CounterGroupBase.java   | 108 ++++
 .../common/counters/CounterGroupFactory.java    | 180 ++++++
 .../apache/tez/common/counters/DAGCounter.java  |  39 ++
 .../tez/common/counters/FileSystemCounter.java  |  30 +
 .../common/counters/FileSystemCounterGroup.java | 327 +++++++++++
 .../common/counters/FrameworkCounterGroup.java  | 275 +++++++++
 .../tez/common/counters/GenericCounter.java     | 109 ++++
 .../apache/tez/common/counters/JobCounter.java  |  45 ++
 .../common/counters/LimitExceededException.java |  36 ++
 .../org/apache/tez/common/counters/Limits.java  | 112 ++++
 .../tez/common/counters/ResourceBundles.java    |  94 ++++
 .../apache/tez/common/counters/TaskCounter.java |  66 +++
 .../apache/tez/common/counters/TezCounter.java  |  83 +++
 .../apache/tez/common/counters/TezCounters.java | 144 +++++
 .../main/java/org/apache/tez/dag/api/DAG.java   | 377 +++++++++++++
 .../apache/tez/dag/api/DagTypeConverters.java   | 278 +++++++++
 .../main/java/org/apache/tez/dag/api/Edge.java  |  59 ++
 .../org/apache/tez/dag/api/EdgeProperty.java    | 147 +++++
 .../org/apache/tez/dag/api/InputDescriptor.java |  32 ++
 .../apache/tez/dag/api/OutputDescriptor.java    |  32 ++
 .../apache/tez/dag/api/ProcessorDescriptor.java |  31 +
 .../apache/tez/dag/api/TezConfiguration.java    | 223 ++++++++
 .../org/apache/tez/dag/api/TezConstants.java    |  29 +
 .../apache/tez/dag/api/TezEntityDescriptor.java |  42 ++
 .../org/apache/tez/dag/api/TezException.java    |  31 +
 .../tez/dag/api/TezUncheckedException.java      |  33 ++
 .../java/org/apache/tez/dag/api/Vertex.java     | 153 +++++
 .../apache/tez/dag/api/VertexLocationHint.java  | 154 +++++
 .../apache/tez/dag/api/client/DAGClient.java    |  67 +++
 .../apache/tez/dag/api/client/DAGStatus.java    | 130 +++++
 .../org/apache/tez/dag/api/client/Progress.java |  67 +++
 .../apache/tez/dag/api/client/VertexStatus.java |  78 +++
 .../rpc/DAGClientAMProtocolBlockingPB.java      |  30 +
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 291 ++++++++++
 .../java/org/apache/tez/engine/api/Event.java   |  28 +
 .../java/org/apache/tez/engine/api/Input.java   |  71 +++
 .../tez/engine/api/LogicalIOProcessor.java      |  43 ++
 .../org/apache/tez/engine/api/LogicalInput.java |  37 ++
 .../apache/tez/engine/api/LogicalOutput.java    |  36 ++
 .../java/org/apache/tez/engine/api/Output.java  |  71 +++
 .../org/apache/tez/engine/api/Processor.java    |  55 ++
 .../java/org/apache/tez/engine/api/Reader.java  |  26 +
 .../apache/tez/engine/api/TezInputContext.java  |  32 ++
 .../apache/tez/engine/api/TezOutputContext.java |  33 ++
 .../tez/engine/api/TezProcessorContext.java     |  41 ++
 .../apache/tez/engine/api/TezTaskContext.java   | 130 +++++
 .../java/org/apache/tez/engine/api/Writer.java  |  26 +
 .../engine/api/events/DataMovementEvent.java    | 109 ++++
 .../tez/engine/api/events/InputFailedEvent.java |  89 +++
 .../api/events/InputInformationEvent.java       |  41 ++
 .../engine/api/events/InputReadErrorEvent.java  |  65 +++
 .../common/objectregistry/ObjectLifeCycle.java  |  37 ++
 .../common/objectregistry/ObjectRegistry.java   |  56 ++
 .../objectregistry/ObjectRegistryFactory.java   |  32 ++
 tez-api/src/main/proto/DAGApiRecords.proto      | 183 ++++++
 .../src/main/proto/DAGClientAMProtocol.proto    |  81 +++
 tez-api/src/main/proto/Events.proto             |  44 ++
 .../org/apache/tez/dag/api/TestDAGPlan.java     | 155 +++++
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 417 ++++++++++++++
 tez-common/pom.xml                              |   2 +-
 .../java/org/apache/tez/common/Constants.java   |  57 --
 .../org/apache/tez/common/ContainerContext.java |  64 ---
 .../java/org/apache/tez/common/InputSpec.java   |  85 ---
 .../java/org/apache/tez/common/OutputSpec.java  |  84 ---
 .../org/apache/tez/common/TezJobConfig.java     | 314 -----------
 .../org/apache/tez/common/TezTaskContext.java   |  88 ---
 .../org/apache/tez/common/TezTaskStatus.java    | 104 ----
 .../tez/common/counters/AbstractCounter.java    |  52 --
 .../common/counters/AbstractCounterGroup.java   | 208 -------
 .../tez/common/counters/AbstractCounters.java   | 385 -------------
 .../tez/common/counters/CounterGroup.java       |  32 --
 .../tez/common/counters/CounterGroupBase.java   | 108 ----
 .../common/counters/CounterGroupFactory.java    | 180 ------
 .../apache/tez/common/counters/DAGCounter.java  |  39 --
 .../tez/common/counters/FileSystemCounter.java  |  30 -
 .../common/counters/FileSystemCounterGroup.java | 327 -----------
 .../common/counters/FrameworkCounterGroup.java  | 275 ---------
 .../tez/common/counters/GenericCounter.java     | 109 ----
 .../apache/tez/common/counters/JobCounter.java  |  45 --
 .../common/counters/LimitExceededException.java |  36 --
 .../org/apache/tez/common/counters/Limits.java  | 112 ----
 .../tez/common/counters/ResourceBundles.java    |  94 ----
 .../apache/tez/common/counters/TaskCounter.java |  66 ---
 .../apache/tez/common/counters/TezCounter.java  |  83 ---
 .../apache/tez/common/counters/TezCounters.java | 144 -----
 .../org/apache/tez/records/TezContainerId.java  |  78 ---
 tez-dag-api/findbugs-exclude.xml                |  16 -
 tez-dag-api/pom.xml                             |  88 ---
 .../org/apache/tez/client/AMConfiguration.java  | 100 ----
 .../java/org/apache/tez/client/TezClient.java   | 144 -----
 .../org/apache/tez/client/TezClientUtils.java   | 560 -------------------
 .../java/org/apache/tez/client/TezSession.java  | 184 ------
 .../tez/client/TezSessionConfiguration.java     |  57 --
 .../main/java/org/apache/tez/dag/api/DAG.java   | 377 -------------
 .../apache/tez/dag/api/DagTypeConverters.java   | 278 ---------
 .../main/java/org/apache/tez/dag/api/Edge.java  |  59 --
 .../org/apache/tez/dag/api/EdgeProperty.java    | 147 -----
 .../org/apache/tez/dag/api/InputDescriptor.java |  32 --
 .../apache/tez/dag/api/OutputDescriptor.java    |  32 --
 .../apache/tez/dag/api/ProcessorDescriptor.java |  31 -
 .../apache/tez/dag/api/TezConfiguration.java    | 223 --------
 .../org/apache/tez/dag/api/TezConstants.java    |  29 -
 .../apache/tez/dag/api/TezEntityDescriptor.java |  42 --
 .../org/apache/tez/dag/api/TezException.java    |  31 -
 .../tez/dag/api/TezUncheckedException.java      |  33 --
 .../java/org/apache/tez/dag/api/Vertex.java     | 153 -----
 .../apache/tez/dag/api/VertexLocationHint.java  | 154 -----
 .../apache/tez/dag/api/client/DAGClient.java    |  67 ---
 .../apache/tez/dag/api/client/DAGStatus.java    | 130 -----
 .../org/apache/tez/dag/api/client/Progress.java |  67 ---
 .../apache/tez/dag/api/client/VertexStatus.java |  78 ---
 .../rpc/DAGClientAMProtocolBlockingPB.java      |  30 -
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 291 ----------
 tez-dag-api/src/main/proto/DAGApiRecords.proto  | 183 ------
 .../src/main/proto/DAGClientAMProtocol.proto    |  81 ---
 .../org/apache/tez/dag/api/TestDAGPlan.java     | 155 -----
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 417 --------------
 tez-dag/pom.xml                                 |  12 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  38 --
 .../org/apache/tez/dag/app/dag/EdgeManager.java |   6 +-
 .../event/TaskAttemptEventOutputConsumable.java |  36 --
 .../dag/app/dag/impl/BroadcastEdgeManager.java  |   6 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   6 +-
 .../dag/app/dag/impl/OneToOneEdgeManager.java   |   6 +-
 .../app/dag/impl/ScatterGatherEdgeManager.java  |   6 +-
 .../dag/app/dag/impl/ShuffleVertexManager.java  |   6 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   4 +-
 .../TezDependentTaskCompletionEvent.java        | 228 ++++++++
 ...TezTaskDependencyCompletionEventsUpdate.java |  64 +++
 .../dag/app/rm/container/TestAMContainer.java   |  14 +-
 tez-dist/src/main/assembly/tez-dist-full.xml    |   6 +-
 tez-dist/src/main/assembly/tez-dist.xml         |   6 +-
 tez-engine-api/findbugs-exclude.xml             |  16 -
 tez-engine-api/pom.xml                          |  91 ---
 .../java/org/apache/tez/engine/api/Input.java   |  83 ---
 .../java/org/apache/tez/engine/api/Master.java  |  39 --
 .../java/org/apache/tez/engine/api/Output.java  |  65 ---
 .../org/apache/tez/engine/api/Processor.java    |  62 --
 .../java/org/apache/tez/engine/api/Task.java    |  79 ---
 .../common/objectregistry/ObjectLifeCycle.java  |  37 --
 .../common/objectregistry/ObjectRegistry.java   |  56 --
 .../objectregistry/ObjectRegistryFactory.java   |  32 --
 .../org/apache/tez/engine/newapi/Event.java     |  28 -
 .../org/apache/tez/engine/newapi/Input.java     |  71 ---
 .../tez/engine/newapi/LogicalIOProcessor.java   |  43 --
 .../apache/tez/engine/newapi/LogicalInput.java  |  37 --
 .../apache/tez/engine/newapi/LogicalOutput.java |  36 --
 .../org/apache/tez/engine/newapi/Output.java    |  71 ---
 .../org/apache/tez/engine/newapi/Processor.java |  58 --
 .../org/apache/tez/engine/newapi/Reader.java    |  26 -
 .../tez/engine/newapi/TezInputContext.java      |  32 --
 .../tez/engine/newapi/TezOutputContext.java     |  33 --
 .../tez/engine/newapi/TezProcessorContext.java  |  41 --
 .../tez/engine/newapi/TezTaskContext.java       | 130 -----
 .../org/apache/tez/engine/newapi/Writer.java    |  26 -
 .../engine/newapi/events/DataMovementEvent.java | 109 ----
 .../engine/newapi/events/InputFailedEvent.java  |  89 ---
 .../newapi/events/InputInformationEvent.java    |  41 --
 .../newapi/events/InputReadErrorEvent.java      |  65 ---
 .../tez/engine/records/OutputContext.java       |  61 --
 .../TezDependentTaskCompletionEvent.java        | 228 --------
 ...TezTaskDependencyCompletionEventsUpdate.java |  64 ---
 tez-engine-api/src/main/proto/Events.proto      |  44 --
 tez-engine/pom.xml                              |   6 +-
 .../java/org/apache/tez/common/Constants.java   |  57 ++
 .../org/apache/tez/common/ContainerContext.java |  64 +++
 .../tez/common/TezTaskUmbilicalProtocol.java    |  20 -
 .../org/apache/tez/engine/api/KVReader.java     |   2 +-
 .../org/apache/tez/engine/api/KVWriter.java     |   2 +-
 .../api/events/TaskAttemptCompletedEvent.java   |   2 +-
 .../api/events/TaskAttemptFailedEvent.java      |   2 +-
 .../api/events/TaskStatusUpdateEvent.java       |   2 +-
 .../apache/tez/engine/api/impl/TezEvent.java    |  10 +-
 .../engine/api/impl/TezInputContextImpl.java    |   4 +-
 .../engine/api/impl/TezOutputContextImpl.java   |   4 +-
 .../api/impl/TezProcessorContextImpl.java       |   4 +-
 .../tez/engine/api/impl/TezTaskContextImpl.java |   2 +-
 .../broadcast/input/BroadcastInputManager.java  |   2 +-
 .../BroadcastShuffleInputEventHandler.java      |   8 +-
 .../input/BroadcastShuffleManager.java          |   6 +-
 .../broadcast/output/FileBasedKVWriter.java     |   2 +-
 .../tez/engine/common/TezEngineUtils.java       |   4 +-
 .../tez/engine/common/combine/Combiner.java     |   1 -
 .../common/localshuffle/LocalShuffle.java       |   2 +-
 .../tez/engine/common/shuffle/impl/Fetcher.java |   2 +-
 .../common/shuffle/impl/MergeManager.java       |   2 +-
 .../tez/engine/common/shuffle/impl/Shuffle.java |   4 +-
 .../shuffle/impl/ShuffleInputEventHandler.java  |  10 +-
 .../common/shuffle/impl/ShuffleScheduler.java   |   6 +-
 .../common/shuffle/server/ShuffleHandler.java   |   2 +-
 .../engine/common/sort/impl/ExternalSorter.java |   2 +-
 .../common/sort/impl/PipelinedSorter.java       |   2 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   2 +-
 .../sort/impl/dflt/InMemoryShuffleSorter.java   |   2 +-
 .../tez/engine/lib/input/LocalMergedInput.java  |   6 +-
 .../engine/lib/input/ShuffledMergedInput.java   |   6 +-
 .../lib/input/ShuffledUnorderedKVInput.java     |   8 +-
 .../engine/lib/output/InMemorySortedOutput.java |  10 +-
 .../lib/output/LocalOnFileSorterOutput.java     |   2 +-
 .../engine/lib/output/OnFileSortedOutput.java   |   8 +-
 .../lib/output/OnFileUnorderedKVOutput.java     |   8 +-
 .../LogicalIOProcessorRuntimeTask.java          |  20 +-
 tez-mapreduce/pom.xml                           |   2 +-
 .../org/apache/tez/common/TezTaskStatus.java    | 105 ++++
 .../tez/mapreduce/combine/MRCombiner.java       |   6 +-
 .../tez/mapreduce/hadoop/TezTypeConverters.java |   9 -
 .../tez/mapreduce/hadoop/mapred/MRReporter.java |   4 +-
 .../hadoop/mapreduce/MapContextImpl.java        |   2 +-
 .../mapreduce/TaskAttemptContextImpl.java       |   2 +-
 .../mapreduce/TaskInputOutputContextImpl.java   |   2 +-
 .../apache/tez/mapreduce/input/SimpleInput.java |   6 +-
 .../tez/mapreduce/output/SimpleOutput.java      |   6 +-
 .../apache/tez/mapreduce/processor/MRTask.java  |  13 +-
 .../tez/mapreduce/processor/MRTaskReporter.java |   8 +-
 .../mapreduce/processor/map/MapProcessor.java   |  10 +-
 .../processor/reduce/ReduceProcessor.java       |  10 +-
 .../tez/mapreduce/TestUmbilicalProtocol.java    |  17 -
 tez-yarn-client/pom.xml                         |   2 +-
 231 files changed, 8681 insertions(+), 9660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fe41471..63f17eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,12 +90,7 @@
     <dependencies>
       <dependency>
         <groupId>org.apache.tez</groupId>
-        <artifactId>tez-dag-api</artifactId>
-        <version>${project.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.tez</groupId>
-        <artifactId>tez-engine-api</artifactId>
+        <artifactId>tez-api</artifactId>
         <version>${project.version}</version>
       </dependency>
       <dependency>
@@ -250,8 +245,7 @@
   </dependencyManagement>
 
   <modules>
-    <module>tez-dag-api</module>
-    <module>tez-engine-api</module>
+    <module>tez-api</module>
     <module>tez-common</module>
     <module>tez-engine</module>
     <module>tez-yarn-client</module>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml
new file mode 100644
index 0000000..5b11308
--- /dev/null
+++ b/tez-api/findbugs-exclude.xml
@@ -0,0 +1,16 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-api/pom.xml b/tez-api/pom.xml
new file mode 100644
index 0000000..069b0d4
--- /dev/null
+++ b/tez-api/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-api</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+     <groupId>com.google.protobuf</groupId>
+     <artifactId>protobuf-java</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>DAGApiRecords.proto</include>
+                  <include>DAGClientAMProtocol.proto</include>
+                  <include>Events.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
new file mode 100644
index 0000000..f452c74
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class AMConfiguration {
+
+  private final Path stagingDir;
+  private final String queueName;
+  private final Map<String, String> env;
+  private final Map<String, LocalResource> localResources;
+  private final TezConfiguration amConf;
+  private final Credentials credentials;
+
+  public AMConfiguration(String queueName, Map<String, String> env,
+      Map<String, LocalResource> localResources,
+      TezConfiguration conf, Credentials credentials) {
+    this.queueName = queueName;
+    if (conf != null) {
+      this.amConf = conf;
+    } else {
+      this.amConf = new TezConfiguration();
+    }
+
+    if (env != null) {
+      this.env = env;
+    } else {
+      this.env = new HashMap<String, String>(0);
+    }
+    this.localResources = localResources;
+    String stagingDirStr = amConf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
+    if (stagingDirStr == null || stagingDirStr.isEmpty()) {
+      throw new TezUncheckedException("Staging directory for AM resources"
+          + " not specified in config"
+          + ", property=" + TezConfiguration.TEZ_AM_STAGING_DIR);
+    }
+    try {
+      FileSystem fs = FileSystem.get(amConf);
+      this.stagingDir = fs.resolvePath(new Path(stagingDirStr));
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    this.credentials = credentials;
+  }
+
+  public Path getStagingDir() {
+    return stagingDir;
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  public Map<String, LocalResource> getLocalResources() {
+    return localResources;
+  }
+
+  public TezConfiguration getAMConf() {
+    return amConf;
+  }
+
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  public void isCompatible(AMConfiguration other) {
+    // TODO implement
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
new file mode 100644
index 0000000..df260ec
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+
+public class TezClient {
+  private static final Log LOG = LogFactory.getLog(TezClient.class);
+
+  private final TezConfiguration conf;
+  private final YarnConfiguration yarnConf;
+  private YarnClient yarnClient;
+  Map<String, LocalResource> tezJarResources = null;
+
+  /**
+   * <p>
+   * Create an instance of the TezClient which will be used to communicate with
+   * a specific instance of YARN, or TezService when that exists.
+   * </p>
+   * <p>
+   * Separate instances of TezClient should be created to communicate with
+   * different instances of YARN
+   * </p>
+   *
+   * @param conf
+   *          the configuration which will be used to establish which YARN or
+   *          Tez service instance this client is associated with.
+   */
+  public TezClient(TezConfiguration conf) {
+    this.conf = conf;
+    this.yarnConf = new YarnConfiguration(conf);
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(yarnConf);
+    yarnClient.start();
+  }
+
+
+  public DAGClient submitDAGApplication(DAG dag, AMConfiguration amConfig)
+      throws TezException, IOException {
+    ApplicationId appId = createApplication();
+    return submitDAGApplication(appId, dag, amConfig);
+  }
+
+  @Private
+  // To be used only by YarnRunner
+  public DAGClient submitDAGApplication(ApplicationId appId,
+      DAG dag, AMConfiguration amConfig)
+          throws TezException, IOException {
+    try {
+      ApplicationSubmissionContext appContext =
+          TezClientUtils.createApplicationSubmissionContext(conf, appId, dag,
+              dag.getName(), amConfig, getTezJarResources());
+      LOG.info("Submitting DAG to YARN"
+          + ", applicationId=" + appId);
+      yarnClient.submitApplication(appContext);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    return getDAGClient(appId);
+  }
+
+  /**
+   * Create a new YARN application
+   * @return <code>ApplicationId</code> for the new YARN application
+   * @throws YarnException
+   * @throws IOException
+   */
+  public ApplicationId createApplication() throws TezException, IOException {
+    try {
+      return yarnClient.createApplication().
+          getNewApplicationResponse().getApplicationId();
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+  }
+
+  private synchronized Map<String, LocalResource> getTezJarResources()
+      throws IOException {
+    if (tezJarResources == null) {
+      tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf);
+    }
+    return tezJarResources;
+  }
+
+  @Private
+  public DAGClient getDAGClient(ApplicationId appId)
+      throws IOException, TezException {
+      return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId),
+                                   conf);
+  }
+
+  // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
+  private static final char SEPARATOR = '_';
+  private static final String DAG = "dag";
+  private static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
+  String getDefaultTezDAGID(ApplicationId appId) {
+     return (new StringBuilder(DAG)).append(SEPARATOR).
+                   append(appId.getClusterTimestamp()).
+                   append(SEPARATOR).
+                   append(appId.getId()).
+                   append(SEPARATOR).
+                   append(idFormat.format(1)).toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
new file mode 100644
index 0000000..7c6a5ed
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Level;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class TezClientUtils {
+
+  private static Log LOG = LogFactory.getLog(TezClientUtils.class);
+
+  public static final FsPermission TEZ_AM_DIR_PERMISSION =
+      FsPermission.createImmutable((short) 0700); // rwx--------
+  public static final FsPermission TEZ_AM_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  private static final int UTF8_CHUNK_SIZE = 16 * 1024;
+
+  /**
+   * Setup LocalResource map for Tez jars based on provided Configuration
+   * @param conf Configuration to use to access Tez jars' locations
+   * @return Map of LocalResources to use when launching Tez AM
+   * @throws IOException
+   */
+  static Map<String, LocalResource> setupTezJarsLocalResources(
+      TezConfiguration conf)
+      throws IOException {
+    Map<String, LocalResource> tezJarResources =
+        new TreeMap<String, LocalResource>();
+    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      return tezJarResources;
+    }
+
+    // Add tez jars to local resource
+    String[] tezJarUris = conf.getStrings(
+        TezConfiguration.TEZ_LIB_URIS);
+    if (tezJarUris == null
+        || tezJarUris.length == 0) {
+      throw new TezUncheckedException("Invalid configuration of tez jars"
+          + ", " + TezConfiguration.TEZ_LIB_URIS
+          + " is not defined in the configurartion");
+    }
+
+    for (String tezJarUri : tezJarUris) {
+      URI uri;
+      try {
+        uri = new URI(tezJarUri.trim());
+      } catch (URISyntaxException e) {
+        String message = "Invalid URI defined in configuration for"
+            + " location of TEZ jars. providedURI=" + tezJarUri;
+        LOG.error(message);
+        throw new TezUncheckedException(message, e);
+      }
+      if (!uri.isAbsolute()) {
+        String message = "Non-absolute URI defined in configuration for"
+            + " location of TEZ jars. providedURI=" + tezJarUri;
+        LOG.error(message);
+        throw new TezUncheckedException(message);
+      }
+      Path p = new Path(uri);
+      FileSystem pathfs = p.getFileSystem(conf);
+      RemoteIterator<LocatedFileStatus> iter = pathfs.listFiles(p, false);
+      while (iter.hasNext()) {
+        LocatedFileStatus fStatus = iter.next();
+        String rsrcName = fStatus.getPath().getName();
+        // FIXME currently not checking for duplicates due to quirks
+        // in assembly generation
+        if (tezJarResources.containsKey(rsrcName)) {
+          String message = "Duplicate resource found"
+              + ", resourceName=" + rsrcName
+              + ", existingPath=" +
+              tezJarResources.get(rsrcName).getResource().toString()
+              + ", newPath=" + fStatus.getPath();
+          LOG.warn(message);
+          // throw new TezUncheckedException(message);
+        }
+        tezJarResources.put(rsrcName,
+            LocalResource.newInstance(
+                ConverterUtils.getYarnUrlFromPath(fStatus.getPath()),
+                LocalResourceType.FILE,
+                LocalResourceVisibility.PUBLIC,
+                fStatus.getLen(),
+                fStatus.getModificationTime()));
+      }
+    }
+    if (tezJarResources.isEmpty()) {
+      LOG.warn("No tez jars found in configured locations"
+          + ". Ignoring for now. Errors may occur");
+    }
+    return tezJarResources;
+  }
+
+  /**
+   * Verify or create the Staging area directory on the configured Filesystem
+   * @param stagingArea Staging area directory path
+   * @return
+   * @throws IOException
+   */
+  public static FileSystem ensureStagingDirExists(Configuration conf,
+      Path stagingArea)
+      throws IOException {
+    FileSystem fs = stagingArea.getFileSystem(conf);
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    if (fs.exists(stagingArea)) {
+      FileStatus fsStatus = fs.getFileStatus(stagingArea);
+      String owner = fsStatus.getOwner();
+      if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+        throw new IOException("The ownership on the staging directory "
+            + stagingArea + " is not as expected. " + "It is owned by " + owner
+            + ". The directory must " + "be owned by the submitter "
+            + currentUser + " or " + "by " + realUser);
+      }
+      if (!fsStatus.getPermission().equals(TEZ_AM_DIR_PERMISSION)) {
+        LOG.info("Permissions on staging directory " + stagingArea + " are "
+            + "incorrect: " + fsStatus.getPermission()
+            + ". Fixing permissions " + "to correct value "
+            + TEZ_AM_DIR_PERMISSION);
+        fs.setPermission(stagingArea, TEZ_AM_DIR_PERMISSION);
+      }
+    } else {
+      fs.mkdirs(stagingArea, new FsPermission(TEZ_AM_DIR_PERMISSION));
+    }
+    return fs;
+  }
+
+  /**
+   * Create an ApplicationSubmissionContext to launch a Tez AM
+   * @param conf
+   * @param appId
+   * @param dag
+   * @param appStagingDir
+   * @param ts
+   * @param amQueueName
+   * @param amName
+   * @param amArgs
+   * @param amEnv
+   * @param amLocalResources
+   * @param appConf
+   * @return
+   * @throws IOException
+   * @throws YarnException
+   */
+  static ApplicationSubmissionContext createApplicationSubmissionContext(
+      Configuration conf, ApplicationId appId, DAG dag, String amName,
+      AMConfiguration amConfig,
+      Map<String, LocalResource> tezJarResources)
+          throws IOException, YarnException{
+
+    FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
+        amConfig.getStagingDir());
+
+    // Setup resource requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(
+        amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+            TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
+    capability.setVirtualCores(
+        amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
+            TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AppMaster capability = " + capability);
+    }
+
+    ByteBuffer securityTokens = null;
+    // Setup security tokens
+    if (amConfig.getCredentials() != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      amConfig.getCredentials().writeTokenStorageToStream(dob);
+      securityTokens = ByteBuffer.wrap(dob.getData(), 0,
+          dob.getLength());
+    }
+
+    // Setup the command to run the AM
+    List<String> vargs = new ArrayList<String>(8);
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    String amLogLevel = amConfig.getAMConf().get(
+        TezConfiguration.TEZ_AM_LOG_LEVEL,
+        TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
+    addLog4jSystemProperties(amLogLevel, vargs);
+
+    vargs.add(amConfig.getAMConf().get(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        TezConfiguration.DEFAULT_TEZ_AM_JAVA_OPTS));
+
+    vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        File.separator + ApplicationConstants.STDOUT);
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        File.separator + ApplicationConstants.STDERR);
+
+
+    Vector<String> vargsFinal = new Vector<String>(8);
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    vargsFinal.add(mergedCommand.toString());
+
+    LOG.debug("Command to launch container for ApplicationMaster is : "
+        + mergedCommand);
+
+    // Setup the CLASSPATH in environment
+    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+
+    boolean isMiniCluster =
+        conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
+    if (isMiniCluster) {
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          System.getProperty("java.class.path"));
+    }
+
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$());
+
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$() + File.separator + "*");
+
+    // Add YARN/COMMON/HDFS jars to path
+    if (!isMiniCluster) {
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+            c.trim());
+      }
+    }
+
+    if (amConfig.getEnv() != null) {
+      for (Map.Entry<String, String> entry : amConfig.getEnv().entrySet()) {
+        Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
+      }
+    }
+
+    Map<String, LocalResource> localResources =
+        new TreeMap<String, LocalResource>();
+
+    if (amConfig.getLocalResources() != null) {
+      localResources.putAll(amConfig.getLocalResources());
+    }
+    localResources.putAll(tezJarResources);
+
+    // emit conf as PB file
+    Configuration finalTezConf = createFinalTezConfForApp(amConfig.getAMConf());
+    Path binaryConfPath =  new Path(amConfig.getStagingDir(),
+        TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+    FSDataOutputStream amConfPBOutBinaryStream = null;
+    try {
+      ConfigurationProto.Builder confProtoBuilder =
+          ConfigurationProto.newBuilder();
+      Iterator<Entry<String, String>> iter = finalTezConf.iterator();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+      //binary output
+      amConfPBOutBinaryStream = FileSystem.create(fs, binaryConfPath,
+          new FsPermission(TEZ_AM_FILE_PERMISSION));
+      confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);
+    } finally {
+      if(amConfPBOutBinaryStream != null){
+        amConfPBOutBinaryStream.close();
+      }
+    }
+
+    LocalResource binaryConfLRsrc =
+        TezClientUtils.createLocalResource(fs,
+            binaryConfPath, LocalResourceType.FILE,
+            LocalResourceVisibility.APPLICATION);
+    localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+        binaryConfLRsrc);
+
+    if(dag != null) {
+      // Add tez jars to vertices too
+      for (Vertex v : dag.getVertices()) {
+        v.getTaskLocalResources().putAll(tezJarResources);
+        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+            binaryConfLRsrc);
+      }
+
+      // emit protobuf DAG file style
+      Path binaryPath =  new Path(amConfig.getStagingDir(),
+          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME + "." + appId.toString());
+      amConfig.getAMConf().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
+          binaryPath.toUri().toString());
+
+      DAGPlan dagPB = dag.createDag(null);
+
+      FSDataOutputStream dagPBOutBinaryStream = null;
+
+      try {
+        //binary output
+        dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
+            new FsPermission(TEZ_AM_FILE_PERMISSION));
+        dagPB.writeTo(dagPBOutBinaryStream);
+      } finally {
+        if(dagPBOutBinaryStream != null){
+          dagPBOutBinaryStream.close();
+        }
+      }
+
+      localResources.put(TezConfiguration.TEZ_PB_PLAN_BINARY_NAME,
+          TezClientUtils.createLocalResource(fs,
+              binaryPath, LocalResourceType.FILE,
+              LocalResourceVisibility.APPLICATION));
+
+      if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
+        Path textPath = localizeDagPlanAsText(dagPB, fs,
+            amConfig.getStagingDir(), appId);
+        localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
+            TezClientUtils.createLocalResource(fs,
+                textPath, LocalResourceType.FILE,
+                LocalResourceVisibility.APPLICATION));
+      }
+    } else {
+      Apps.addToEnvironment(environment,
+          TezConstants.TEZ_AM_IS_SESSION_ENV, "set");
+    }
+
+    Map<ApplicationAccessType, String> acls
+        = new HashMap<ApplicationAccessType, String>();
+
+    // Setup ContainerLaunchContext for AM container
+    ContainerLaunchContext amContainer =
+        ContainerLaunchContext.newInstance(localResources, environment,
+            vargsFinal, null, securityTokens, acls);
+
+    // Set up the ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+
+    appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
+    appContext.setApplicationId(appId);
+    appContext.setResource(capability);
+    appContext.setQueue(amConfig.getQueueName());
+    appContext.setApplicationName(amName);
+    appContext.setCancelTokensWhenComplete(amConfig.getAMConf().getBoolean(
+        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
+        TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
+    appContext.setAMContainerSpec(amContainer);
+
+    return appContext;
+
+  }
+
+  @VisibleForTesting
+  static void addLog4jSystemProperties(String logLevel,
+      List<String> vargs) {
+    vargs.add("-Dlog4j.configuration="
+        + TezConfiguration.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
+    vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "="
+        + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    vargs.add("-D" + TezConfiguration.TEZ_ROOT_LOGGER_NAME + "=" + logLevel
+        + "," + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
+  }
+
+  static Configuration createFinalTezConfForApp(TezConfiguration amConf) {
+    Configuration conf = new Configuration(false);
+    conf.setQuietMode(true);
+
+    assert amConf != null;
+    Iterator<Entry<String, String>> iter = amConf.iterator();
+    while (iter.hasNext()) {
+      Entry<String, String> entry = iter.next();
+      // Copy all tez config parameters.
+      if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
+        conf.set(entry.getKey(), entry.getValue());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding tez dag am parameter: " + entry.getKey()
+              + ", with value: " + entry.getValue());
+        }
+      }
+    }
+    return conf;
+  }
+
+  /**
+   * Helper function to create a YARN LocalResource
+   * @param fs FileSystem object
+   * @param p Path of resource to localize
+   * @param type LocalResource Type
+   * @return
+   * @throws IOException
+   */
+  static LocalResource createLocalResource(FileSystem fs, Path p,
+      LocalResourceType type,
+      LocalResourceVisibility visibility) throws IOException {
+    LocalResource rsrc = Records.newRecord(LocalResource.class);
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs.resolvePath(rsrcStat
+        .getPath())));
+    rsrc.setSize(rsrcStat.getLen());
+    rsrc.setTimestamp(rsrcStat.getModificationTime());
+    rsrc.setType(type);
+    rsrc.setVisibility(visibility);
+    return rsrc;
+  }
+
+  private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs,
+      Path appStagingDir, ApplicationId appId) throws IOException {
+    Path textPath = new Path(appStagingDir,
+        TezConfiguration.TEZ_PB_PLAN_TEXT_NAME + "." + appId.toString());
+    FSDataOutputStream dagPBOutTextStream = null;
+    try {
+      dagPBOutTextStream = FileSystem.create(fs, textPath, new FsPermission(
+          TEZ_AM_FILE_PERMISSION));
+      String dagPBStr = dagPB.toString();
+      int dagPBStrLen = dagPBStr.length();
+      if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
+        dagPBOutTextStream.writeUTF(dagPBStr);
+      } else {
+        int startIndex = 0;
+        while (startIndex < dagPBStrLen) {
+          int endIndex = startIndex + UTF8_CHUNK_SIZE;
+          if (endIndex > dagPBStrLen) {
+            endIndex = dagPBStrLen;
+          }
+          dagPBOutTextStream.writeUTF(dagPBStr.substring(startIndex, endIndex));
+          startIndex += UTF8_CHUNK_SIZE;
+        }
+      }
+    } finally {
+      if (dagPBOutTextStream != null) {
+        dagPBOutTextStream.close();
+      }
+    }
+    return textPath;
+  }
+
+  static DAGClientAMProtocolBlockingPB getAMProxy(YarnClient yarnClient,
+      Configuration conf,
+      ApplicationId applicationId) throws TezException, IOException {
+    ApplicationReport appReport;
+    try {
+      appReport = yarnClient.getApplicationReport(
+          applicationId);
+
+      if(appReport == null) {
+        throw new TezUncheckedException("Could not retrieve application report"
+            + " from YARN, applicationId=" + applicationId);
+      }
+      YarnApplicationState appState = appReport.getYarnApplicationState();
+      if(appState != YarnApplicationState.RUNNING) {
+        if (appState == YarnApplicationState.FINISHED
+            || appState == YarnApplicationState.KILLED
+            || appState == YarnApplicationState.FAILED) {
+          throw new TezUncheckedException("Application not running"
+              + ", applicationId=" + applicationId
+              + ", yarnApplicationState=" + appReport.getYarnApplicationState()
+              + ", finalApplicationStatus="
+              + appReport.getFinalApplicationStatus()
+              + ", trackingUrl=" + appReport.getTrackingUrl());
+        }
+        return null;
+      }
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort());
+  }
+
+  static DAGClientAMProtocolBlockingPB getAMProxy(Configuration conf,
+      String amHost, int amRpcPort) throws IOException {
+    InetSocketAddress addr = new InetSocketAddress(amHost,
+        amRpcPort);
+
+    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
+        ProtobufRpcEngine.class);
+    DAGClientAMProtocolBlockingPB proxy =
+        (DAGClientAMProtocolBlockingPB) RPC.getProxy(
+            DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+    return proxy;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
new file mode 100644
index 0000000..acf523d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+
+public class TezSession {
+
+  private static final Log LOG = LogFactory.getLog(TezSession.class);
+
+  private final String sessionName;
+  private ApplicationId applicationId;
+  private LocalResource tezConfPBLRsrc = null;
+  private final TezSessionConfiguration sessionConfig;
+  private YarnClient yarnClient;
+  private Map<String, LocalResource> tezJarResources;
+  private boolean sessionStarted = false;
+
+  public TezSession(String sessionName,
+      ApplicationId applicationId,
+      TezSessionConfiguration sessionConfig) {
+    this.sessionName = sessionName;
+    this.sessionConfig = sessionConfig;
+    this.applicationId = applicationId;
+  }
+
+  public TezSession(String sessionName,
+      TezSessionConfiguration sessionConfig) {
+    this(sessionName, null, sessionConfig);
+  }
+
+  public synchronized void start() throws TezException, IOException {
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(sessionConfig.getYarnConfiguration());
+    yarnClient.start();
+
+    tezJarResources = TezClientUtils.setupTezJarsLocalResources(
+        sessionConfig.getTezConfiguration());
+
+    try {
+      if (applicationId == null) {
+        applicationId = yarnClient.createApplication().
+            getNewApplicationResponse().getApplicationId();
+      }
+
+      ApplicationSubmissionContext appContext =
+          TezClientUtils.createApplicationSubmissionContext(
+              sessionConfig.getTezConfiguration(), applicationId,
+              null, sessionName, sessionConfig.getAMConfiguration(),
+              tezJarResources);
+      tezConfPBLRsrc = appContext.getAMContainerSpec().getLocalResources().get(
+          TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+      yarnClient.submitApplication(appContext);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+    sessionStarted = true;
+  }
+
+  public synchronized DAGClient submitDAG(DAG dag)
+      throws TezException, IOException {
+    if (!sessionStarted) {
+      throw new TezUncheckedException("Session not started");
+    }
+
+    String dagId = null;
+    LOG.info("Submitting dag to TezSession"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId);
+    // Add tez jars to vertices too
+    for (Vertex v : dag.getVertices()) {
+      v.getTaskLocalResources().putAll(tezJarResources);
+      if (null != tezConfPBLRsrc) {
+        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+            tezConfPBLRsrc);
+      }
+    }
+    DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
+    SubmitDAGRequestProto requestProto =
+        SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
+
+    DAGClientAMProtocolBlockingPB proxy;
+    while (true) {
+      proxy = TezClientUtils.getAMProxy(yarnClient,
+          sessionConfig.getYarnConfiguration(), applicationId);
+      if (proxy != null) {
+        break;
+      }
+      try {
+        Thread.sleep(100l);
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+
+    try {
+      dagId = proxy.submitDAG(null, requestProto).getDagId();
+    } catch (ServiceException e) {
+      throw new TezException(e);
+    }
+    LOG.info("Submitted dag to TezSession"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId
+        + ", dagId=" + dagId);
+    return new DAGClientRPCImpl(applicationId, dagId,
+        sessionConfig.getTezConfiguration());
+  }
+
+  public synchronized void stop() throws TezException, IOException {
+    LOG.info("Shutting down Tez Session"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId);
+    DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(yarnClient,
+        sessionConfig.getYarnConfiguration(), applicationId);
+    if (proxy != null) {
+      try {
+        ShutdownSessionRequestProto request =
+            ShutdownSessionRequestProto.newBuilder().build();
+        proxy.shutdownSession(null, request);
+        return;
+      } catch (ServiceException e) {
+        LOG.info("Failed to shutdown Tez Session via proxy", e);
+      }
+    }
+    LOG.info("Could not connect to AM, killing session via YARN"
+        + ", sessionName=" + sessionName
+        + ", applicationId=" + applicationId);
+    try {
+      yarnClient.killApplication(applicationId);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+  }
+
+  public String getSessionName() {
+    return sessionName;
+  }
+
+  @Private
+  @VisibleForTesting
+  public synchronized ApplicationId getApplicationId() {
+    return applicationId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
new file mode 100644
index 0000000..61ca60b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+
+public class TezSessionConfiguration {
+
+  private final AMConfiguration amConfiguration;
+  private final YarnConfiguration yarnConfig;
+  private final TezConfiguration tezConfig;
+
+  public TezSessionConfiguration(AMConfiguration amConfiguration,
+      TezConfiguration tezConfig) {
+    this.amConfiguration = amConfiguration;
+    this.tezConfig = tezConfig;
+    this.yarnConfig = new YarnConfiguration(tezConfig);
+  }
+
+  TezSessionConfiguration(AMConfiguration amConfiguration,
+      TezConfiguration tezConfig,
+      YarnConfiguration yarnConf) {
+    this.amConfiguration = amConfiguration;
+    this.tezConfig = tezConfig;
+    this.yarnConfig = yarnConf;
+  }
+
+  public AMConfiguration getAMConfiguration() {
+    return amConfiguration;
+  }
+
+  public YarnConfiguration getYarnConfiguration() {
+    return yarnConfig;
+  }
+
+  public TezConfiguration getTezConfiguration() {
+    return tezConfig;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
new file mode 100644
index 0000000..7c4540c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+
+/**
+ * Meant for user configurable job properties. For others look at {@link Constants}
+ *
+ */
+
+// TODO EVENTUALLY A description for each property.
+@Private
+@Evolving
+public class TezJobConfig {
+
+
+
+
+  /** The number of milliseconds between progress reports. */
+  public static final int PROGRESS_INTERVAL = 3000;
+
+  public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String TEZ_ENGINE_IFILE_READAHEAD =
+      "tez.engine.ifile.readahead";
+  public static final boolean DEFAULT_TEZ_ENGINE_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String TEZ_ENGINE_IFILE_READAHEAD_BYTES =
+      "tez.engine.ifile.readahead.bytes";
+  public static final int DEFAULT_TEZ_ENGINE_IFILE_READAHEAD_BYTES =
+      4 * 1024 * 1024;
+
+  /**
+   * 
+   */
+  public static final String RECORDS_BEFORE_PROGRESS = 
+      "tez.task.merge.progress.records";
+  public static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000; 
+
+  /**
+   * List of directories avialble to the engine. 
+   */
+  public static final String LOCAL_DIRS = "tez.engine.local.dirs";
+  public static final String DEFAULT_LOCAL_DIRS = "/tmp";
+
+  /**
+   * One local dir for the speicfic job.
+   */
+  public static final String JOB_LOCAL_DIR = "tez.engine.job.local.dir";
+  
+  /**
+   * The directory which contains the localized files for this task.
+   */
+  @Private
+  public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir";
+  public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp";
+  
+  public static final String TEZ_TASK_WORKING_DIR = "tez.engine.task.working.dir";
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_IO_SORT_FACTOR = 
+      "tez.engine.io.sort.factor";
+  public static final int DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR = 100;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SORT_SPILL_PERCENT = 
+      "tez.engine.sort.spill.percent";
+  public static float DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT = 0.8f; 
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_IO_SORT_MB = "tez.engine.io.sort.mb";
+  public static final int DEFAULT_TEZ_ENGINE_IO_SORT_MB = 100;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES = 
+      "tez.engine.index.cache.memory.limit.bytes";
+  public static final int DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES = 
+      1024 * 1024;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_COMBINE_MIN_SPILLS = 
+      "tez.engine.combine.min.spills";
+  public static final int  DEFAULT_TEZ_ENGINE_COMBINE_MIN_SPILLS = 3;
+  
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SORT_THREADS = 
+	      "tez.engine.sort.threads";
+  public static final int DEFAULT_TEZ_ENGINE_SORT_THREADS = 1;
+
+  /**
+   * Specifies a partitioner class, which is used in Tez engine components like OnFileSortedOutput
+   */
+  public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class";
+  
+  /**
+   * Specifies a combiner class (primarily for Shuffle)
+   */
+  public static final String TEZ_ENGINE_COMBINER_CLASS = "tez.engine.combiner.class";
+  
+  public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions";
+  
+  /**
+   * 
+   */
+  public static final String COUNTERS_MAX_KEY = "tez.engine.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  /**
+   * 
+   */
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "tez.engine.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  /**
+   * 
+   */
+  public static final String COUNTER_NAME_MAX_KEY = "tez.engine.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  /**
+   * 
+   */
+  public static final String COUNTER_GROUPS_MAX_KEY = "tez.engine.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
+
+  
+  /**
+   * Temporary interface for MR only (not chained Tez) to indicate whether
+   * in-memory shuffle should be used.
+   */
+  @Private
+  public static final String TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY =
+      "tez.engine.shuffle.use.in-memory";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
+
+  // TODO NEWTEZ Remove these config parameters. Will be part of an event.
+  @Private
+  public static final String TEZ_ENGINE_SHUFFLE_PARTITION_RANGE = 
+      "tez.engine.shuffle.partition-range";
+  public static int TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT = 1;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES = 
+      "tez.engine.shuffle.parallel.copies";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES = 20;
+
+  /**
+   * TODO Is this user configurable.
+   */
+  public static final String TEZ_ENGINE_METRICS_SESSION_ID = 
+      "tez.engine.metrics.session.id";
+  public static final String DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID = "";
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_FETCH_FAILURES = 
+      "tez.engine.shuffle.fetch.failures.limit";
+  public final static int DEFAULT_TEZ_ENGINE_SHUFFLE_FETCH_FAILURES_LIMIT = 10;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR = 
+      "tez.engine.shuffle.notify.readerror";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR = true;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT = 
+      "tez.engine.shuffle.connect.timeout";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT = 
+      3 * 60 * 1000;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_READ_TIMEOUT = "tez.engine.shuffle.read.timeout";
+  public final static int DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT = 
+      3 * 60 * 1000;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_ENABLE_SSL = 
+      "tez.engine.shuffle.ssl.enable";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL = false;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT = 
+      "tez.engine.shuffle.input.buffer.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT =
+      0.90f;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT = 
+      "tez.engine.shuffle.memory.limit.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT = 
+      0.25f;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MERGE_PERCENT = 
+      "tez.engine.shuffle.merge.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MERGE_PERCENT = 0.90f;
+  
+  /**
+   * TODO TEZAM3 default value ?
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS = 
+      "tez.engine.shuffle.memory-to-memory.segments";
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM = 
+      "tez.engine.shuffle.memory-to-memory.enable";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM = 
+      false;
+
+  /**
+   * 
+   */
+  public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT = 
+      "tez.engine.task.input.buffer.percent";
+  public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
+
+  // TODO Rename. 
+  public static final String TEZ_ENGINE_GROUP_COMPARATOR_CLASS = 
+      "tez.engine.group.comparator.class";
+  
+  // TODO Better name.
+  public static final String TEZ_ENGINE_INTERNAL_SORTER_CLASS = 
+      "tez.engine.internal.sorter.class";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS = 
+      "tez.engine.intermediate-output.key.comparator.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS = 
+      "tez.engine.intermediate-input.key.comparator.class";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS = 
+      "tez.engine.intermediate-output.key.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS = 
+      "tez.engine.intermediate-input.key.class";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS = 
+      "tez.engine.intermediate-output.value.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS = 
+      "tez.engine.intermediate-input.value.class";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS = 
+      "tez.engine.intermediate-output.should-compress";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED = 
+      "tez.engine.intermdiate-input.is-compressed";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC = 
+      "tez.engine.intermediate-output.compress.codec";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC = 
+      "tez.engine.intermediate-input.compress.codec";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS = 
+      "tez.engine.intermediate-input.key.secondary.comparator.class";
+  
+  // TODO This should be in DAGConfiguration
+  /* config for tracking the local file where all the credentials for the job
+   * credentials.
+   */
+  public static final String DAG_CREDENTIALS_BINARY =  "tez.dag.credentials.binary";
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
new file mode 100644
index 0000000..e64a26c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An abstract counter class to provide common implementation of
+ * the counter interface in both mapred and mapreduce packages.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounter implements TezCounter {
+
+  @Deprecated
+  @Override
+  public void setDisplayName(String name) {}
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof TezCounter) {
+      synchronized (genericRight) {
+        TezCounter right = (TezCounter) genericRight;
+        return getName().equals(right.getName()) &&
+               getDisplayName().equals(right.getDisplayName()) &&
+               getValue() == right.getValue();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return Objects.hashCode(getName(), getDisplayName(), getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
new file mode 100644
index 0000000..d8896ed
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * An abstract class to provide common implementation of the
+ * generic counter group in both mapred and mapreduce package.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounterGroup<T extends TezCounter>
+    implements CounterGroupBase<T> {
+
+  private final String name;
+  private String displayName;
+  private final ConcurrentMap<String, T> counters =
+      new ConcurrentSkipListMap<String, T>();
+  private final Limits limits;
+
+  public AbstractCounterGroup(String name, String displayName,
+                              Limits limits) {
+    this.name = name;
+    this.displayName = displayName;
+    this.limits = limits;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  @Override
+  public synchronized void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  @Override
+  public synchronized void addCounter(T counter) {
+    counters.put(counter.getName(), counter);
+    limits.incrCounters();
+  }
+
+  @Override
+  public synchronized T addCounter(String counterName, String displayName,
+                                   long value) {
+    String saveName = Limits.filterCounterName(counterName);
+    T counter = findCounterImpl(saveName, false);
+    if (counter == null) {
+      return addCounterImpl(saveName, displayName, value);
+    }
+    counter.setValue(value);
+    return counter;
+  }
+
+  private T addCounterImpl(String name, String displayName, long value) {
+    T counter = newCounter(name, displayName, value);
+    addCounter(counter);
+    return counter;
+  }
+
+  @Override
+  public synchronized T findCounter(String counterName, String displayName) {
+    // Take lock to avoid two threads not finding a counter and trying to add
+    // the same counter.
+    String saveName = Limits.filterCounterName(counterName);
+    T counter = findCounterImpl(saveName, false);
+    if (counter == null) {
+      return addCounterImpl(saveName, displayName, 0);
+    }
+    return counter;
+  }
+
+  @Override
+  public T findCounter(String counterName, boolean create) {
+    return findCounterImpl(Limits.filterCounterName(counterName), create);
+  }
+
+  // Lock the object. Cannot simply use concurrent constructs on the counters
+  // data-structure (like putIfAbsent) because of localization, limits etc.
+  private synchronized T findCounterImpl(String counterName, boolean create) {
+    T counter = counters.get(counterName);
+    if (counter == null && create) {
+      String localized =
+          ResourceBundles.getCounterName(getName(), counterName, counterName);
+      return addCounterImpl(counterName, localized, 0);
+    }
+    return counter;
+  }
+
+  @Override
+  public T findCounter(String counterName) {
+    return findCounter(counterName, true);
+  }
+
+  /**
+   * Abstract factory method to create a new counter of type T
+   * @param counterName of the counter
+   * @param displayName of the counter
+   * @param value of the counter
+   * @return a new counter
+   */
+  protected abstract T newCounter(String counterName, String displayName,
+                                  long value);
+
+  /**
+   * Abstract factory method to create a new counter of type T
+   * @return a new counter object
+   */
+  protected abstract T newCounter();
+
+  @Override
+  public Iterator<T> iterator() {
+    return counters.values().iterator();
+  }
+
+  /**
+   * GenericGroup ::= displayName #counter counter*
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, displayName);
+    WritableUtils.writeVInt(out, counters.size());
+    for(TezCounter counter: counters.values()) {
+      counter.write(out);
+    }
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    displayName = Text.readString(in);
+    counters.clear();
+    int size = WritableUtils.readVInt(in);
+    for (int i = 0; i < size; i++) {
+      T counter = newCounter();
+      counter.readFields(in);
+      counters.put(counter.getName(), counter);
+      limits.incrCounters();
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    return counters.size();
+  }
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroupBase<?>) {
+      @SuppressWarnings("unchecked")
+      CounterGroupBase<T> right = (CounterGroupBase<T>) genericRight;
+      return Iterators.elementsEqual(iterator(), right.iterator());
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return counters.hashCode();
+  }
+
+  @Override
+  public void incrAllCounters(CounterGroupBase<T> rightGroup) {
+    try {
+      for (TezCounter right : rightGroup) {
+        TezCounter left = findCounter(right.getName(), right.getDisplayName());
+        left.increment(right.getValue());
+      }
+    } catch (LimitExceededException e) {
+      counters.clear();
+      throw e;
+    }
+  }
+}