You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/09/16 07:46:47 UTC

git commit: TEZ-1571. Add create method for DataSinkDescriptor. Contributed by Jeff Zhang. (cherry picked from commit dbe6a38a5646841fa60e103566d3a54cc8f3065e)

Repository: tez
Updated Branches:
  refs/heads/branch-0.5 18e9a5f22 -> f83387931


TEZ-1571. Add create method for DataSinkDescriptor. Contributed by Jeff
Zhang.
(cherry picked from commit dbe6a38a5646841fa60e103566d3a54cc8f3065e)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f8338793
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f8338793
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f8338793

Branch: refs/heads/branch-0.5
Commit: f83387931c191631772e95f0626888a918e2751f
Parents: 18e9a5f
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 15 22:36:15 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 15 22:46:25 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 ++--
 .../apache/tez/dag/api/DataSinkDescriptor.java  | 24 ++++++++++++++++++++
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 10 ++++----
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  4 ++--
 .../tez/dag/history/utils/TestDAGUtils.java     |  4 ++--
 .../apache/tez/mapreduce/client/YARNRunner.java |  2 +-
 .../apache/tez/mapreduce/output/MROutput.java   |  2 +-
 .../mapreduce/examples/FilterLinesByWord.java   |  2 +-
 .../examples/FilterLinesByWordOneToOne.java     |  2 +-
 .../org/apache/tez/test/TestDAGRecovery2.java   |  2 +-
 10 files changed, 41 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 005f3d9..70a0952 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,8 +7,6 @@ INCOMPATIBLE CHANGES
   TEZ-1539. Change InputInitializerEvent semantics to SEND_ONCE_ON_TASK_SUCCESS
 
 ALL CHANGES
-  TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
-  of DAG submission
   TEZ-1543. Shuffle Errors on heavy load (causing task retries)
   TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput()
   TEZ-1515. Remove usage of ResourceBundles in Counters.
@@ -20,6 +18,9 @@ ALL CHANGES
   TEZ-1519. TezTaskRunner should not initialize TezConfiguration in TezChild.
   TEZ-1534. Make client side configs available to AM and tasks.
   TEZ-1574. Support additional formats for the tez deployed archive
+  TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
+  of DAG submission
+  TEZ-1571. Add create method for DataSinkDescriptor.
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
index 2240384..bc43c88 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
@@ -51,6 +51,7 @@ public class DataSinkDescriptor {
    *          operation.
    * @param credentials Credentials needs to access the data sink
    */
+  @Deprecated
   public DataSinkDescriptor(OutputDescriptor outputDescriptor,
       @Nullable OutputCommitterDescriptor committerDescriptor,
       @Nullable Credentials credentials) {
@@ -59,6 +60,29 @@ public class DataSinkDescriptor {
     this.credentials = credentials;
   }
 
+  /**
+   * Create a {@link DataSinkDescriptor}
+   * @param outputDescriptor
+   *          An {@link OutputDescriptor} for the output
+   * @param committerDescriptor
+   *          Specify a committer to be used for the output. Can be null. After all
+   *          tasks in the vertex (or in the DAG) have completed, the committer
+   *          (if specified) is invoked to commit the outputs. Commit is a data
+   *          sink specific operation that usually determines the visibility of
+   *          the output to external observers. E.g. moving output files from
+   *          temporary dirs to the real output dir. When there are multiple
+   *          executions of a task, the commit process also helps decide which
+   *          execution will be included in the final output. Users should
+   *          consider whether their application or data sink need a commit
+   *          operation.
+   * @param credentials Credentials needed to access the data sink
+   */
+  public static DataSinkDescriptor create(OutputDescriptor outputDescriptor,
+      @Nullable OutputCommitterDescriptor committerDescriptor,
+      @Nullable Credentials credentials) {
+    return new DataSinkDescriptor(outputDescriptor, committerDescriptor, credentials);
+  }
+  
   public OutputDescriptor getOutputDescriptor() {
     return outputDescriptor;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index bf9abf1..b0b6d29 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -521,7 +521,7 @@ public class TestDAGVerify {
         ProcessorDescriptor.create("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
+    v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
     
     Edge e1 = Edge.create(v1, v2,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,
@@ -545,7 +545,7 @@ public class TestDAGVerify {
         ProcessorDescriptor.create("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
+    v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
     
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
@@ -621,7 +621,7 @@ public class TestDAGVerify {
     DAG dag = DAG.create("testDag");
     VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
+    uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
     
     GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,
@@ -678,7 +678,7 @@ public class TestDAGVerify {
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
+    uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
     
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -760,7 +760,7 @@ public class TestDAGVerify {
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
+    uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
     
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index ec05815..d10a8b3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -364,8 +364,8 @@ public class TestDAGImpl {
         TotalCountingOutputCommitter.class.getName());
     org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = OutputDescriptor.create("output.class");
-    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
-    v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+    uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
+    v3.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
 
     GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 4a0120b..081ed10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -73,8 +73,8 @@ public class TestDAGUtils {
         .setHistoryText("uvOut HistoryText");
     OutputCommitterDescriptor ocd =
         OutputCommitterDescriptor.create(OutputCommitter.class.getName());
-    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
-    v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+    uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
+    v3.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
 
     GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index d6c8057..dfbf0cf 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -428,7 +428,7 @@ public class YARNRunner implements ClientProtocol {
     if (stageNum == totalStages -1) {
       OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
           .setUserPayload(vertexUserPayload);
-      vertex.addDataSink("MROutput", new DataSinkDescriptor(od,
+      vertex.addDataSink("MROutput", DataSinkDescriptor.create(od,
           OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index db9fbea..ab9b41d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -179,7 +179,7 @@ public class MROutput extends AbstractLogicalOutput {
         }
       }
 
-      return new DataSinkDescriptor(
+      return DataSinkDescriptor.create(
           OutputDescriptor.create(outputClassName).setUserPayload(createUserPayload()),
           (doCommit ? OutputCommitterDescriptor.create(
               MROutputCommitter.class.getName()) : null), credentials);

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 9f533d9..8430c68 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -196,7 +196,7 @@ public class FilterLinesByWord extends Configured implements Tool {
         .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf));
     OutputCommitterDescriptor ocd =
         OutputCommitterDescriptor.create(MROutputCommitter.class.getName());
-    stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
+    stage2Vertex.addDataSink("MROutput", DataSinkDescriptor.create(od, ocd, null));
 
     UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index e038170..515cf70 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -185,7 +185,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     // Configure the Output for stage2
     stage2Vertex.addDataSink(
         "MROutput",
-        new DataSinkDescriptor(OutputDescriptor.create(MROutput.class.getName())
+        DataSinkDescriptor.create(OutputDescriptor.create(MROutput.class.getName())
             .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf)),
             OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f8338793/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index 37bea80..b7957a3 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -183,7 +183,7 @@ public class TestDAGRecovery2 {
             .toUserPayload())));
     OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create(
         MultiAttemptDAG.FailingOutputCommitter.class.getName());
-    dag.getVertex("v3").addDataSink("FailingOutput", new DataSinkDescriptor(od, ocd, null));
+    dag.getVertex("v3").addDataSink("FailingOutput", DataSinkDescriptor.create(od, ocd, null));
     runDAGAndVerify(dag, State.FAILED);
   }