You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/12/08 01:44:43 UTC

[1/2] incubator-beam git commit: [BEAM-1095] Add support for configuring object reuse on Flink

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3b2e0290d -> c53e0b162


[BEAM-1095] Add support for configuring object reuse on Flink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b125207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b125207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b125207

Branch: refs/heads/master
Commit: 1b1252074dd6b57f4fb88ceb82c704d3d3d8147f
Parents: 3b2e029
Author: Alexey Diomin <di...@gmail.com>
Authored: Wed Dec 7 09:39:27 2016 +0400
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:44:00 2016 +0800

----------------------------------------------------------------------
 .../flink/FlinkPipelineExecutionEnvironment.java        | 12 ++++++++++++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java |  5 +++++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 391c3f2..69dcd5e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -159,6 +159,12 @@ public class FlinkPipelineExecutionEnvironment {
     // set parallelism in the options (required by some execution code)
     options.setParallelism(flinkBatchEnv.getParallelism());
 
+    if (options.getObjectReuse()) {
+      flinkBatchEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkBatchEnv.getConfig().disableObjectReuse();
+    }
+
     return flinkBatchEnv;
   }
 
@@ -197,6 +203,12 @@ public class FlinkPipelineExecutionEnvironment {
     // set parallelism in the options (required by some execution code)
     options.setParallelism(flinkStreamEnv.getParallelism());
 
+    if (options.getObjectReuse()) {
+      flinkStreamEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkStreamEnv.getConfig().disableObjectReuse();
+    }
+
     // default to event time
     flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index be99f29..3bb358e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -83,6 +83,11 @@ public interface FlinkPipelineOptions
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
 
+  @Description("Sets the behavior of reusing objects.")
+  @Default.Boolean(false)
+  Boolean getObjectReuse();
+  void setObjectReuse(Boolean reuse);
+
   /**
    * Sets a state backend to store Beam's state during computation.
    * Note: Only applicable when executing in streaming mode.


[2/2] incubator-beam git commit: This closes #1518

Posted by al...@apache.org.
This closes #1518


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c53e0b16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c53e0b16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c53e0b16

Branch: refs/heads/master
Commit: c53e0b1623e3ee3c08c329e2716440f031681591
Parents: 3b2e029 1b12520
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Dec 8 09:44:07 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:44:07 2016 +0800

----------------------------------------------------------------------
 .../flink/FlinkPipelineExecutionEnvironment.java        | 12 ++++++++++++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java |  5 +++++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------