You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/01/23 16:39:07 UTC
[1/2] beam git commit: [BEAM-1273] Error with FlinkPipelineOptions
serialization after setStateBackend
Repository: beam
Updated Branches:
refs/heads/master ddde35327 -> 9db5f746a
[BEAM-1273] Error with FlinkPipelineOptions serialization after setStateBackend
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5bbadf5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5bbadf5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5bbadf5
Branch: refs/heads/master
Commit: b5bbadf59625563d1755f66a24d27c96c5fd3492
Parents: ddde353
Author: Alexey Diomin <di...@gmail.com>
Authored: Mon Jan 16 14:46:08 2017 +0400
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 17:22:58 2017 +0100
----------------------------------------------------------------------
.../beam/runners/flink/FlinkPipelineOptions.java | 6 +++---
.../apache/beam/runners/flink/PipelineOptionsTest.java | 13 +++++++++++++
2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 3bb358e..ef9afea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -89,13 +89,13 @@ public interface FlinkPipelineOptions
void setObjectReuse(Boolean reuse);
/**
- * Sets a state backend to store Beam's state during computation.
+ * State backend to store Beam's state during computation.
* Note: Only applicable when executing in streaming mode.
- * @param stateBackend The state backend to use
*/
@Description("Sets the state backend to use in streaming mode. "
+ "Otherwise the default is read from the Flink config.")
- void setStateBackend(AbstractStateBackend stateBackend);
+ @JsonIgnore
AbstractStateBackend getStateBackend();
+ void setStateBackend(AbstractStateBackend stateBackend);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 4c97cc7..23bc6a2 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
@@ -40,6 +41,7 @@ import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.joda.time.Instant;
@@ -80,6 +82,17 @@ public class PipelineOptionsTest {
}
@Test
+ public void testIgnoredFieldSerialization() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setStateBackend(new MemoryStateBackend());
+
+ FlinkPipelineOptions deserialized =
+ new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
+
+ assertNull(deserialized.getStateBackend());
+ }
+
+ @Test
public void testCaching() {
PipelineOptions deserializedOptions =
serializedOptions.getPipelineOptions().as(PipelineOptions.class);
[2/2] beam git commit: This closes #1779
Posted by al...@apache.org.
This closes #1779
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9db5f746
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9db5f746
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9db5f746
Branch: refs/heads/master
Commit: 9db5f746ae81c4fa755a9a8f2c4888759c8e7042
Parents: ddde353 b5bbadf
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jan 23 17:38:37 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 23 17:38:37 2017 +0100
----------------------------------------------------------------------
.../beam/runners/flink/FlinkPipelineOptions.java | 6 +++---
.../apache/beam/runners/flink/PipelineOptionsTest.java | 13 +++++++++++++
2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------