You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/05/17 17:21:52 UTC

[1/2] incubator-beam git commit: [flink] replace obsolete reflection call

Repository: incubator-beam
Updated Branches:
  refs/heads/master d627266d8 -> cc64d654c


[flink] replace obsolete reflection call


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f630002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f630002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f630002

Branch: refs/heads/master
Commit: 9f630002e235f02042c309e57ea44a163ede8bdf
Parents: d627266
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue May 17 19:12:02 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue May 17 19:12:02 2016 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/SinkOutputFormat.java | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f630002/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
index 2766a87..53e544d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -46,23 +46,10 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
   private AbstractID uid = new AbstractID();
 
   public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-    this.sink = extractSink(transform);
+    this.sink = transform.getSink();
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
   }
 
-  private Sink<T> extractSink(Write.Bound<T> transform) {
-    // TODO possibly add a getter in the upstream
-    try {
-      Field sinkField = transform.getClass().getDeclaredField("sink");
-      sinkField.setAccessible(true);
-      @SuppressWarnings("unchecked")
-      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
-      return extractedSink;
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      throw new RuntimeException("Could not acquire custom sink field.", e);
-    }
-  }
-
   @Override
   public void configure(Configuration configuration) {
     writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());


[2/2] incubator-beam git commit: This closes #344

Posted by mx...@apache.org.
This closes #344


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc64d654
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc64d654
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc64d654

Branch: refs/heads/master
Commit: cc64d654c5027c197eb1c1d6f64461edf1dee989
Parents: d627266 9f63000
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue May 17 19:19:12 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue May 17 19:19:12 2016 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/SinkOutputFormat.java | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)
----------------------------------------------------------------------