You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/05/17 17:21:52 UTC
[1/2] incubator-beam git commit: [flink] replace obsolete reflection call
Repository: incubator-beam
Updated Branches:
refs/heads/master d627266d8 -> cc64d654c
[flink] replace obsolete reflection call
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f630002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f630002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f630002
Branch: refs/heads/master
Commit: 9f630002e235f02042c309e57ea44a163ede8bdf
Parents: d627266
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue May 17 19:12:02 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue May 17 19:12:02 2016 +0200
----------------------------------------------------------------------
.../flink/translation/wrappers/SinkOutputFormat.java | 15 +--------------
1 file changed, 1 insertion(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f630002/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
index 2766a87..53e544d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -46,23 +46,10 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
private AbstractID uid = new AbstractID();
public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
- this.sink = extractSink(transform);
+ this.sink = transform.getSink();
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
}
- private Sink<T> extractSink(Write.Bound<T> transform) {
- // TODO possibly add a getter in the upstream
- try {
- Field sinkField = transform.getClass().getDeclaredField("sink");
- sinkField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
- return extractedSink;
- } catch (NoSuchFieldException | IllegalAccessException e) {
- throw new RuntimeException("Could not acquire custom sink field.", e);
- }
- }
-
@Override
public void configure(Configuration configuration) {
writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());
[2/2] incubator-beam git commit: This closes #344
Posted by mx...@apache.org.
This closes #344
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc64d654
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc64d654
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc64d654
Branch: refs/heads/master
Commit: cc64d654c5027c197eb1c1d6f64461edf1dee989
Parents: d627266 9f63000
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue May 17 19:19:12 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue May 17 19:19:12 2016 +0200
----------------------------------------------------------------------
.../flink/translation/wrappers/SinkOutputFormat.java | 15 +--------------
1 file changed, 1 insertion(+), 14 deletions(-)
----------------------------------------------------------------------