You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/05/04 09:46:44 UTC
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
GitHub user kl0u opened a pull request:
https://github.com/apache/flink/pull/5955
[FLINK-8659] Add migration itcases for broadcast state.
As the name implies, this PR adds migration tests for the newly introduced broadcast state.
For the `scala` case, more refactoring is required so that the code shared between the tests is better distributed, but that is a broader refactoring. It requires the same work that was previously done for the `java` migration tests.
R @aljoscha
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kl0u/flink migration-inv
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5955.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5955
----
commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e
Author: kkloudas <kk...@...>
Date: 2018-05-03T08:05:13Z
[FLINK-8659] Add migration itcases for broadcast state.
----
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5955
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5955#discussion_r188511280
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+ private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
+ StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ }
+
+ private final MigrationVersion testMigrateVersion;
+ private final String testStateBackend;
+
+ public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+ this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+ this.testStateBackend = testMigrateVersionAndBackend.f1;
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+
+ final int parallelism = 4;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ switch (testStateBackend) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ env.setStateBackend(new MemoryStateBackend());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setMaxParallelism(parallelism);
+
+ SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+ SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
+ SourceFunction<Tuple2<Long, Long>> parallelSource;
+ SourceFunction<Tuple2<Long, Long>> parallelSourceB;
+ RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+ OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
+
+ final Map<Long, Long> expectedFirstState = new HashMap<>();
+ expectedFirstState.put(0L, 0L);
+ expectedFirstState.put(1L, 1L);
+ expectedFirstState.put(2L, 2L);
+ expectedFirstState.put(3L, 3L);
+
+ final Map<String, String> expectedSecondState = new HashMap<>();
+ expectedSecondState.put("0", "0");
+ expectedSecondState.put("1", "1");
+ expectedSecondState.put("2", "2");
+ expectedSecondState.put("3", "3");
+
+ final Map<String, String> expectedThirdState = new HashMap<>();
+ expectedThirdState.put("0", "0");
+ expectedThirdState.put("1", "1");
+ expectedThirdState.put("2", "2");
+ expectedThirdState.put("3", "3");
+
+ if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
+ firstBroadcastFunction = new KeyedBroadcastFunction();
+ secondBroadcastFunction = new KeyedSingleBroadcastFunction();
--- End diff --
Same goes here: please use consistent naming.
---
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5955
Could you review it, @aljoscha?
---
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5955
Could you review it, @tzulitai?
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5955#discussion_r188511258
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+ private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
+ StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ }
+
+ private final MigrationVersion testMigrateVersion;
+ private final String testStateBackend;
+
+ public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+ this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+ this.testStateBackend = testMigrateVersionAndBackend.f1;
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+
+ final int parallelism = 4;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ switch (testStateBackend) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ env.setStateBackend(new MemoryStateBackend());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setMaxParallelism(parallelism);
+
+ SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+ SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
+ SourceFunction<Tuple2<Long, Long>> parallelSource;
+ SourceFunction<Tuple2<Long, Long>> parallelSourceB;
+ RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+ OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
+
+ final Map<Long, Long> expectedFirstState = new HashMap<>();
+ expectedFirstState.put(0L, 0L);
+ expectedFirstState.put(1L, 1L);
+ expectedFirstState.put(2L, 2L);
+ expectedFirstState.put(3L, 3L);
+
+ final Map<String, String> expectedSecondState = new HashMap<>();
+ expectedSecondState.put("0", "0");
+ expectedSecondState.put("1", "1");
+ expectedSecondState.put("2", "2");
+ expectedSecondState.put("3", "3");
+
+ final Map<String, String> expectedThirdState = new HashMap<>();
+ expectedThirdState.put("0", "0");
+ expectedThirdState.put("1", "1");
+ expectedThirdState.put("2", "2");
+ expectedThirdState.put("3", "3");
+
+ if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
+ firstBroadcastFunction = new KeyedBroadcastFunction();
--- End diff --
If we decide to continue with the `CheckpointX` and `CheckingX` counterpart operator approach, we should maybe name this `CheckpointKeyedBroadcastFunction` to be consistent.
---
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/5955
@kl0u yes, let's do that as a separate commit then.
+1, this looks good to me.
One final comment for the merge:
When merging to `master`, we should have test savepoints for both `1.5` (taken in the `release-1.5` branch) and `1.6` (taken in the current `master` branch).
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5955#discussion_r188512862
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+ private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
+ StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ }
+
+ private final MigrationVersion testMigrateVersion;
+ private final String testStateBackend;
+
+ public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+ this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+ this.testStateBackend = testMigrateVersionAndBackend.f1;
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+
+ final int parallelism = 4;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ switch (testStateBackend) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ env.setStateBackend(new MemoryStateBackend());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setMaxParallelism(parallelism);
+
+ SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+ SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
+ SourceFunction<Tuple2<Long, Long>> parallelSource;
+ SourceFunction<Tuple2<Long, Long>> parallelSourceB;
+ RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+ OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
+
+ final Map<Long, Long> expectedFirstState = new HashMap<>();
+ expectedFirstState.put(0L, 0L);
+ expectedFirstState.put(1L, 1L);
+ expectedFirstState.put(2L, 2L);
+ expectedFirstState.put(3L, 3L);
+
+ final Map<String, String> expectedSecondState = new HashMap<>();
+ expectedSecondState.put("0", "0");
+ expectedSecondState.put("1", "1");
+ expectedSecondState.put("2", "2");
+ expectedSecondState.put("3", "3");
+
+ final Map<String, String> expectedThirdState = new HashMap<>();
+ expectedThirdState.put("0", "0");
+ expectedThirdState.put("1", "1");
+ expectedThirdState.put("2", "2");
+ expectedThirdState.put("3", "3");
+
+ if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
+ firstBroadcastFunction = new KeyedBroadcastFunction();
+ secondBroadcastFunction = new KeyedSingleBroadcastFunction();
+ } else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckingKeyedStateFlatMap();
--- End diff --
Same here: maybe this can be removed.
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5955#discussion_r188614213
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+ private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
+ StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ }
+
+ private final MigrationVersion testMigrateVersion;
+ private final String testStateBackend;
+
+ public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+ this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+ this.testStateBackend = testMigrateVersionAndBackend.f1;
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+
+ final int parallelism = 4;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ switch (testStateBackend) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ env.setStateBackend(new MemoryStateBackend());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setMaxParallelism(parallelism);
+
+ SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+ SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
+ SourceFunction<Tuple2<Long, Long>> parallelSource;
+ SourceFunction<Tuple2<Long, Long>> parallelSourceB;
+ RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+ OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
+
+ final Map<Long, Long> expectedFirstState = new HashMap<>();
+ expectedFirstState.put(0L, 0L);
+ expectedFirstState.put(1L, 1L);
+ expectedFirstState.put(2L, 2L);
+ expectedFirstState.put(3L, 3L);
+
+ final Map<String, String> expectedSecondState = new HashMap<>();
+ expectedSecondState.put("0", "0");
+ expectedSecondState.put("1", "1");
+ expectedSecondState.put("2", "2");
+ expectedSecondState.put("3", "3");
+
+ final Map<String, String> expectedThirdState = new HashMap<>();
+ expectedThirdState.put("0", "0");
+ expectedThirdState.put("1", "1");
+ expectedThirdState.put("2", "2");
+ expectedThirdState.put("3", "3");
+
+ if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
+ firstBroadcastFunction = new KeyedBroadcastFunction();
+ secondBroadcastFunction = new KeyedSingleBroadcastFunction();
+ } else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckingTimelyStatefulOperator();
+ firstBroadcastFunction = new CheckingKeyedBroadcastFunction(expectedFirstState, expectedSecondState);
--- End diff --
The reason I left it in is to verify that the `BroadcastState` plays well with other states co-located on the same operator.
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5955#discussion_r188512949
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+ private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
+ StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ }
+
+ private final MigrationVersion testMigrateVersion;
+ private final String testStateBackend;
+
+ public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+ this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+ this.testStateBackend = testMigrateVersionAndBackend.f1;
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+
+ final int parallelism = 4;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ switch (testStateBackend) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ env.setStateBackend(new MemoryStateBackend());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setMaxParallelism(parallelism);
+
+ SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+ SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
+ SourceFunction<Tuple2<Long, Long>> parallelSource;
+ SourceFunction<Tuple2<Long, Long>> parallelSourceB;
+ RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+ OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
+
+ final Map<Long, Long> expectedFirstState = new HashMap<>();
+ expectedFirstState.put(0L, 0L);
+ expectedFirstState.put(1L, 1L);
+ expectedFirstState.put(2L, 2L);
+ expectedFirstState.put(3L, 3L);
+
+ final Map<String, String> expectedSecondState = new HashMap<>();
+ expectedSecondState.put("0", "0");
+ expectedSecondState.put("1", "1");
+ expectedSecondState.put("2", "2");
+ expectedSecondState.put("3", "3");
+
+ final Map<String, String> expectedThirdState = new HashMap<>();
+ expectedThirdState.put("0", "0");
+ expectedThirdState.put("1", "1");
+ expectedThirdState.put("2", "2");
+ expectedThirdState.put("3", "3");
+
+ if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
+ firstBroadcastFunction = new KeyedBroadcastFunction();
+ secondBroadcastFunction = new KeyedSingleBroadcastFunction();
+ } else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckingTimelyStatefulOperator();
+ firstBroadcastFunction = new CheckingKeyedBroadcastFunction(expectedFirstState, expectedSecondState);
--- End diff --
Actually, the only state this test should cover is the broadcast state, no?
Why do we need to add all the other state (apart from the sources)?
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5955#discussion_r188511081
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+ private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
+ StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ }
+
+ private final MigrationVersion testMigrateVersion;
+ private final String testStateBackend;
+
+ public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+ this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+ this.testStateBackend = testMigrateVersionAndBackend.f1;
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+
+ final int parallelism = 4;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ switch (testStateBackend) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ env.setStateBackend(new MemoryStateBackend());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setMaxParallelism(parallelism);
+
+ SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+ SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
+ SourceFunction<Tuple2<Long, Long>> parallelSource;
+ SourceFunction<Tuple2<Long, Long>> parallelSourceB;
+ RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+ OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
+
+ final Map<Long, Long> expectedFirstState = new HashMap<>();
+ expectedFirstState.put(0L, 0L);
+ expectedFirstState.put(1L, 1L);
+ expectedFirstState.put(2L, 2L);
+ expectedFirstState.put(3L, 3L);
+
+ final Map<String, String> expectedSecondState = new HashMap<>();
+ expectedSecondState.put("0", "0");
+ expectedSecondState.put("1", "1");
+ expectedSecondState.put("2", "2");
+ expectedSecondState.put("3", "3");
+
+ final Map<String, String> expectedThirdState = new HashMap<>();
+ expectedThirdState.put("0", "0");
+ expectedThirdState.put("1", "1");
+ expectedThirdState.put("2", "2");
+ expectedThirdState.put("3", "3");
+
+ if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
--- End diff --
Just a separate suggestion that doesn't need to be addressed in this PR:
We could probably cut quite a few lines of code by simply having an `isVerification` flag in all the operators, instead of separate `CheckpointingX` and `CheckingX` operator variants.
---
[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.
Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/5955
Hi @tzulitai! Thanks for the review. I integrated most of your comments. The only one I left out is the one about merging the checkpointing and the checking variants. I am not against that; it is just that, as things stand, this test is aligned with the `StatefulJobSavepointMigrationITCase`. If we were to make the change, we should change both, and I would prefer to do that in a separate commit.
Let me know what you think about the current changes; if you are OK with them, I can merge.
---
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5955#discussion_r188512804
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
+
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+ private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
+ StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+ public static Collection<Tuple2<MigrationVersion, String>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+ Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+ }
+
+ private final MigrationVersion testMigrateVersion;
+ private final String testStateBackend;
+
+ public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+ this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+ this.testStateBackend = testMigrateVersionAndBackend.f1;
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+
+ final int parallelism = 4;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ switch (testStateBackend) {
+ case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ break;
+ case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+ env.setStateBackend(new MemoryStateBackend());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setMaxParallelism(parallelism);
+
+ SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+ SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
+ SourceFunction<Tuple2<Long, Long>> parallelSource;
+ SourceFunction<Tuple2<Long, Long>> parallelSourceB;
+ RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+ OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
+ KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
+
+ final Map<Long, Long> expectedFirstState = new HashMap<>();
+ expectedFirstState.put(0L, 0L);
+ expectedFirstState.put(1L, 1L);
+ expectedFirstState.put(2L, 2L);
+ expectedFirstState.put(3L, 3L);
+
+ final Map<String, String> expectedSecondState = new HashMap<>();
+ expectedSecondState.put("0", "0");
+ expectedSecondState.put("1", "1");
+ expectedSecondState.put("2", "2");
+ expectedSecondState.put("3", "3");
+
+ final Map<String, String> expectedThirdState = new HashMap<>();
+ expectedThirdState.put("0", "0");
+ expectedThirdState.put("1", "1");
+ expectedThirdState.put("2", "2");
+ expectedThirdState.put("3", "3");
+
+ if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+ flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
+ timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
+ firstBroadcastFunction = new KeyedBroadcastFunction();
+ secondBroadcastFunction = new KeyedSingleBroadcastFunction();
+ } else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
+ nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+ parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
--- End diff --
Do we need union list state in this test?
---