You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/23 19:45:34 UTC
[20/20] git commit: TEZ-443. Merge tez-dag-api and tez-engine-api
into a single module - tez-api (part of TEZ-398). (sseth)
TEZ-443. Merge tez-dag-api and tez-engine-api into a single module -
tez-api (part of TEZ-398). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d316f723
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d316f723
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d316f723
Branch: refs/heads/TEZ-398
Commit: d316f723508c77eb90936a9477812195714b59a2
Parents: b4950f9
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 23 10:44:42 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 23 10:44:42 2013 -0700
----------------------------------------------------------------------
pom.xml | 10 +-
tez-api/findbugs-exclude.xml | 16 +
tez-api/pom.xml | 90 +++
.../org/apache/tez/client/AMConfiguration.java | 100 ++++
.../java/org/apache/tez/client/TezClient.java | 144 +++++
.../org/apache/tez/client/TezClientUtils.java | 560 +++++++++++++++++++
.../java/org/apache/tez/client/TezSession.java | 184 ++++++
.../tez/client/TezSessionConfiguration.java | 57 ++
.../org/apache/tez/common/TezJobConfig.java | 314 +++++++++++
.../tez/common/counters/AbstractCounter.java | 52 ++
.../common/counters/AbstractCounterGroup.java | 208 +++++++
.../tez/common/counters/AbstractCounters.java | 385 +++++++++++++
.../tez/common/counters/CounterGroup.java | 32 ++
.../tez/common/counters/CounterGroupBase.java | 108 ++++
.../common/counters/CounterGroupFactory.java | 180 ++++++
.../apache/tez/common/counters/DAGCounter.java | 39 ++
.../tez/common/counters/FileSystemCounter.java | 30 +
.../common/counters/FileSystemCounterGroup.java | 327 +++++++++++
.../common/counters/FrameworkCounterGroup.java | 275 +++++++++
.../tez/common/counters/GenericCounter.java | 109 ++++
.../apache/tez/common/counters/JobCounter.java | 45 ++
.../common/counters/LimitExceededException.java | 36 ++
.../org/apache/tez/common/counters/Limits.java | 112 ++++
.../tez/common/counters/ResourceBundles.java | 94 ++++
.../apache/tez/common/counters/TaskCounter.java | 66 +++
.../apache/tez/common/counters/TezCounter.java | 83 +++
.../apache/tez/common/counters/TezCounters.java | 144 +++++
.../main/java/org/apache/tez/dag/api/DAG.java | 377 +++++++++++++
.../apache/tez/dag/api/DagTypeConverters.java | 278 +++++++++
.../main/java/org/apache/tez/dag/api/Edge.java | 59 ++
.../org/apache/tez/dag/api/EdgeProperty.java | 147 +++++
.../org/apache/tez/dag/api/InputDescriptor.java | 32 ++
.../apache/tez/dag/api/OutputDescriptor.java | 32 ++
.../apache/tez/dag/api/ProcessorDescriptor.java | 31 +
.../apache/tez/dag/api/TezConfiguration.java | 223 ++++++++
.../org/apache/tez/dag/api/TezConstants.java | 29 +
.../apache/tez/dag/api/TezEntityDescriptor.java | 42 ++
.../org/apache/tez/dag/api/TezException.java | 31 +
.../tez/dag/api/TezUncheckedException.java | 33 ++
.../java/org/apache/tez/dag/api/Vertex.java | 153 +++++
.../apache/tez/dag/api/VertexLocationHint.java | 154 +++++
.../apache/tez/dag/api/client/DAGClient.java | 67 +++
.../apache/tez/dag/api/client/DAGStatus.java | 130 +++++
.../org/apache/tez/dag/api/client/Progress.java | 67 +++
.../apache/tez/dag/api/client/VertexStatus.java | 78 +++
.../rpc/DAGClientAMProtocolBlockingPB.java | 30 +
.../dag/api/client/rpc/DAGClientRPCImpl.java | 291 ++++++++++
.../java/org/apache/tez/engine/api/Event.java | 28 +
.../java/org/apache/tez/engine/api/Input.java | 71 +++
.../tez/engine/api/LogicalIOProcessor.java | 43 ++
.../org/apache/tez/engine/api/LogicalInput.java | 37 ++
.../apache/tez/engine/api/LogicalOutput.java | 36 ++
.../java/org/apache/tez/engine/api/Output.java | 71 +++
.../org/apache/tez/engine/api/Processor.java | 55 ++
.../java/org/apache/tez/engine/api/Reader.java | 26 +
.../apache/tez/engine/api/TezInputContext.java | 32 ++
.../apache/tez/engine/api/TezOutputContext.java | 33 ++
.../tez/engine/api/TezProcessorContext.java | 41 ++
.../apache/tez/engine/api/TezTaskContext.java | 130 +++++
.../java/org/apache/tez/engine/api/Writer.java | 26 +
.../engine/api/events/DataMovementEvent.java | 109 ++++
.../tez/engine/api/events/InputFailedEvent.java | 89 +++
.../api/events/InputInformationEvent.java | 41 ++
.../engine/api/events/InputReadErrorEvent.java | 65 +++
.../common/objectregistry/ObjectLifeCycle.java | 37 ++
.../common/objectregistry/ObjectRegistry.java | 56 ++
.../objectregistry/ObjectRegistryFactory.java | 32 ++
tez-api/src/main/proto/DAGApiRecords.proto | 183 ++++++
.../src/main/proto/DAGClientAMProtocol.proto | 81 +++
tez-api/src/main/proto/Events.proto | 44 ++
.../org/apache/tez/dag/api/TestDAGPlan.java | 155 +++++
.../org/apache/tez/dag/api/TestDAGVerify.java | 417 ++++++++++++++
tez-common/pom.xml | 2 +-
.../java/org/apache/tez/common/Constants.java | 57 --
.../org/apache/tez/common/ContainerContext.java | 64 ---
.../java/org/apache/tez/common/InputSpec.java | 85 ---
.../java/org/apache/tez/common/OutputSpec.java | 84 ---
.../org/apache/tez/common/TezJobConfig.java | 314 -----------
.../org/apache/tez/common/TezTaskContext.java | 88 ---
.../org/apache/tez/common/TezTaskStatus.java | 104 ----
.../tez/common/counters/AbstractCounter.java | 52 --
.../common/counters/AbstractCounterGroup.java | 208 -------
.../tez/common/counters/AbstractCounters.java | 385 -------------
.../tez/common/counters/CounterGroup.java | 32 --
.../tez/common/counters/CounterGroupBase.java | 108 ----
.../common/counters/CounterGroupFactory.java | 180 ------
.../apache/tez/common/counters/DAGCounter.java | 39 --
.../tez/common/counters/FileSystemCounter.java | 30 -
.../common/counters/FileSystemCounterGroup.java | 327 -----------
.../common/counters/FrameworkCounterGroup.java | 275 ---------
.../tez/common/counters/GenericCounter.java | 109 ----
.../apache/tez/common/counters/JobCounter.java | 45 --
.../common/counters/LimitExceededException.java | 36 --
.../org/apache/tez/common/counters/Limits.java | 112 ----
.../tez/common/counters/ResourceBundles.java | 94 ----
.../apache/tez/common/counters/TaskCounter.java | 66 ---
.../apache/tez/common/counters/TezCounter.java | 83 ---
.../apache/tez/common/counters/TezCounters.java | 144 -----
.../org/apache/tez/records/TezContainerId.java | 78 ---
tez-dag-api/findbugs-exclude.xml | 16 -
tez-dag-api/pom.xml | 88 ---
.../org/apache/tez/client/AMConfiguration.java | 100 ----
.../java/org/apache/tez/client/TezClient.java | 144 -----
.../org/apache/tez/client/TezClientUtils.java | 560 -------------------
.../java/org/apache/tez/client/TezSession.java | 184 ------
.../tez/client/TezSessionConfiguration.java | 57 --
.../main/java/org/apache/tez/dag/api/DAG.java | 377 -------------
.../apache/tez/dag/api/DagTypeConverters.java | 278 ---------
.../main/java/org/apache/tez/dag/api/Edge.java | 59 --
.../org/apache/tez/dag/api/EdgeProperty.java | 147 -----
.../org/apache/tez/dag/api/InputDescriptor.java | 32 --
.../apache/tez/dag/api/OutputDescriptor.java | 32 --
.../apache/tez/dag/api/ProcessorDescriptor.java | 31 -
.../apache/tez/dag/api/TezConfiguration.java | 223 --------
.../org/apache/tez/dag/api/TezConstants.java | 29 -
.../apache/tez/dag/api/TezEntityDescriptor.java | 42 --
.../org/apache/tez/dag/api/TezException.java | 31 -
.../tez/dag/api/TezUncheckedException.java | 33 --
.../java/org/apache/tez/dag/api/Vertex.java | 153 -----
.../apache/tez/dag/api/VertexLocationHint.java | 154 -----
.../apache/tez/dag/api/client/DAGClient.java | 67 ---
.../apache/tez/dag/api/client/DAGStatus.java | 130 -----
.../org/apache/tez/dag/api/client/Progress.java | 67 ---
.../apache/tez/dag/api/client/VertexStatus.java | 78 ---
.../rpc/DAGClientAMProtocolBlockingPB.java | 30 -
.../dag/api/client/rpc/DAGClientRPCImpl.java | 291 ----------
tez-dag-api/src/main/proto/DAGApiRecords.proto | 183 ------
.../src/main/proto/DAGClientAMProtocol.proto | 81 ---
.../org/apache/tez/dag/api/TestDAGPlan.java | 155 -----
.../org/apache/tez/dag/api/TestDAGVerify.java | 417 --------------
tez-dag/pom.xml | 12 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 38 --
.../org/apache/tez/dag/app/dag/EdgeManager.java | 6 +-
.../event/TaskAttemptEventOutputConsumable.java | 36 --
.../dag/app/dag/impl/BroadcastEdgeManager.java | 6 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 6 +-
.../dag/app/dag/impl/OneToOneEdgeManager.java | 6 +-
.../app/dag/impl/ScatterGatherEdgeManager.java | 6 +-
.../dag/app/dag/impl/ShuffleVertexManager.java | 6 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 4 +-
.../TezDependentTaskCompletionEvent.java | 228 ++++++++
...TezTaskDependencyCompletionEventsUpdate.java | 64 +++
.../dag/app/rm/container/TestAMContainer.java | 14 +-
tez-dist/src/main/assembly/tez-dist-full.xml | 6 +-
tez-dist/src/main/assembly/tez-dist.xml | 6 +-
tez-engine-api/findbugs-exclude.xml | 16 -
tez-engine-api/pom.xml | 91 ---
.../java/org/apache/tez/engine/api/Input.java | 83 ---
.../java/org/apache/tez/engine/api/Master.java | 39 --
.../java/org/apache/tez/engine/api/Output.java | 65 ---
.../org/apache/tez/engine/api/Processor.java | 62 --
.../java/org/apache/tez/engine/api/Task.java | 79 ---
.../common/objectregistry/ObjectLifeCycle.java | 37 --
.../common/objectregistry/ObjectRegistry.java | 56 --
.../objectregistry/ObjectRegistryFactory.java | 32 --
.../org/apache/tez/engine/newapi/Event.java | 28 -
.../org/apache/tez/engine/newapi/Input.java | 71 ---
.../tez/engine/newapi/LogicalIOProcessor.java | 43 --
.../apache/tez/engine/newapi/LogicalInput.java | 37 --
.../apache/tez/engine/newapi/LogicalOutput.java | 36 --
.../org/apache/tez/engine/newapi/Output.java | 71 ---
.../org/apache/tez/engine/newapi/Processor.java | 58 --
.../org/apache/tez/engine/newapi/Reader.java | 26 -
.../tez/engine/newapi/TezInputContext.java | 32 --
.../tez/engine/newapi/TezOutputContext.java | 33 --
.../tez/engine/newapi/TezProcessorContext.java | 41 --
.../tez/engine/newapi/TezTaskContext.java | 130 -----
.../org/apache/tez/engine/newapi/Writer.java | 26 -
.../engine/newapi/events/DataMovementEvent.java | 109 ----
.../engine/newapi/events/InputFailedEvent.java | 89 ---
.../newapi/events/InputInformationEvent.java | 41 --
.../newapi/events/InputReadErrorEvent.java | 65 ---
.../tez/engine/records/OutputContext.java | 61 --
.../TezDependentTaskCompletionEvent.java | 228 --------
...TezTaskDependencyCompletionEventsUpdate.java | 64 ---
tez-engine-api/src/main/proto/Events.proto | 44 --
tez-engine/pom.xml | 6 +-
.../java/org/apache/tez/common/Constants.java | 57 ++
.../org/apache/tez/common/ContainerContext.java | 64 +++
.../tez/common/TezTaskUmbilicalProtocol.java | 20 -
.../org/apache/tez/engine/api/KVReader.java | 2 +-
.../org/apache/tez/engine/api/KVWriter.java | 2 +-
.../api/events/TaskAttemptCompletedEvent.java | 2 +-
.../api/events/TaskAttemptFailedEvent.java | 2 +-
.../api/events/TaskStatusUpdateEvent.java | 2 +-
.../apache/tez/engine/api/impl/TezEvent.java | 10 +-
.../engine/api/impl/TezInputContextImpl.java | 4 +-
.../engine/api/impl/TezOutputContextImpl.java | 4 +-
.../api/impl/TezProcessorContextImpl.java | 4 +-
.../tez/engine/api/impl/TezTaskContextImpl.java | 2 +-
.../broadcast/input/BroadcastInputManager.java | 2 +-
.../BroadcastShuffleInputEventHandler.java | 8 +-
.../input/BroadcastShuffleManager.java | 6 +-
.../broadcast/output/FileBasedKVWriter.java | 2 +-
.../tez/engine/common/TezEngineUtils.java | 4 +-
.../tez/engine/common/combine/Combiner.java | 1 -
.../common/localshuffle/LocalShuffle.java | 2 +-
.../tez/engine/common/shuffle/impl/Fetcher.java | 2 +-
.../common/shuffle/impl/MergeManager.java | 2 +-
.../tez/engine/common/shuffle/impl/Shuffle.java | 4 +-
.../shuffle/impl/ShuffleInputEventHandler.java | 10 +-
.../common/shuffle/impl/ShuffleScheduler.java | 6 +-
.../common/shuffle/server/ShuffleHandler.java | 2 +-
.../engine/common/sort/impl/ExternalSorter.java | 2 +-
.../common/sort/impl/PipelinedSorter.java | 2 +-
.../common/sort/impl/dflt/DefaultSorter.java | 2 +-
.../sort/impl/dflt/InMemoryShuffleSorter.java | 2 +-
.../tez/engine/lib/input/LocalMergedInput.java | 6 +-
.../engine/lib/input/ShuffledMergedInput.java | 6 +-
.../lib/input/ShuffledUnorderedKVInput.java | 8 +-
.../engine/lib/output/InMemorySortedOutput.java | 10 +-
.../lib/output/LocalOnFileSorterOutput.java | 2 +-
.../engine/lib/output/OnFileSortedOutput.java | 8 +-
.../lib/output/OnFileUnorderedKVOutput.java | 8 +-
.../LogicalIOProcessorRuntimeTask.java | 20 +-
tez-mapreduce/pom.xml | 2 +-
.../org/apache/tez/common/TezTaskStatus.java | 105 ++++
.../tez/mapreduce/combine/MRCombiner.java | 6 +-
.../tez/mapreduce/hadoop/TezTypeConverters.java | 9 -
.../tez/mapreduce/hadoop/mapred/MRReporter.java | 4 +-
.../hadoop/mapreduce/MapContextImpl.java | 2 +-
.../mapreduce/TaskAttemptContextImpl.java | 2 +-
.../mapreduce/TaskInputOutputContextImpl.java | 2 +-
.../apache/tez/mapreduce/input/SimpleInput.java | 6 +-
.../tez/mapreduce/output/SimpleOutput.java | 6 +-
.../apache/tez/mapreduce/processor/MRTask.java | 13 +-
.../tez/mapreduce/processor/MRTaskReporter.java | 8 +-
.../mapreduce/processor/map/MapProcessor.java | 10 +-
.../processor/reduce/ReduceProcessor.java | 10 +-
.../tez/mapreduce/TestUmbilicalProtocol.java | 17 -
tez-yarn-client/pom.xml | 2 +-
231 files changed, 8681 insertions(+), 9660 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fe41471..63f17eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,12 +90,7 @@
<dependencies>
<dependency>
<groupId>org.apache.tez</groupId>
- <artifactId>tez-dag-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-engine-api</artifactId>
+ <artifactId>tez-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -250,8 +245,7 @@
</dependencyManagement>
<modules>
- <module>tez-dag-api</module>
- <module>tez-engine-api</module>
+ <module>tez-api</module>
<module>tez-common</module>
<module>tez-engine</module>
<module>tez-yarn-client</module>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml
new file mode 100644
index 0000000..5b11308
--- /dev/null
+++ b/tez-api/findbugs-exclude.xml
@@ -0,0 +1,16 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+ <!-- No exclusion rules are defined in this filter. -->
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-api/pom.xml b/tez-api/pom.xml
new file mode 100644
index 0000000..069b0d4
--- /dev/null
+++ b/tez-api/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-api</artifactId>
+ <!-- Hadoop, YARN and protobuf libraries; no version is given here, presumably the parent pom manages them (confirm). -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ </dependencies>
+ <!-- Declares apache-rat-plugin with an empty configuration, and runs protoc in generate-sources over the three .proto files listed below. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>DAGApiRecords.proto</include>
+ <include>DAGClientAMProtocol.proto</include>
+ <include>Events.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
new file mode 100644
index 0000000..f452c74
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class AMConfiguration {
+ // Value holder for the settings of a Tez AM; TezClient passes it on when building the YARN submission context.
+ private final Path stagingDir; // TEZ_AM_STAGING_DIR after FileSystem.resolvePath
+ private final String queueName; // stored as given, no validation here
+ private final Map<String, String> env; // never null; the caller's map is kept by reference, not copied
+ private final Map<String, LocalResource> localResources; // stored as given; may be null
+ private final TezConfiguration amConf; // never null; a fresh TezConfiguration when the caller passes null
+ private final Credentials credentials; // stored as given; may be null
+ /** Stores the arguments and resolves the staging dir; throws TezUncheckedException if it is unset or unresolvable. */
+ public AMConfiguration(String queueName, Map<String, String> env,
+ Map<String, LocalResource> localResources,
+ TezConfiguration conf, Credentials credentials) {
+ this.queueName = queueName;
+ if (conf != null) {
+ this.amConf = conf;
+ } else {
+ this.amConf = new TezConfiguration(); // fall back to a default-constructed configuration
+ }
+
+ if (env != null) {
+ this.env = env;
+ } else {
+ this.env = new HashMap<String, String>(0); // empty mutable map instead of null
+ }
+ this.localResources = localResources; // no null check and no defensive copy
+ String stagingDirStr = amConf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
+ if (stagingDirStr == null || stagingDirStr.isEmpty()) { // a missing or empty property is a hard error
+ throw new TezUncheckedException("Staging directory for AM resources"
+ + " not specified in config"
+ + ", property=" + TezConfiguration.TEZ_AM_STAGING_DIR);
+ }
+ try {
+ FileSystem fs = FileSystem.get(amConf); // file system chosen from amConf
+ this.stagingDir = fs.resolvePath(new Path(stagingDirStr)); // NOTE(review): presumably requires the directory to exist already - confirm
+ } catch (IOException e) {
+ throw new TezUncheckedException(e); // surface the checked I/O failure as unchecked, keeping the cause
+ }
+ this.credentials = credentials;
+ }
+ /** Returns the staging directory path resolved in the constructor. */
+ public Path getStagingDir() {
+ return stagingDir;
+ }
+ /** Returns the queue name exactly as passed to the constructor. */
+ public String getQueueName() {
+ return queueName;
+ }
+ /** Returns the environment map; never null, and the same instance the caller supplied when one was given. */
+ public Map<String, String> getEnv() {
+ return env;
+ }
+ /** Returns the local resources map as passed to the constructor; may be null. */
+ public Map<String, LocalResource> getLocalResources() {
+ return localResources;
+ }
+ /** Returns the configuration in use; never null. */
+ public TezConfiguration getAMConf() {
+ return amConf;
+ }
+ /** Returns the credentials as passed to the constructor; may be null. */
+ public Credentials getCredentials() {
+ return credentials;
+ }
+ /** Not implemented: the body is empty, so calling this currently has no effect. */
+ public void isCompatible(AMConfiguration other) {
+ // TODO implement
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
new file mode 100644
index 0000000..df260ec
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+
+public class TezClient { // Submits DAGs to YARN as applications and hands back DAGClient handles for them.
+ private static final Log LOG = LogFactory.getLog(TezClient.class);
+
+ private final TezConfiguration conf; // configuration supplied by the caller, kept by reference
+ private final YarnConfiguration yarnConf; // YarnConfiguration built from conf in the constructor
+ private YarnClient yarnClient; // initialised and started in the constructor
+ Map<String, LocalResource> tezJarResources = null; // filled on first call to getTezJarResources()
+
+ /**
+ * <p>
+ * Create an instance of the TezClient which will be used to communicate with
+ * a specific instance of YARN, or TezService when that exists.
+ * </p>
+ * <p>
+ * Separate instances of TezClient should be created to communicate with
+ * different instances of YARN
+ * </p>
+ *
+ * @param conf
+ * the configuration which will be used to establish which YARN or
+ * Tez service instance this client is associated with.
+ */
+ public TezClient(TezConfiguration conf) {
+ this.conf = conf;
+ this.yarnConf = new YarnConfiguration(conf);
+ yarnClient = new YarnClientImpl();
+ yarnClient.init(yarnConf);
+ yarnClient.start(); // NOTE(review): no matching stop() call is visible in this class - confirm who shuts the client down
+ }
+
+ /** Obtains a new application id via createApplication(), then submits the DAG under that id. */
+ public DAGClient submitDAGApplication(DAG dag, AMConfiguration amConfig)
+ throws TezException, IOException {
+ ApplicationId appId = createApplication();
+ return submitDAGApplication(appId, dag, amConfig);
+ }
+ /** Builds the submission context, submits it to YARN and returns a DAGClient; a YarnException is rethrown as TezException. */
+ @Private
+ // To be used only by YarnRunner
+ public DAGClient submitDAGApplication(ApplicationId appId,
+ DAG dag, AMConfiguration amConfig)
+ throws TezException, IOException {
+ try {
+ ApplicationSubmissionContext appContext =
+ TezClientUtils.createApplicationSubmissionContext(conf, appId, dag,
+ dag.getName(), amConfig, getTezJarResources());
+ LOG.info("Submitting DAG to YARN"
+ + ", applicationId=" + appId);
+ yarnClient.submitApplication(appContext);
+ } catch (YarnException e) {
+ throw new TezException(e); // wrap so callers only deal with TezException / IOException
+ }
+ return getDAGClient(appId);
+ }
+
+ /**
+ * Create a new YARN application
+ * @return <code>ApplicationId</code> for the new YARN application
+ * @throws TezException wrapping any YarnException raised by the YARN client
+ * @throws IOException propagated from the YARN client call
+ */
+ public ApplicationId createApplication() throws TezException, IOException {
+ try {
+ return yarnClient.createApplication().
+ getNewApplicationResponse().getApplicationId();
+ } catch (YarnException e) {
+ throw new TezException(e);
+ }
+ }
+ /** Returns the Tez jar local resources, computing them through TezClientUtils on first use and reusing the result afterwards. */
+ private synchronized Map<String, LocalResource> getTezJarResources()
+ throws IOException {
+ if (tezJarResources == null) {
+ tezJarResources = TezClientUtils.setupTezJarsLocalResources(conf);
+ }
+ return tezJarResources;
+ }
+ /** Creates a DAGClientRPCImpl for the application, addressing the DAG by the id from getDefaultTezDAGID(appId). */
+ @Private
+ public DAGClient getDAGClient(ApplicationId appId)
+ throws IOException, TezException {
+ return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId),
+ conf);
+ }
+
+ // DO NOT CHANGE THIS. This code is replicated from TezDAGID.java
+ private static final char SEPARATOR = '_';
+ private static final String DAG = "dag";
+ private static final NumberFormat idFormat = NumberFormat.getInstance();
+ static {
+ idFormat.setGroupingUsed(false); // no grouping separators in the formatted number
+ idFormat.setMinimumIntegerDigits(6); // pad to at least six digits, e.g. 1 -> "000001" with Western digits
+ }
+ /** Builds "dag" + '_' + cluster timestamp + '_' + application id number + '_' + formatted sequence, with the sequence fixed at 1. */
+ String getDefaultTezDAGID(ApplicationId appId) {
+ return (new StringBuilder(DAG)).append(SEPARATOR).
+ append(appId.getClusterTimestamp()).
+ append(SEPARATOR).
+ append(appId.getId()).
+ append(SEPARATOR).
+ append(idFormat.format(1)).toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
new file mode 100644
index 0000000..7c6a5ed
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Level;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class TezClientUtils {
+
+ // Never reassigned; declared final to match the logger declaration used by
+ // the other client classes in this package.
+ private static final Log LOG = LogFactory.getLog(TezClientUtils.class);
+
+ /** Permission applied to the AM staging directory. */
+ public static final FsPermission TEZ_AM_DIR_PERMISSION =
+     FsPermission.createImmutable((short) 0700); // rwx--------
+ /** Permission applied to the files this class writes into the staging directory. */
+ public static final FsPermission TEZ_AM_FILE_PERMISSION =
+     FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ /** Maximum number of characters passed to a single writeUTF call when the DAG plan is written as text. */
+ private static final int UTF8_CHUNK_SIZE = 16 * 1024;
+
+ /**
+  * Setup LocalResource map for Tez jars based on provided Configuration.
+  *
+  * Every URI listed under {@link TezConfiguration#TEZ_LIB_URIS} is listed
+  * (non-recursively) and each file found is registered as a PUBLIC FILE
+  * resource keyed by its file name. When running against a mini YARN cluster
+  * an empty map is returned without consulting the configured URIs.
+  *
+  * @param conf Configuration to use to access Tez jars' locations
+  * @return Map of LocalResources to use when launching Tez AM
+  * @throws IOException if a configured location cannot be accessed or listed
+  * @throws TezUncheckedException if no URIs are configured, or a configured
+  *         URI is malformed or not absolute
+  */
+ static Map<String, LocalResource> setupTezJarsLocalResources(
+     TezConfiguration conf)
+     throws IOException {
+   Map<String, LocalResource> tezJarResources =
+       new TreeMap<String, LocalResource>();
+   if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+     return tezJarResources;
+   }
+
+   // Add tez jars to local resource
+   String[] tezJarUris = conf.getStrings(
+       TezConfiguration.TEZ_LIB_URIS);
+   if (tezJarUris == null
+       || tezJarUris.length == 0) {
+     throw new TezUncheckedException("Invalid configuration of tez jars"
+         + ", " + TezConfiguration.TEZ_LIB_URIS
+         + " is not defined in the configuration");
+   }
+
+   for (String tezJarUri : tezJarUris) {
+     URI uri;
+     try {
+       uri = new URI(tezJarUri.trim());
+     } catch (URISyntaxException e) {
+       String message = "Invalid URI defined in configuration for"
+           + " location of TEZ jars. providedURI=" + tezJarUri;
+       LOG.error(message);
+       throw new TezUncheckedException(message, e);
+     }
+     if (!uri.isAbsolute()) {
+       String message = "Non-absolute URI defined in configuration for"
+           + " location of TEZ jars. providedURI=" + tezJarUri;
+       LOG.error(message);
+       throw new TezUncheckedException(message);
+     }
+     Path p = new Path(uri);
+     FileSystem pathfs = p.getFileSystem(conf);
+     RemoteIterator<LocatedFileStatus> iter = pathfs.listFiles(p, false);
+     while (iter.hasNext()) {
+       LocatedFileStatus fStatus = iter.next();
+       String rsrcName = fStatus.getPath().getName();
+       // FIXME duplicates are only warned about, not rejected, due to quirks
+       // in assembly generation. The entry found last replaces the earlier one.
+       if (tezJarResources.containsKey(rsrcName)) {
+         String message = "Duplicate resource found"
+             + ", resourceName=" + rsrcName
+             + ", existingPath=" +
+             tezJarResources.get(rsrcName).getResource().toString()
+             + ", newPath=" + fStatus.getPath();
+         LOG.warn(message);
+       }
+       tezJarResources.put(rsrcName,
+           LocalResource.newInstance(
+               ConverterUtils.getYarnUrlFromPath(fStatus.getPath()),
+               LocalResourceType.FILE,
+               LocalResourceVisibility.PUBLIC,
+               fStatus.getLen(),
+               fStatus.getModificationTime()));
+     }
+   }
+   if (tezJarResources.isEmpty()) {
+     LOG.warn("No tez jars found in configured locations"
+         + ". Ignoring for now. Errors may occur");
+   }
+   return tezJarResources;
+ }
+
+ /**
+  * Verify or create the Staging area directory on the configured Filesystem.
+  *
+  * A missing directory is created with {@link #TEZ_AM_DIR_PERMISSION}. An
+  * existing directory must be owned by either the current user or the login
+  * user; its permissions are reset to {@link #TEZ_AM_DIR_PERMISSION} when
+  * they differ.
+  *
+  * @param conf Configuration used to resolve the staging area's FileSystem
+  * @param stagingArea Staging area directory path
+  * @return the FileSystem that hosts the staging area
+  * @throws IOException if the directory is owned by somebody else, or a
+  *         FileSystem operation fails
+  */
+ public static FileSystem ensureStagingDirExists(Configuration conf,
+     Path stagingArea)
+     throws IOException {
+   final FileSystem fs = stagingArea.getFileSystem(conf);
+   final String realUser =
+       UserGroupInformation.getLoginUser().getShortUserName();
+   final String currentUser =
+       UserGroupInformation.getCurrentUser().getShortUserName();
+
+   if (!fs.exists(stagingArea)) {
+     fs.mkdirs(stagingArea, new FsPermission(TEZ_AM_DIR_PERMISSION));
+     return fs;
+   }
+
+   final FileStatus fsStatus = fs.getFileStatus(stagingArea);
+   final String owner = fsStatus.getOwner();
+   final boolean ownedBySubmitter =
+       owner.equals(currentUser) || owner.equals(realUser);
+   if (!ownedBySubmitter) {
+     throw new IOException("The ownership on the staging directory "
+         + stagingArea + " is not as expected. " + "It is owned by " + owner
+         + ". The directory must " + "be owned by the submitter "
+         + currentUser + " or " + "by " + realUser);
+   }
+
+   final FsPermission actual = fsStatus.getPermission();
+   if (!actual.equals(TEZ_AM_DIR_PERMISSION)) {
+     LOG.info("Permissions on staging directory " + stagingArea + " are "
+         + "incorrect: " + fsStatus.getPermission()
+         + ". Fixing permissions " + "to correct value "
+         + TEZ_AM_DIR_PERMISSION);
+     fs.setPermission(stagingArea, TEZ_AM_DIR_PERMISSION);
+   }
+   return fs;
+ }
+
+ /**
+ * Create an ApplicationSubmissionContext to launch a Tez AM
+ * @param conf Configuration consulted for the staging FileSystem, the
+ * mini-cluster flag and the YARN application classpath
+ * @param appId id of the YARN application being submitted
+ * @param dag DAG to ship with the AM; when null no plan is written and the
+ * TEZ_AM_IS_SESSION_ENV variable is added to the AM environment instead
+ * @param amName application name set on the submission context
+ * @param amConfig source of the staging dir, AM conf, credentials, env,
+ * local resources and queue name
+ * @param tezJarResources Tez jar LocalResources added to the AM and, when a
+ * dag is given, to every vertex of that dag
+ * @return the populated ApplicationSubmissionContext
+ * @throws IOException
+ * @throws YarnException
+ */
+ static ApplicationSubmissionContext createApplicationSubmissionContext(
+ Configuration conf, ApplicationId appId, DAG dag, String amName,
+ AMConfiguration amConfig,
+ Map<String, LocalResource> tezJarResources)
+ throws IOException, YarnException{
+
+ FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
+ amConfig.getStagingDir());
+
+ // Setup resource requirements
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(
+ amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+ TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT));
+ capability.setVirtualCores(
+ amConfig.getAMConf().getInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
+ TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AppMaster capability = " + capability);
+ }
+
+ ByteBuffer securityTokens = null;
+ // Setup security tokens
+ if (amConfig.getCredentials() != null) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ amConfig.getCredentials().writeTokenStorageToStream(dob);
+ securityTokens = ByteBuffer.wrap(dob.getData(), 0,
+ dob.getLength());
+ }
+
+ // Setup the command to run the AM
+ List<String> vargs = new ArrayList<String>(8);
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+ String amLogLevel = amConfig.getAMConf().get(
+ TezConfiguration.TEZ_AM_LOG_LEVEL,
+ TezConfiguration.TEZ_AM_LOG_LEVEL_DEFAULT);
+ addLog4jSystemProperties(amLogLevel, vargs);
+
+ vargs.add(amConfig.getAMConf().get(TezConfiguration.TEZ_AM_JAVA_OPTS,
+ TezConfiguration.DEFAULT_TEZ_AM_JAVA_OPTS));
+
+ vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ File.separator + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ File.separator + ApplicationConstants.STDERR);
+
+
+ // The individual arguments are joined with spaces and handed over as a
+ // single command entry.
+ Vector<String> vargsFinal = new Vector<String>(8);
+ // Final command
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+ vargsFinal.add(mergedCommand.toString());
+
+ LOG.debug("Command to launch container for ApplicationMaster is : "
+ + mergedCommand);
+
+ // Setup the CLASSPATH in environment
+ // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+ Map<String, String> environment = new HashMap<String, String>();
+
+ // On a mini cluster the client JVM's own classpath is passed to the AM and
+ // the configured YARN application classpath below is skipped.
+ boolean isMiniCluster =
+ conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
+ if (isMiniCluster) {
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+ System.getProperty("java.class.path"));
+ }
+
+ Apps.addToEnvironment(environment,
+ Environment.CLASSPATH.name(),
+ Environment.PWD.$());
+
+ Apps.addToEnvironment(environment,
+ Environment.CLASSPATH.name(),
+ Environment.PWD.$() + File.separator + "*");
+
+ // Add YARN/COMMON/HDFS jars to path
+ if (!isMiniCluster) {
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+ c.trim());
+ }
+ }
+
+ if (amConfig.getEnv() != null) {
+ for (Map.Entry<String, String> entry : amConfig.getEnv().entrySet()) {
+ Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
+ }
+ }
+
+ Map<String, LocalResource> localResources =
+ new TreeMap<String, LocalResource>();
+
+ // Tez jars are added after the user supplied resources, so a Tez jar
+ // replaces a user resource registered under the same name.
+ if (amConfig.getLocalResources() != null) {
+ localResources.putAll(amConfig.getLocalResources());
+ }
+ localResources.putAll(tezJarResources);
+
+ // emit conf as PB file
+ Configuration finalTezConf = createFinalTezConfForApp(amConfig.getAMConf());
+ Path binaryConfPath = new Path(amConfig.getStagingDir(),
+ TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+ FSDataOutputStream amConfPBOutBinaryStream = null;
+ try {
+ ConfigurationProto.Builder confProtoBuilder =
+ ConfigurationProto.newBuilder();
+ Iterator<Entry<String, String>> iter = finalTezConf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+ kvp.setKey(entry.getKey());
+ kvp.setValue(entry.getValue());
+ confProtoBuilder.addConfKeyValues(kvp);
+ }
+ //binary output
+ amConfPBOutBinaryStream = FileSystem.create(fs, binaryConfPath,
+ new FsPermission(TEZ_AM_FILE_PERMISSION));
+ confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);
+ } finally {
+ if(amConfPBOutBinaryStream != null){
+ amConfPBOutBinaryStream.close();
+ }
+ }
+
+ LocalResource binaryConfLRsrc =
+ TezClientUtils.createLocalResource(fs,
+ binaryConfPath, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION);
+ localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+ binaryConfLRsrc);
+
+ if(dag != null) {
+ // Add tez jars to vertices too
+ for (Vertex v : dag.getVertices()) {
+ v.getTaskLocalResources().putAll(tezJarResources);
+ v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+ binaryConfLRsrc);
+ }
+
+ // emit protobuf DAG file style
+ Path binaryPath = new Path(amConfig.getStagingDir(),
+ TezConfiguration.TEZ_PB_PLAN_BINARY_NAME + "." + appId.toString());
+ // NOTE(review): this value is set after the configuration PB file was
+ // written above, so that file does not contain it; confirm this is intended.
+ amConfig.getAMConf().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
+ binaryPath.toUri().toString());
+
+ DAGPlan dagPB = dag.createDag(null);
+
+ FSDataOutputStream dagPBOutBinaryStream = null;
+
+ try {
+ //binary output
+ dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
+ new FsPermission(TEZ_AM_FILE_PERMISSION));
+ dagPB.writeTo(dagPBOutBinaryStream);
+ } finally {
+ if(dagPBOutBinaryStream != null){
+ dagPBOutBinaryStream.close();
+ }
+ }
+
+ localResources.put(TezConfiguration.TEZ_PB_PLAN_BINARY_NAME,
+ TezClientUtils.createLocalResource(fs,
+ binaryPath, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION));
+
+ // The text form of the plan is only localized when the AM log level is
+ // DEBUG or more verbose.
+ if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
+ Path textPath = localizeDagPlanAsText(dagPB, fs,
+ amConfig.getStagingDir(), appId);
+ localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
+ TezClientUtils.createLocalResource(fs,
+ textPath, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION));
+ }
+ } else {
+ // No DAG supplied: mark the AM environment as a session.
+ Apps.addToEnvironment(environment,
+ TezConstants.TEZ_AM_IS_SESSION_ENV, "set");
+ }
+
+ // No ACL entries are populated here; the map is passed on empty.
+ Map<ApplicationAccessType, String> acls
+ = new HashMap<ApplicationAccessType, String>();
+
+ // Setup ContainerLaunchContext for AM container
+ ContainerLaunchContext amContainer =
+ ContainerLaunchContext.newInstance(localResources, environment,
+ vargsFinal, null, securityTokens, acls);
+
+ // Set up the ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext = Records
+ .newRecord(ApplicationSubmissionContext.class);
+
+ appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
+ appContext.setApplicationId(appId);
+ appContext.setResource(capability);
+ appContext.setQueue(amConfig.getQueueName());
+ appContext.setApplicationName(amName);
+ appContext.setCancelTokensWhenComplete(amConfig.getAMConf().getBoolean(
+ TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
+ TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
+ appContext.setAMContainerSpec(amContainer);
+
+ return appContext;
+
+ }
+
+ @VisibleForTesting
+ static void addLog4jSystemProperties(String logLevel,
+ List<String> vargs) {
+ vargs.add("-Dlog4j.configuration="
+ + TezConfiguration.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE);
+ vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "="
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+ vargs.add("-D" + TezConfiguration.TEZ_ROOT_LOGGER_NAME + "=" + logLevel
+ + "," + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
+ }
+
+ static Configuration createFinalTezConfForApp(TezConfiguration amConf) {
+ Configuration conf = new Configuration(false);
+ conf.setQuietMode(true);
+
+ assert amConf != null;
+ Iterator<Entry<String, String>> iter = amConf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ // Copy all tez config parameters.
+ if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
+ conf.set(entry.getKey(), entry.getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding tez dag am parameter: " + entry.getKey()
+ + ", with value: " + entry.getValue());
+ }
+ }
+ }
+ return conf;
+ }
+
+ /**
+ * Helper function to create a YARN LocalResource
+ * @param fs FileSystem object
+ * @param p Path of resource to localize
+ * @param type LocalResource Type
+ * @return
+ * @throws IOException
+ */
+ static LocalResource createLocalResource(FileSystem fs, Path p,
+ LocalResourceType type,
+ LocalResourceVisibility visibility) throws IOException {
+ LocalResource rsrc = Records.newRecord(LocalResource.class);
+ FileStatus rsrcStat = fs.getFileStatus(p);
+ rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs.resolvePath(rsrcStat
+ .getPath())));
+ rsrc.setSize(rsrcStat.getLen());
+ rsrc.setTimestamp(rsrcStat.getModificationTime());
+ rsrc.setType(type);
+ rsrc.setVisibility(visibility);
+ return rsrc;
+ }
+
+ private static Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs,
+ Path appStagingDir, ApplicationId appId) throws IOException {
+ Path textPath = new Path(appStagingDir,
+ TezConfiguration.TEZ_PB_PLAN_TEXT_NAME + "." + appId.toString());
+ FSDataOutputStream dagPBOutTextStream = null;
+ try {
+ dagPBOutTextStream = FileSystem.create(fs, textPath, new FsPermission(
+ TEZ_AM_FILE_PERMISSION));
+ String dagPBStr = dagPB.toString();
+ int dagPBStrLen = dagPBStr.length();
+ if (dagPBStrLen <= UTF8_CHUNK_SIZE) {
+ dagPBOutTextStream.writeUTF(dagPBStr);
+ } else {
+ int startIndex = 0;
+ while (startIndex < dagPBStrLen) {
+ int endIndex = startIndex + UTF8_CHUNK_SIZE;
+ if (endIndex > dagPBStrLen) {
+ endIndex = dagPBStrLen;
+ }
+ dagPBOutTextStream.writeUTF(dagPBStr.substring(startIndex, endIndex));
+ startIndex += UTF8_CHUNK_SIZE;
+ }
+ }
+ } finally {
+ if (dagPBOutTextStream != null) {
+ dagPBOutTextStream.close();
+ }
+ }
+ return textPath;
+ }
+
+ static DAGClientAMProtocolBlockingPB getAMProxy(YarnClient yarnClient,
+ Configuration conf,
+ ApplicationId applicationId) throws TezException, IOException {
+ ApplicationReport appReport;
+ try {
+ appReport = yarnClient.getApplicationReport(
+ applicationId);
+
+ if(appReport == null) {
+ throw new TezUncheckedException("Could not retrieve application report"
+ + " from YARN, applicationId=" + applicationId);
+ }
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ if(appState != YarnApplicationState.RUNNING) {
+ if (appState == YarnApplicationState.FINISHED
+ || appState == YarnApplicationState.KILLED
+ || appState == YarnApplicationState.FAILED) {
+ throw new TezUncheckedException("Application not running"
+ + ", applicationId=" + applicationId
+ + ", yarnApplicationState=" + appReport.getYarnApplicationState()
+ + ", finalApplicationStatus="
+ + appReport.getFinalApplicationStatus()
+ + ", trackingUrl=" + appReport.getTrackingUrl());
+ }
+ return null;
+ }
+ } catch (YarnException e) {
+ throw new TezException(e);
+ }
+ return getAMProxy(conf, appReport.getHost(), appReport.getRpcPort());
+ }
+
+ static DAGClientAMProtocolBlockingPB getAMProxy(Configuration conf,
+ String amHost, int amRpcPort) throws IOException {
+ InetSocketAddress addr = new InetSocketAddress(amHost,
+ amRpcPort);
+
+ RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
+ ProtobufRpcEngine.class);
+ DAGClientAMProtocolBlockingPB proxy =
+ (DAGClientAMProtocolBlockingPB) RPC.getProxy(
+ DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+ return proxy;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
new file mode 100644
index 0000000..acf523d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+
+public class TezSession {
+
+ private static final Log LOG = LogFactory.getLog(TezSession.class);
+
+ // Name passed to YARN as the application name when the session is started.
+ private final String sessionName;
+ // May be supplied by the caller; otherwise assigned in start().
+ private ApplicationId applicationId;
+ // Configuration PB LocalResource taken from the AM launch context in start();
+ // added to every vertex of a submitted DAG when non-null.
+ private LocalResource tezConfPBLRsrc = null;
+ private final TezSessionConfiguration sessionConfig;
+ // Created in start(); null until then.
+ private YarnClient yarnClient;
+ // Tez jar LocalResources computed in start().
+ private Map<String, LocalResource> tezJarResources;
+ // Set to true at the end of a successful start().
+ private boolean sessionStarted = false;
+
+ /**
+ * Creates a session bound to an already allocated application id.
+ *
+ * @param sessionName name of the session
+ * @param applicationId application id to use; when null, start() requests
+ * a new one from YARN
+ * @param sessionConfig configuration of the session
+ */
+ public TezSession(String sessionName,
+ ApplicationId applicationId,
+ TezSessionConfiguration sessionConfig) {
+ this.sessionName = sessionName;
+ this.sessionConfig = sessionConfig;
+ this.applicationId = applicationId;
+ }
+
+ /**
+ * Creates a session without an application id; start() requests one from
+ * YARN.
+ *
+ * @param sessionName name of the session
+ * @param sessionConfig configuration of the session
+ */
+ public TezSession(String sessionName,
+ TezSessionConfiguration sessionConfig) {
+ this(sessionName, null, sessionConfig);
+ }
+
+ /**
+  * Starts the session: creates and starts the YARN client, resolves the Tez
+  * jar resources, requests an application id if none was supplied, and
+  * submits the AM to YARN without a DAG.
+  *
+  * @throws TezException wrapping any YarnException raised during submission
+  * @throws IOException on I/O failure while preparing or submitting the AM
+  */
+ public synchronized void start() throws TezException, IOException {
+   yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(sessionConfig.getYarnConfiguration());
+   yarnClient.start();
+
+   tezJarResources = TezClientUtils.setupTezJarsLocalResources(
+       sessionConfig.getTezConfiguration());
+
+   try {
+     if (applicationId == null) {
+       applicationId = yarnClient.createApplication()
+           .getNewApplicationResponse()
+           .getApplicationId();
+     }
+
+     // A null DAG makes the submission context describe a session AM.
+     final ApplicationSubmissionContext submissionContext =
+         TezClientUtils.createApplicationSubmissionContext(
+             sessionConfig.getTezConfiguration(), applicationId,
+             null, sessionName, sessionConfig.getAMConfiguration(),
+             tezJarResources);
+     final Map<String, LocalResource> amResources =
+         submissionContext.getAMContainerSpec().getLocalResources();
+     tezConfPBLRsrc = amResources.get(
+         TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+     yarnClient.submitApplication(submissionContext);
+   } catch (YarnException e) {
+     throw new TezException(e);
+   }
+   sessionStarted = true;
+ }
+
+ /**
+  * Submits a DAG to the running session AM and returns a client for it.
+  *
+  * The Tez jar resources (and the configuration PB resource, when known) are
+  * added to every vertex before the plan is built. The method then polls
+  * YARN every 100 ms until a proxy to the AM can be obtained.
+  *
+  * @param dag DAG to submit
+  * @return client bound to the id the AM assigned to the DAG
+  * @throws TezException if the submission RPC fails, YARN reports an error,
+  *         or the calling thread is interrupted while waiting for the AM
+  * @throws TezUncheckedException if the session has not been started, or the
+  *         application has already terminated
+  * @throws IOException on I/O failure while contacting YARN or the AM
+  */
+ public synchronized DAGClient submitDAG(DAG dag)
+     throws TezException, IOException {
+   if (!sessionStarted) {
+     throw new TezUncheckedException("Session not started");
+   }
+
+   String dagId = null;
+   LOG.info("Submitting dag to TezSession"
+       + ", sessionName=" + sessionName
+       + ", applicationId=" + applicationId);
+   // Add tez jars to vertices too
+   for (Vertex v : dag.getVertices()) {
+     v.getTaskLocalResources().putAll(tezJarResources);
+     if (null != tezConfPBLRsrc) {
+       v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+           tezConfPBLRsrc);
+     }
+   }
+   DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
+   SubmitDAGRequestProto requestProto =
+       SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
+
+   DAGClientAMProtocolBlockingPB proxy;
+   while (true) {
+     proxy = TezClientUtils.getAMProxy(yarnClient,
+         sessionConfig.getYarnConfiguration(), applicationId);
+     if (proxy != null) {
+       break;
+     }
+     try {
+       Thread.sleep(100l);
+     } catch (InterruptedException e) {
+       // The loop has no other exit while the AM is still coming up, so an
+       // interrupt must end the wait. Restore the flag for the caller and
+       // give up instead of silently continuing to poll.
+       Thread.currentThread().interrupt();
+       throw new TezException(e);
+     }
+   }
+
+   try {
+     dagId = proxy.submitDAG(null, requestProto).getDagId();
+   } catch (ServiceException e) {
+     throw new TezException(e);
+   }
+   LOG.info("Submitted dag to TezSession"
+       + ", sessionName=" + sessionName
+       + ", applicationId=" + applicationId
+       + ", dagId=" + dagId);
+   return new DAGClientRPCImpl(applicationId, dagId,
+       sessionConfig.getTezConfiguration());
+ }
+
+ /**
+  * Shuts the session down. The AM is asked to shut down over RPC; if no
+  * proxy is available or the RPC fails, the application is killed through
+  * YARN instead. The YARN client created by start() is stopped in every
+  * case, including when the shutdown attempt throws.
+  *
+  * Calling this before start(), or a second time, is a no-op.
+  *
+  * @throws TezException wrapping a YarnException from the YARN calls
+  * @throws IOException on I/O failure while contacting YARN or the AM
+  */
+ public synchronized void stop() throws TezException, IOException {
+   LOG.info("Shutting down Tez Session"
+       + ", sessionName=" + sessionName
+       + ", applicationId=" + applicationId);
+   if (yarnClient == null) {
+     // start() was never called, or an earlier stop() already released the
+     // client; there is nothing to talk to and nothing to release.
+     LOG.info("Tez Session has no active YARN client, nothing to stop"
+         + ", sessionName=" + sessionName);
+     return;
+   }
+   try {
+     // applicationId is still null if start() failed before an id was
+     // obtained; in that case only the client needs releasing.
+     if (applicationId != null) {
+       shutdownOrKillApplication();
+     }
+   } finally {
+     sessionStarted = false;
+     yarnClient.stop();
+     yarnClient = null;
+   }
+ }
+
+ /** Asks the AM to shut down via RPC, falling back to a YARN kill. */
+ private void shutdownOrKillApplication() throws TezException, IOException {
+   DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(yarnClient,
+       sessionConfig.getYarnConfiguration(), applicationId);
+   if (proxy != null) {
+     try {
+       ShutdownSessionRequestProto request =
+           ShutdownSessionRequestProto.newBuilder().build();
+       proxy.shutdownSession(null, request);
+       return;
+     } catch (ServiceException e) {
+       LOG.info("Failed to shutdown Tez Session via proxy", e);
+     }
+   }
+   LOG.info("Could not connect to AM, killing session via YARN"
+       + ", sessionName=" + sessionName
+       + ", applicationId=" + applicationId);
+   try {
+     yarnClient.killApplication(applicationId);
+   } catch (YarnException e) {
+     throw new TezException(e);
+   }
+ }
+
+ /**
+ * @return the session name given at construction time
+ */
+ public String getSessionName() {
+ return sessionName;
+ }
+
+ /**
+ * @return the application id of this session; null if none was supplied at
+ * construction time and start() has not assigned one yet
+ */
+ @Private
+ @VisibleForTesting
+ public synchronized ApplicationId getApplicationId() {
+ return applicationId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
new file mode 100644
index 0000000..61ca60b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.client;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
+
+/**
+ * Immutable bundle of the three configurations a TezSession works with: the
+ * AM configuration, the Tez configuration and the YARN configuration.
+ */
+public class TezSessionConfiguration {
+
+  private final AMConfiguration amConfiguration;
+  private final YarnConfiguration yarnConfig;
+  private final TezConfiguration tezConfig;
+
+  /**
+   * Creates a session configuration whose YARN configuration is a new
+   * YarnConfiguration built from {@code tezConfig}.
+   *
+   * @param amConfiguration configuration of the AM
+   * @param tezConfig Tez configuration
+   */
+  public TezSessionConfiguration(AMConfiguration amConfiguration,
+      TezConfiguration tezConfig) {
+    this(amConfiguration, tezConfig, new YarnConfiguration(tezConfig));
+  }
+
+  /**
+   * Creates a session configuration with an explicitly supplied YARN
+   * configuration.
+   *
+   * @param amConfiguration configuration of the AM
+   * @param tezConfig Tez configuration
+   * @param yarnConf YARN configuration to hand out as-is
+   */
+  TezSessionConfiguration(AMConfiguration amConfiguration,
+      TezConfiguration tezConfig,
+      YarnConfiguration yarnConf) {
+    this.amConfiguration = amConfiguration;
+    this.tezConfig = tezConfig;
+    this.yarnConfig = yarnConf;
+  }
+
+  /** @return the AM configuration given at construction time */
+  public AMConfiguration getAMConfiguration() {
+    return this.amConfiguration;
+  }
+
+  /** @return the YARN configuration of this session */
+  public YarnConfiguration getYarnConfiguration() {
+    return this.yarnConfig;
+  }
+
+  /** @return the Tez configuration given at construction time */
+  public TezConfiguration getTezConfiguration() {
+    return this.tezConfig;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
new file mode 100644
index 0000000..7c4540c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+
+/**
+ * Meant for user configurable job properties. For others look at {@link Constants}.
+ * Every member is a {@code static final} constant; the class carries no state.
+ */
+
+// TODO EVENTUALLY A description for each property.
+@Private
+@Evolving
+public class TezJobConfig {
+
+
+
+
+  /** The number of milliseconds between progress reports. */
+  public static final int PROGRESS_INTERVAL = 3000;
+
+  public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String TEZ_ENGINE_IFILE_READAHEAD =
+      "tez.engine.ifile.readahead";
+  public static final boolean DEFAULT_TEZ_ENGINE_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String TEZ_ENGINE_IFILE_READAHEAD_BYTES =
+      "tez.engine.ifile.readahead.bytes";
+  public static final int DEFAULT_TEZ_ENGINE_IFILE_READAHEAD_BYTES =
+      4 * 1024 * 1024;
+
+  /**
+   * Presumably the number of records merged between progress reports (TODO confirm).
+   */
+  public static final String RECORDS_BEFORE_PROGRESS =
+      "tez.task.merge.progress.records";
+  public static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000;
+
+  /**
+   * List of directories available to the engine.
+   */
+  public static final String LOCAL_DIRS = "tez.engine.local.dirs";
+  public static final String DEFAULT_LOCAL_DIRS = "/tmp";
+
+  /**
+   * One local dir for the specific job.
+   */
+  public static final String JOB_LOCAL_DIR = "tez.engine.job.local.dir";
+
+  /**
+   * The directory which contains the localized files for this task.
+   */
+  @Private
+  public static final String TASK_LOCAL_RESOURCE_DIR = "tez.engine.task-local-resource.dir";
+  public static final String DEFAULT_TASK_LOCAL_RESOURCE_DIR = "/tmp";
+
+  public static final String TEZ_TASK_WORKING_DIR = "tez.engine.task.working.dir";
+
+  /**
+   * Presumably the number of streams merged at once while sorting (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_IO_SORT_FACTOR =
+      "tez.engine.io.sort.factor";
+  public static final int DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR = 100;
+
+  /**
+   * Presumably the sort-buffer fill fraction that triggers a spill (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SORT_SPILL_PERCENT =
+      "tez.engine.sort.spill.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT = 0.8f;
+
+  /**
+   * Presumably the sort buffer size in megabytes (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_IO_SORT_MB = "tez.engine.io.sort.mb";
+  public static final int DEFAULT_TEZ_ENGINE_IO_SORT_MB = 100;
+
+  /**
+   * Memory limit for the index cache, in bytes per the key name; defaults to 1024 * 1024.
+   */
+  public static final String TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES =
+      "tez.engine.index.cache.memory.limit.bytes";
+  public static final int DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES =
+      1024 * 1024;
+
+  /**
+   * Presumably the minimum number of spills before the combiner is applied (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_COMBINE_MIN_SPILLS =
+      "tez.engine.combine.min.spills";
+  public static final int DEFAULT_TEZ_ENGINE_COMBINE_MIN_SPILLS = 3;
+
+  /**
+   * Presumably the number of threads used for sorting (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SORT_THREADS =
+      "tez.engine.sort.threads";
+  public static final int DEFAULT_TEZ_ENGINE_SORT_THREADS = 1;
+
+  /**
+   * Specifies a partitioner class, which is used in Tez engine components like OnFileSortedOutput
+   */
+  public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class";
+
+  /**
+   * Specifies a combiner class (primarily for Shuffle)
+   */
+  public static final String TEZ_ENGINE_COMBINER_CLASS = "tez.engine.combiner.class";
+
+  public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions";
+
+  /**
+   * Key and default limit for the number of counters (per job, going by the key name).
+   */
+  public static final String COUNTERS_MAX_KEY = "tez.engine.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  /**
+   * Key and default limit for counter group names (presumably a length; TODO confirm).
+   */
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "tez.engine.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  /**
+   * Key and default limit for counter names (presumably a length; TODO confirm).
+   */
+  public static final String COUNTER_NAME_MAX_KEY = "tez.engine.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  /**
+   * Key and default limit for the number of counter groups.
+   */
+  public static final String COUNTER_GROUPS_MAX_KEY = "tez.engine.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
+
+
+  /**
+   * Temporary interface for MR only (not chained Tez) to indicate whether
+   * in-memory shuffle should be used.
+   */
+  @Private
+  public static final String TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY =
+      "tez.engine.shuffle.use.in-memory";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
+
+  // TODO NEWTEZ Remove these config parameters. Will be part of an event.
+  @Private
+  public static final String TEZ_ENGINE_SHUFFLE_PARTITION_RANGE =
+      "tez.engine.shuffle.partition-range";
+  public static final int TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT = 1;
+
+  /**
+   * Presumably the number of parallel shuffle fetches (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES =
+      "tez.engine.shuffle.parallel.copies";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES = 20;
+
+  /**
+   * TODO Is this user configurable.
+   */
+  public static final String TEZ_ENGINE_METRICS_SESSION_ID =
+      "tez.engine.metrics.session.id";
+  public static final String DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID = "";
+
+  /**
+   * Key and default for the shuffle fetch-failure limit.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_FETCH_FAILURES =
+      "tez.engine.shuffle.fetch.failures.limit";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_FETCH_FAILURES_LIMIT = 10;
+
+  /**
+   * Presumably whether shuffle read errors are reported back (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR =
+      "tez.engine.shuffle.notify.readerror";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR = true;
+
+  /**
+   * Shuffle connect timeout key. NOTE(review): the default below is named STALLED_COPY_TIMEOUT.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT =
+      "tez.engine.shuffle.connect.timeout";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT =
+      3 * 60 * 1000;
+
+  /**
+   * Shuffle read timeout key; the default is 3 * 60 * 1000 (presumably milliseconds; TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_READ_TIMEOUT = "tez.engine.shuffle.read.timeout";
+  public static final int DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT =
+      3 * 60 * 1000;
+
+  /**
+   * Key and default for enabling SSL for shuffle.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_ENABLE_SSL =
+      "tez.engine.shuffle.ssl.enable";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL = false;
+
+  /**
+   * Presumably the fraction of memory given to shuffle input buffers (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT =
+      "tez.engine.shuffle.input.buffer.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT =
+      0.90f;
+
+  /**
+   * Presumably the fraction of shuffle memory a single fetch may use (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT =
+      "tez.engine.shuffle.memory.limit.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT =
+      0.25f;
+
+  /**
+   * Presumably the shuffle-buffer usage fraction at which a merge starts (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MERGE_PERCENT =
+      "tez.engine.shuffle.merge.percent";
+  public static final float DEFAULT_TEZ_ENGINE_SHUFFLE_MERGE_PERCENT = 0.90f;
+
+  /**
+   * TODO TEZAM3 default value ?
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS =
+      "tez.engine.shuffle.memory-to-memory.segments";
+
+  /**
+   * Key and default for enabling the shuffle memory-to-memory merge.
+   */
+  public static final String TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM =
+      "tez.engine.shuffle.memory-to-memory.enable";
+  public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM =
+      false;
+
+  /**
+   * Presumably the fraction of memory for retaining task input after shuffle (TODO confirm).
+   */
+  public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT =
+      "tez.engine.task.input.buffer.percent";
+  public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
+
+  // TODO Rename.
+  public static final String TEZ_ENGINE_GROUP_COMPARATOR_CLASS =
+      "tez.engine.group.comparator.class";
+
+  // TODO Better name.
+  public static final String TEZ_ENGINE_INTERNAL_SORTER_CLASS =
+      "tez.engine.internal.sorter.class";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS =
+      "tez.engine.intermediate-output.key.comparator.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS =
+      "tez.engine.intermediate-input.key.comparator.class";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS =
+      "tez.engine.intermediate-output.key.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS =
+      "tez.engine.intermediate-input.key.class";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS =
+      "tez.engine.intermediate-output.value.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS =
+      "tez.engine.intermediate-input.value.class";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS =
+      "tez.engine.intermediate-output.should-compress";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED =
+      "tez.engine.intermdiate-input.is-compressed"; // NOTE(review): "intermdiate" typo is part of the live key; left unchanged
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC =
+      "tez.engine.intermediate-output.compress.codec";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC =
+      "tez.engine.intermediate-input.compress.codec";
+
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS =
+      "tez.engine.intermediate-input.key.secondary.comparator.class";
+
+  // TODO This should be in DAGConfiguration
+  /* Config key for tracking the local file that holds all the credentials
+   * for the job.
+   */
+  public static final String DAG_CREDENTIALS_BINARY = "tez.dag.credentials.binary";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
new file mode 100644
index 0000000..e64a26c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Skeletal {@link TezCounter} implementation that supplies value-based
+ * {@code equals}/{@code hashCode} and a no-op {@code setDisplayName}.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounter implements TezCounter {
+
+  @Deprecated
+  @Override
+  public void setDisplayName(String name) {}
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (!(genericRight instanceof TezCounter)) {
+      return false;
+    }
+    TezCounter other = (TezCounter) genericRight;
+    synchronized (other) { // also hold the other counter's monitor while comparing
+      return getName().equals(other.getName())
+          && getDisplayName().equals(other.getDisplayName())
+          && getValue() == other.getValue();
+    }
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return Objects.hashCode(getName(), getDisplayName(), getValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
new file mode 100644
index 0000000..d8896ed
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Skeletal {@link CounterGroupBase} implementation that keeps its counters
+ * in a name-sorted concurrent map and reports each addition to {@link Limits}.
+ *
+ * @param <T> type of the counter for the group
+ */
+@InterfaceAudience.Private
+public abstract class AbstractCounterGroup<T extends TezCounter>
+    implements CounterGroupBase<T> {
+
+  private final String name;
+  private String displayName;
+  private final ConcurrentMap<String, T> counters =
+      new ConcurrentSkipListMap<String, T>();
+  private final Limits limits;
+
+  public AbstractCounterGroup(String name, String displayName,
+      Limits limits) {
+    this.name = name;
+    this.displayName = displayName;
+    this.limits = limits;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  @Override
+  public synchronized void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  @Override
+  public synchronized void addCounter(T counter) {
+    counters.put(counter.getName(), counter);
+    limits.incrCounters();
+  }
+
+  @Override
+  public synchronized T addCounter(String counterName, String displayName,
+      long value) {
+    String filtered = Limits.filterCounterName(counterName);
+    T existing = findCounterImpl(filtered, false);
+    if (existing != null) {
+      existing.setValue(value);
+      return existing;
+    }
+    return addCounterImpl(filtered, displayName, value);
+  }
+
+  private T addCounterImpl(String name, String displayName, long value) {
+    T created = newCounter(name, displayName, value);
+    addCounter(created);
+    return created;
+  }
+
+  @Override
+  public synchronized T findCounter(String counterName, String displayName) {
+    // Synchronized so that two threads that both miss the same counter
+    // cannot both go on to add it.
+    String filtered = Limits.filterCounterName(counterName);
+    T existing = findCounterImpl(filtered, false);
+    if (existing != null) {
+      return existing;
+    }
+    return addCounterImpl(filtered, displayName, 0);
+  }
+
+  @Override
+  public T findCounter(String counterName, boolean create) {
+    return findCounterImpl(Limits.filterCounterName(counterName), create);
+  }
+
+  // Takes the group's monitor. Concurrent-map constructs such as putIfAbsent
+  // are not sufficient here because of localization, limits etc.
+  private synchronized T findCounterImpl(String counterName, boolean create) {
+    T found = counters.get(counterName);
+    if (found != null || !create) {
+      return found;
+    }
+    String localized =
+        ResourceBundles.getCounterName(getName(), counterName, counterName);
+    return addCounterImpl(counterName, localized, 0);
+  }
+
+  @Override
+  public T findCounter(String counterName) {
+    return findCounter(counterName, true);
+  }
+
+  /**
+   * Factory hook: builds a counter of type T from explicit initial state.
+   * @param counterName name of the new counter
+   * @param displayName display name of the new counter
+   * @param value initial value of the new counter
+   * @return the newly created counter
+   */
+  protected abstract T newCounter(String counterName, String displayName,
+      long value);
+
+  /**
+   * Factory hook: builds an empty counter, filled in later via readFields.
+   * @return a new counter object
+   */
+  protected abstract T newCounter();
+
+  @Override
+  public Iterator<T> iterator() {
+    return counters.values().iterator();
+  }
+
+  /**
+   * GenericGroup ::= displayName #counter counter*
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, displayName);
+    WritableUtils.writeVInt(out, counters.size());
+    for (T member : counters.values()) {
+      member.write(out);
+    }
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    displayName = Text.readString(in);
+    counters.clear();
+    int remaining = WritableUtils.readVInt(in);
+    while (remaining-- > 0) {
+      T restored = newCounter();
+      restored.readFields(in);
+      counters.put(restored.getName(), restored);
+      limits.incrCounters();
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    return counters.size();
+  }
+
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (!(genericRight instanceof CounterGroupBase<?>)) {
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    CounterGroupBase<T> other = (CounterGroupBase<T>) genericRight;
+    return Iterators.elementsEqual(iterator(), other.iterator());
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return counters.hashCode();
+  }
+
+  @Override
+  public void incrAllCounters(CounterGroupBase<T> rightGroup) {
+    try {
+      for (TezCounter source : rightGroup) {
+        findCounter(source.getName(), source.getDisplayName())
+            .increment(source.getValue());
+      }
+    } catch (LimitExceededException e) {
+      counters.clear();
+      throw e;
+    }
+  }
+}