You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:03:52 UTC
[1/4] incubator-beam git commit: [flink] use exploded WindowedValue in
FlinkDoFnFunction
Repository: incubator-beam
Updated Branches:
refs/heads/master 817515fe4 -> a96ea98a4
[flink] use exploded WindowedValue in FlinkDoFnFunction
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3461ce21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3461ce21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3461ce21
Branch: refs/heads/master
Commit: 3461ce21b8b88de18154de777e21dc7af889f2c7
Parents: 26635d7
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 14:49:02 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 7 14:49:02 2016 +0200
----------------------------------------------------------------------
.../runners/flink/translation/functions/FlinkDoFnFunction.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3461ce21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index ac5b345..798a23c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -86,7 +86,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
// is in only one window
for (WindowedValue<InputT> value : values) {
for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
- context = context.forWindowedValue(value);
+ context = context.forWindowedValue(explodedValue);
doFn.processElement(context);
}
}
[2/4] incubator-beam git commit: [BEAM-617][flink] introduce option
to set state backend
Posted by mx...@apache.org.
[BEAM-617][flink] introduce option to set state backend
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4f85912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4f85912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4f85912
Branch: refs/heads/master
Commit: d4f85912effd2c04cac99d693a87bf6e2d597e9c
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 6 16:25:32 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 15:59:48 2016 +0200
----------------------------------------------------------------------
.../runners/flink/FlinkPipelineExecutionEnvironment.java | 7 +++++++
.../apache/beam/runners/flink/FlinkPipelineOptions.java | 11 +++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index a5d33b4..391c3f2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
@@ -221,6 +222,12 @@ public class FlinkPipelineExecutionEnvironment {
flinkStreamEnv.enableCheckpointing(checkpointInterval);
}
+ // State backend
+ final AbstractStateBackend stateBackend = options.getStateBackend();
+ if (stateBackend != null) {
+ flinkStreamEnv.setStateBackend(stateBackend);
+ }
+
return flinkStreamEnv;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 1fb23ec..a067e76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
/**
* Options which can be used to configure a Flink PipelineRunner.
@@ -82,4 +83,14 @@ public interface FlinkPipelineOptions
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);
+ /**
+ * Sets a state backend to store Beam's state during computation.
+ * Note: Only applicable when executing in streaming mode.
+ * @param stateBackend The state backend to use
+ */
+ @Description("Sets the state backend to use in streaming mode. "
+ + "Otherwise the default is read from the Flink config.")
+ void setStateBackend(AbstractStateBackend stateBackend);
+ AbstractStateBackend getStateBackend();
+
}
[4/4] incubator-beam git commit: This closes #928
Posted by mx...@apache.org.
This closes #928
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a96ea98a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a96ea98a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a96ea98a
Branch: refs/heads/master
Commit: a96ea98a48c2fc7e95bdb6265ccf421355584c4d
Parents: 0399dbc 3461ce2
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:01:29 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:01:29 2016 +0200
----------------------------------------------------------------------
.../runners/flink/translation/functions/FlinkDoFnFunction.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[3/4] incubator-beam git commit: This closes #923
Posted by mx...@apache.org.
This closes #923
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0399dbc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0399dbc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0399dbc7
Branch: refs/heads/master
Commit: 0399dbc7a843e95ceacf9eff9fc751751f8f4bcc
Parents: 817515f d4f8591
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:00:33 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:00:33 2016 +0200
----------------------------------------------------------------------
.../runners/flink/FlinkPipelineExecutionEnvironment.java | 7 +++++++
.../apache/beam/runners/flink/FlinkPipelineOptions.java | 11 +++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------