You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:03:52 UTC

[1/4] incubator-beam git commit: [flink] use exploded WindowedValue in FlinkDoFnFunction

Repository: incubator-beam
Updated Branches:
  refs/heads/master 817515fe4 -> a96ea98a4


[flink] use exploded WindowedValue in FlinkDoFnFunction


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3461ce21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3461ce21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3461ce21

Branch: refs/heads/master
Commit: 3461ce21b8b88de18154de777e21dc7af889f2c7
Parents: 26635d7
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 7 14:49:02 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 7 14:49:02 2016 +0200

----------------------------------------------------------------------
 .../runners/flink/translation/functions/FlinkDoFnFunction.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3461ce21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index ac5b345..798a23c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -86,7 +86,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
       // is in only one window
       for (WindowedValue<InputT> value : values) {
         for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
-          context = context.forWindowedValue(value);
+          context = context.forWindowedValue(explodedValue);
           doFn.processElement(context);
         }
       }


[2/4] incubator-beam git commit: [BEAM-617][flink] introduce option to set state backend

Posted by mx...@apache.org.
[BEAM-617][flink] introduce option to set state backend


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4f85912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4f85912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4f85912

Branch: refs/heads/master
Commit: d4f85912effd2c04cac99d693a87bf6e2d597e9c
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 6 16:25:32 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 15:59:48 2016 +0200

----------------------------------------------------------------------
 .../runners/flink/FlinkPipelineExecutionEnvironment.java |  7 +++++++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java  | 11 +++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index a5d33b4..391c3f2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
@@ -221,6 +222,12 @@ public class FlinkPipelineExecutionEnvironment {
       flinkStreamEnv.enableCheckpointing(checkpointInterval);
     }
 
+    // State backend
+    final AbstractStateBackend stateBackend = options.getStateBackend();
+    if (stateBackend != null) {
+      flinkStreamEnv.setStateBackend(stateBackend);
+    }
+
     return flinkStreamEnv;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 1fb23ec..a067e76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 
 /**
  * Options which can be used to configure a Flink PipelineRunner.
@@ -82,4 +83,14 @@ public interface FlinkPipelineOptions
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
 
+  /**
+   * Sets a state backend to store Beam's state during computation.
+   * Note: Only applicable when executing in streaming mode.
+   * @param stateBackend The state backend to use
+   */
+  @Description("Sets the state backend to use in streaming mode. "
+      + "Otherwise the default is read from the Flink config.")
+  void setStateBackend(AbstractStateBackend stateBackend);
+  AbstractStateBackend getStateBackend();
+
 }


[4/4] incubator-beam git commit: This closes #928

Posted by mx...@apache.org.
This closes #928


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a96ea98a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a96ea98a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a96ea98a

Branch: refs/heads/master
Commit: a96ea98a48c2fc7e95bdb6265ccf421355584c4d
Parents: 0399dbc 3461ce2
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:01:29 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:01:29 2016 +0200

----------------------------------------------------------------------
 .../runners/flink/translation/functions/FlinkDoFnFunction.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: This closes #923

Posted by mx...@apache.org.
This closes #923


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0399dbc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0399dbc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0399dbc7

Branch: refs/heads/master
Commit: 0399dbc7a843e95ceacf9eff9fc751751f8f4bcc
Parents: 817515f d4f8591
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 9 16:00:33 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 16:00:33 2016 +0200

----------------------------------------------------------------------
 .../runners/flink/FlinkPipelineExecutionEnvironment.java |  7 +++++++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java  | 11 +++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------