You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:10:57 UTC

[02/50] [abbrv] incubator-beam git commit: [sink] generate unique id for writer initialization

[sink] generate unique id for writer initialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336d394e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336d394e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336d394e

Branch: refs/heads/master
Commit: 336d394e9cb4d68143def6027574b56f617080d2
Parents: 8e859e2
Author: Max <ma...@posteo.de>
Authored: Tue Jan 19 10:38:49 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../flink/dataflow/translation/wrappers/SinkOutputFormat.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336d394e/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
index d87b240..b10c86f 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
@@ -23,13 +23,16 @@ import com.google.cloud.dataflow.sdk.io.Sink;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
 import com.google.cloud.dataflow.sdk.transforms.Write;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
+import java.util.UUID;
 
 /**
  * Wrapper class to use generic Write.Bound transforms as sinks.
@@ -44,6 +47,8 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
 	private Sink.WriteOperation<T, ?> writeOperation;
 	private Sink.Writer<T, ?> writer;
 
+	private AbstractID uid = new AbstractID();
+
 	public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
 		this.sink = extractSink(transform);
 		this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
@@ -80,7 +85,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
 			throw new IOException("Couldn't create writer.", e);
 		}
 		try {
-			writer.open(String.valueOf(taskNumber));
+			writer.open(uid + "-" + String.valueOf(taskNumber));
 		} catch (Exception e) {
 			throw new IOException("Couldn't open writer.", e);
 		}