You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/01/23 16:39:07 UTC

[1/2] beam git commit: [BEAM-1273] Error with FlinkPipelineOptions serialization after setStateBackend

Repository: beam
Updated Branches:
  refs/heads/master ddde35327 -> 9db5f746a


[BEAM-1273] Error with FlinkPipelineOptions serialization after setStateBackend


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5bbadf5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5bbadf5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5bbadf5

Branch: refs/heads/master
Commit: b5bbadf59625563d1755f66a24d27c96c5fd3492
Parents: ddde353
Author: Alexey Diomin <di...@gmail.com>
Authored: Mon Jan 16 14:46:08 2017 +0400
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 17:22:58 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineOptions.java       |  6 +++---
 .../apache/beam/runners/flink/PipelineOptionsTest.java | 13 +++++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 3bb358e..ef9afea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -89,13 +89,13 @@ public interface FlinkPipelineOptions
   void setObjectReuse(Boolean reuse);
 
   /**
-   * Sets a state backend to store Beam's state during computation.
+   * State backend to store Beam's state during computation.
    * Note: Only applicable when executing in streaming mode.
-   * @param stateBackend The state backend to use
    */
   @Description("Sets the state backend to use in streaming mode. "
       + "Otherwise the default is read from the Flink config.")
-  void setStateBackend(AbstractStateBackend stateBackend);
+  @JsonIgnore
   AbstractStateBackend getStateBackend();
+  void setStateBackend(AbstractStateBackend stateBackend);
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 4c97cc7..23bc6a2 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Collections;
@@ -40,6 +41,7 @@ import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.joda.time.Instant;
@@ -80,6 +82,17 @@ public class PipelineOptionsTest {
   }
 
   @Test
+  public void testIgnoredFieldSerialization() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setStateBackend(new MemoryStateBackend());
+
+    FlinkPipelineOptions deserialized =
+        new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
+
+    assertNull(deserialized.getStateBackend());
+  }
+
+  @Test
   public void testCaching() {
     PipelineOptions deserializedOptions =
         serializedOptions.getPipelineOptions().as(PipelineOptions.class);


[2/2] beam git commit: This closes #1779

Posted by al...@apache.org.
This closes #1779


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9db5f746
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9db5f746
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9db5f746

Branch: refs/heads/master
Commit: 9db5f746ae81c4fa755a9a8f2c4888759c8e7042
Parents: ddde353 b5bbadf
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jan 23 17:38:37 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 17:38:37 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineOptions.java       |  6 +++---
 .../apache/beam/runners/flink/PipelineOptionsTest.java | 13 +++++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------