You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:26 UTC
[20/50] [abbrv] git commit: TEZ-1571. Add create method for
DataSinkDescriptor. Contributed by Jeff Zhang.
TEZ-1571. Add create method for DataSinkDescriptor. Contributed by Jeff
Zhang.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dbe6a38a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dbe6a38a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dbe6a38a
Branch: refs/heads/branch-0.5
Commit: dbe6a38a5646841fa60e103566d3a54cc8f3065e
Parents: 5d6b8fd
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 15 22:36:15 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 15 22:36:15 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/DataSinkDescriptor.java | 24 ++++++++++++++++++++
.../org/apache/tez/dag/api/TestDAGVerify.java | 10 ++++----
.../tez/dag/app/dag/impl/TestDAGImpl.java | 4 ++--
.../tez/dag/history/utils/TestDAGUtils.java | 4 ++--
.../apache/tez/mapreduce/client/YARNRunner.java | 2 +-
.../apache/tez/mapreduce/output/MROutput.java | 2 +-
.../mapreduce/examples/FilterLinesByWord.java | 2 +-
.../examples/FilterLinesByWordOneToOne.java | 2 +-
.../org/apache/tez/test/TestDAGRecovery2.java | 2 +-
10 files changed, 39 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1520e70..97cf79b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -40,6 +40,7 @@ ALL CHANGES
TEZ-1574. Support additional formats for the tez deployed archive
TEZ-1563. TezClient.submitDAGSession alters DAG local resources regardless
of DAG submission
+ TEZ-1571. Add create method for DataSinkDescriptor.
Release 0.5.0: 2014-09-03
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
index 2240384..bc43c88 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
@@ -51,6 +51,7 @@ public class DataSinkDescriptor {
* operation.
* @param credentials Credentials needs to access the data sink
*/
+ @Deprecated
public DataSinkDescriptor(OutputDescriptor outputDescriptor,
@Nullable OutputCommitterDescriptor committerDescriptor,
@Nullable Credentials credentials) {
@@ -59,6 +60,29 @@ public class DataSinkDescriptor {
this.credentials = credentials;
}
+ /**
+ * Create a {@link DataSinkDescriptor}
+ * @param outputDescriptor
+ * An {@link OutputDescriptor} for the output
+ * @param committerDescriptor
+ * Specify a committer to be used for the output. Can be null. After all
+ * tasks in the vertex (or in the DAG) have completed, the committer
+ * (if specified) is invoked to commit the outputs. Commit is a data
+ * sink specific operation that usually determines the visibility of
+ * the output to external observers. E.g. moving output files from
+ * temporary dirs to the real output dir. When there are multiple
+ * executions of a task, the commit process also helps decide which
+ * execution will be included in the final output. Users should
+ * consider whether their application or data sink need a commit
+ * operation.
+ * @param credentials Credentials needed to access the data sink
+ */
+ public static DataSinkDescriptor create(OutputDescriptor outputDescriptor,
+ @Nullable OutputCommitterDescriptor committerDescriptor,
+ @Nullable Credentials credentials) {
+ return new DataSinkDescriptor(outputDescriptor, committerDescriptor, credentials);
+ }
+
public OutputDescriptor getOutputDescriptor() {
return outputDescriptor;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index bf9abf1..b0b6d29 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -521,7 +521,7 @@ public class TestDAGVerify {
ProcessorDescriptor.create("MapProcessor"),
dummyTaskCount, dummyTaskResource);
- v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
+ v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
Edge e1 = Edge.create(v1, v2,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
@@ -545,7 +545,7 @@ public class TestDAGVerify {
ProcessorDescriptor.create("MapProcessor"),
dummyTaskCount, dummyTaskResource);
- v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
+ v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
@@ -621,7 +621,7 @@ public class TestDAGVerify {
DAG dag = DAG.create("testDag");
VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
- uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
+ uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
@@ -678,7 +678,7 @@ public class TestDAGVerify {
String groupName1 = "uv12";
VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
- uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
+ uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
String groupName2 = "uv23";
VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -760,7 +760,7 @@ public class TestDAGVerify {
String groupName1 = "uv12";
VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
- uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
+ uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
String groupName2 = "uv23";
VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index aba4fd9..03aedef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -364,8 +364,8 @@ public class TestDAGImpl {
TotalCountingOutputCommitter.class.getName());
org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = OutputDescriptor.create("output.class");
- uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
- v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+ uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
+ v3.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 4a0120b..081ed10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -73,8 +73,8 @@ public class TestDAGUtils {
.setHistoryText("uvOut HistoryText");
OutputCommitterDescriptor ocd =
OutputCommitterDescriptor.create(OutputCommitter.class.getName());
- uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
- v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+ uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
+ v3.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index d6c8057..dfbf0cf 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -428,7 +428,7 @@ public class YARNRunner implements ClientProtocol {
if (stageNum == totalStages -1) {
OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
.setUserPayload(vertexUserPayload);
- vertex.addDataSink("MROutput", new DataSinkDescriptor(od,
+ vertex.addDataSink("MROutput", DataSinkDescriptor.create(od,
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index db9fbea..ab9b41d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -179,7 +179,7 @@ public class MROutput extends AbstractLogicalOutput {
}
}
- return new DataSinkDescriptor(
+ return DataSinkDescriptor.create(
OutputDescriptor.create(outputClassName).setUserPayload(createUserPayload()),
(doCommit ? OutputCommitterDescriptor.create(
MROutputCommitter.class.getName()) : null), credentials);
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 9f533d9..8430c68 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -196,7 +196,7 @@ public class FilterLinesByWord extends Configured implements Tool {
.setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf));
OutputCommitterDescriptor ocd =
OutputCommitterDescriptor.create(MROutputCommitter.class.getName());
- stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
+ stage2Vertex.addDataSink("MROutput", DataSinkDescriptor.create(od, ocd, null));
UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
.newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index e038170..515cf70 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -185,7 +185,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
// Configure the Output for stage2
stage2Vertex.addDataSink(
"MROutput",
- new DataSinkDescriptor(OutputDescriptor.create(MROutput.class.getName())
+ DataSinkDescriptor.create(OutputDescriptor.create(MROutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf)),
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
http://git-wip-us.apache.org/repos/asf/tez/blob/dbe6a38a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index 37bea80..b7957a3 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -183,7 +183,7 @@ public class TestDAGRecovery2 {
.toUserPayload())));
OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create(
MultiAttemptDAG.FailingOutputCommitter.class.getName());
- dag.getVertex("v3").addDataSink("FailingOutput", new DataSinkDescriptor(od, ocd, null));
+ dag.getVertex("v3").addDataSink("FailingOutput", DataSinkDescriptor.create(od, ocd, null));
runDAGAndVerify(dag, State.FAILED);
}