You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/12/08 01:44:43 UTC
[1/2] incubator-beam git commit: [BEAM-1095] Add support set config for reuse-object on flink
Repository: incubator-beam
Updated Branches:
refs/heads/master 3b2e0290d -> c53e0b162
[BEAM-1095] Add support set config for reuse-object on flink
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b125207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b125207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b125207
Branch: refs/heads/master
Commit: 1b1252074dd6b57f4fb88ceb82c704d3d3d8147f
Parents: 3b2e029
Author: Alexey Diomin <di...@gmail.com>
Authored: Wed Dec 7 09:39:27 2016 +0400
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:44:00 2016 +0800
----------------------------------------------------------------------
.../flink/FlinkPipelineExecutionEnvironment.java | 12 ++++++++++++
.../apache/beam/runners/flink/FlinkPipelineOptions.java | 5 +++++
2 files changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 391c3f2..69dcd5e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -159,6 +159,12 @@ public class FlinkPipelineExecutionEnvironment {
// set parallelism in the options (required by some execution code)
options.setParallelism(flinkBatchEnv.getParallelism());
+ if (options.getObjectReuse()) {
+ flinkBatchEnv.getConfig().enableObjectReuse();
+ } else {
+ flinkBatchEnv.getConfig().disableObjectReuse();
+ }
+
return flinkBatchEnv;
}
@@ -197,6 +203,12 @@ public class FlinkPipelineExecutionEnvironment {
// set parallelism in the options (required by some execution code)
options.setParallelism(flinkStreamEnv.getParallelism());
+ if (options.getObjectReuse()) {
+ flinkStreamEnv.getConfig().enableObjectReuse();
+ } else {
+ flinkStreamEnv.getConfig().disableObjectReuse();
+ }
+
// default to event time
flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index be99f29..3bb358e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -83,6 +83,11 @@ public interface FlinkPipelineOptions
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);
+ @Description("Sets the behavior of reusing objects.")
+ @Default.Boolean(false)
+ Boolean getObjectReuse();
+ void setObjectReuse(Boolean reuse);
+
/**
* Sets a state backend to store Beam's state during computation.
* Note: Only applicable when executing in streaming mode.
[2/2] incubator-beam git commit: This closes #1518
Posted by al...@apache.org.
This closes #1518
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c53e0b16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c53e0b16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c53e0b16
Branch: refs/heads/master
Commit: c53e0b1623e3ee3c08c329e2716440f031681591
Parents: 3b2e029 1b12520
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Dec 8 09:44:07 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 8 09:44:07 2016 +0800
----------------------------------------------------------------------
.../flink/FlinkPipelineExecutionEnvironment.java | 12 ++++++++++++
.../apache/beam/runners/flink/FlinkPipelineOptions.java | 5 +++++
2 files changed, 17 insertions(+)
----------------------------------------------------------------------