You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:10:57 UTC
[02/50] [abbrv] incubator-beam git commit: [sink] generate unique id for writer initialization
[sink] generate unique id for writer initialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336d394e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336d394e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336d394e
Branch: refs/heads/master
Commit: 336d394e9cb4d68143def6027574b56f617080d2
Parents: 8e859e2
Author: Max <ma...@posteo.de>
Authored: Tue Jan 19 10:38:49 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../flink/dataflow/translation/wrappers/SinkOutputFormat.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336d394e/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
index d87b240..b10c86f 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SinkOutputFormat.java
@@ -23,13 +23,16 @@ import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.transforms.Write;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.AbstractID;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
+import java.util.UUID;
/**
* Wrapper class to use generic Write.Bound transforms as sinks.
@@ -44,6 +47,8 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
private Sink.WriteOperation<T, ?> writeOperation;
private Sink.Writer<T, ?> writer;
+ private AbstractID uid = new AbstractID();
+
public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
this.sink = extractSink(transform);
this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
@@ -80,7 +85,7 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
throw new IOException("Couldn't create writer.", e);
}
try {
- writer.open(String.valueOf(taskNumber));
+ writer.open(uid + "-" + String.valueOf(taskNumber));
} catch (Exception e) {
throw new IOException("Couldn't open writer.", e);
}