You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/05/04 09:46:44 UTC

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/5955

    [FLINK-8659] Add migration itcases for broadcast state.

    As the name implies, this PR adds migration tests for the newly introduced broadcast state.
    
    For the `scala` case, further refactoring is needed so that the code shared between the tests is better organized, but that is a broader change. It requires the same work that was previously done for the `java` migration tests.
    
    R @aljoscha 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink migration-inv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5955
    
----
commit 9ae20e37b557e9ca482bd61cb57e8a6001a7eb6e
Author: kkloudas <kk...@...>
Date:   2018-05-03T08:05:13Z

    [FLINK-8659] Add migration itcases for broadcast state.

----


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5955


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188511280
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +	private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +			StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +		return Arrays.asList(
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +	}
    +
    +	private final MigrationVersion testMigrateVersion;
    +	private final String testStateBackend;
    +
    +	public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +		this.testStateBackend = testMigrateVersionAndBackend.f1;
    +	}
    +
    +	@Test
    +	public void testSavepoint() throws Exception {
    +
    +		final int parallelism = 4;
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +		switch (testStateBackend) {
    +			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +				break;
    +			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +				env.setStateBackend(new MemoryStateBackend());
    +				break;
    +			default:
    +				throw new UnsupportedOperationException();
    +		}
    +
    +		env.enableCheckpointing(500);
    +		env.setParallelism(parallelism);
    +		env.setMaxParallelism(parallelism);
    +
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +		SourceFunction<Tuple2<Long, Long>> parallelSource;
    +		SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +		final Map<Long, Long> expectedFirstState = new HashMap<>();
    +		expectedFirstState.put(0L, 0L);
    +		expectedFirstState.put(1L, 1L);
    +		expectedFirstState.put(2L, 2L);
    +		expectedFirstState.put(3L, 3L);
    +
    +		final Map<String, String> expectedSecondState = new HashMap<>();
    +		expectedSecondState.put("0", "0");
    +		expectedSecondState.put("1", "1");
    +		expectedSecondState.put("2", "2");
    +		expectedSecondState.put("3", "3");
    +
    +		final Map<String, String> expectedThirdState = new HashMap<>();
    +		expectedThirdState.put("0", "0");
    +		expectedThirdState.put("1", "1");
    +		expectedThirdState.put("2", "2");
    +		expectedThirdState.put("3", "3");
    +
    +		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
    +			firstBroadcastFunction = new KeyedBroadcastFunction();
    +			secondBroadcastFunction = new KeyedSingleBroadcastFunction();
    --- End diff --
    
    Same goes here: the naming should be consistent.


---

[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5955
  
    Could you review it, @aljoscha?


---

[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5955
  
    Could you review it, @tzulitai?


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188511258
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +	private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +			StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +		return Arrays.asList(
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +	}
    +
    +	private final MigrationVersion testMigrateVersion;
    +	private final String testStateBackend;
    +
    +	public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +		this.testStateBackend = testMigrateVersionAndBackend.f1;
    +	}
    +
    +	@Test
    +	public void testSavepoint() throws Exception {
    +
    +		final int parallelism = 4;
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +		switch (testStateBackend) {
    +			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +				break;
    +			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +				env.setStateBackend(new MemoryStateBackend());
    +				break;
    +			default:
    +				throw new UnsupportedOperationException();
    +		}
    +
    +		env.enableCheckpointing(500);
    +		env.setParallelism(parallelism);
    +		env.setMaxParallelism(parallelism);
    +
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +		SourceFunction<Tuple2<Long, Long>> parallelSource;
    +		SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +		final Map<Long, Long> expectedFirstState = new HashMap<>();
    +		expectedFirstState.put(0L, 0L);
    +		expectedFirstState.put(1L, 1L);
    +		expectedFirstState.put(2L, 2L);
    +		expectedFirstState.put(3L, 3L);
    +
    +		final Map<String, String> expectedSecondState = new HashMap<>();
    +		expectedSecondState.put("0", "0");
    +		expectedSecondState.put("1", "1");
    +		expectedSecondState.put("2", "2");
    +		expectedSecondState.put("3", "3");
    +
    +		final Map<String, String> expectedThirdState = new HashMap<>();
    +		expectedThirdState.put("0", "0");
    +		expectedThirdState.put("1", "1");
    +		expectedThirdState.put("2", "2");
    +		expectedThirdState.put("3", "3");
    +
    +		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
    +			firstBroadcastFunction = new KeyedBroadcastFunction();
    --- End diff --
    
    If we decide to continue with the approach of `CheckpointingX` / `CheckingX` counterpart operators, we should perhaps name this `CheckpointingKeyedBroadcastFunction` for consistency.


---

[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5955
  
    @kl0u yes, let's do that as a separate commit then.
    
    +1, this looks good to me.
    
    One final comment for the merge:
    When merging to `master`, we should have test savepoints for both `1.5` (taken on the `release-1.5` branch) and `1.6` (taken on the current `master` branch).


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188512862
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +	private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +			StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +		return Arrays.asList(
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +	}
    +
    +	private final MigrationVersion testMigrateVersion;
    +	private final String testStateBackend;
    +
    +	public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +		this.testStateBackend = testMigrateVersionAndBackend.f1;
    +	}
    +
    +	@Test
    +	public void testSavepoint() throws Exception {
    +
    +		final int parallelism = 4;
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +		switch (testStateBackend) {
    +			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +				break;
    +			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +				env.setStateBackend(new MemoryStateBackend());
    +				break;
    +			default:
    +				throw new UnsupportedOperationException();
    +		}
    +
    +		env.enableCheckpointing(500);
    +		env.setParallelism(parallelism);
    +		env.setMaxParallelism(parallelism);
    +
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +		SourceFunction<Tuple2<Long, Long>> parallelSource;
    +		SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +		final Map<Long, Long> expectedFirstState = new HashMap<>();
    +		expectedFirstState.put(0L, 0L);
    +		expectedFirstState.put(1L, 1L);
    +		expectedFirstState.put(2L, 2L);
    +		expectedFirstState.put(3L, 3L);
    +
    +		final Map<String, String> expectedSecondState = new HashMap<>();
    +		expectedSecondState.put("0", "0");
    +		expectedSecondState.put("1", "1");
    +		expectedSecondState.put("2", "2");
    +		expectedSecondState.put("3", "3");
    +
    +		final Map<String, String> expectedThirdState = new HashMap<>();
    +		expectedThirdState.put("0", "0");
    +		expectedThirdState.put("1", "1");
    +		expectedThirdState.put("2", "2");
    +		expectedThirdState.put("3", "3");
    +
    +		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
    +			firstBroadcastFunction = new KeyedBroadcastFunction();
    +			secondBroadcastFunction = new KeyedSingleBroadcastFunction();
    +		} else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckingKeyedStateFlatMap();
    --- End diff --
    
    Same here; maybe this can be removed


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188614213
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +	private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +			StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +		return Arrays.asList(
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +	}
    +
    +	private final MigrationVersion testMigrateVersion;
    +	private final String testStateBackend;
    +
    +	public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +		this.testStateBackend = testMigrateVersionAndBackend.f1;
    +	}
    +
    +	@Test
    +	public void testSavepoint() throws Exception {
    +
    +		final int parallelism = 4;
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +		switch (testStateBackend) {
    +			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +				break;
    +			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +				env.setStateBackend(new MemoryStateBackend());
    +				break;
    +			default:
    +				throw new UnsupportedOperationException();
    +		}
    +
    +		env.enableCheckpointing(500);
    +		env.setParallelism(parallelism);
    +		env.setMaxParallelism(parallelism);
    +
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +		SourceFunction<Tuple2<Long, Long>> parallelSource;
    +		SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +		final Map<Long, Long> expectedFirstState = new HashMap<>();
    +		expectedFirstState.put(0L, 0L);
    +		expectedFirstState.put(1L, 1L);
    +		expectedFirstState.put(2L, 2L);
    +		expectedFirstState.put(3L, 3L);
    +
    +		final Map<String, String> expectedSecondState = new HashMap<>();
    +		expectedSecondState.put("0", "0");
    +		expectedSecondState.put("1", "1");
    +		expectedSecondState.put("2", "2");
    +		expectedSecondState.put("3", "3");
    +
    +		final Map<String, String> expectedThirdState = new HashMap<>();
    +		expectedThirdState.put("0", "0");
    +		expectedThirdState.put("1", "1");
    +		expectedThirdState.put("2", "2");
    +		expectedThirdState.put("3", "3");
    +
    +		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
    +			firstBroadcastFunction = new KeyedBroadcastFunction();
    +			secondBroadcastFunction = new KeyedSingleBroadcastFunction();
    +		} else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckingTimelyStatefulOperator();
    +			firstBroadcastFunction = new CheckingKeyedBroadcastFunction(expectedFirstState, expectedSecondState);
    --- End diff --
    
    The reason I left it in is to verify that the `BroadcastState` plays well with other states co-located on the same operator.


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188512949
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +	private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +			StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +		return Arrays.asList(
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +	}
    +
    +	private final MigrationVersion testMigrateVersion;
    +	private final String testStateBackend;
    +
    +	public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +		this.testStateBackend = testMigrateVersionAndBackend.f1;
    +	}
    +
    +	@Test
    +	public void testSavepoint() throws Exception {
    +
    +		final int parallelism = 4;
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +		switch (testStateBackend) {
    +			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +				break;
    +			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +				env.setStateBackend(new MemoryStateBackend());
    +				break;
    +			default:
    +				throw new UnsupportedOperationException();
    +		}
    +
    +		env.enableCheckpointing(500);
    +		env.setParallelism(parallelism);
    +		env.setMaxParallelism(parallelism);
    +
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +		SourceFunction<Tuple2<Long, Long>> parallelSource;
    +		SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +		final Map<Long, Long> expectedFirstState = new HashMap<>();
    +		expectedFirstState.put(0L, 0L);
    +		expectedFirstState.put(1L, 1L);
    +		expectedFirstState.put(2L, 2L);
    +		expectedFirstState.put(3L, 3L);
    +
    +		final Map<String, String> expectedSecondState = new HashMap<>();
    +		expectedSecondState.put("0", "0");
    +		expectedSecondState.put("1", "1");
    +		expectedSecondState.put("2", "2");
    +		expectedSecondState.put("3", "3");
    +
    +		final Map<String, String> expectedThirdState = new HashMap<>();
    +		expectedThirdState.put("0", "0");
    +		expectedThirdState.put("1", "1");
    +		expectedThirdState.put("2", "2");
    +		expectedThirdState.put("3", "3");
    +
    +		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
    +			firstBroadcastFunction = new KeyedBroadcastFunction();
    +			secondBroadcastFunction = new KeyedSingleBroadcastFunction();
    +		} else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckingTimelyStatefulOperator();
    +			firstBroadcastFunction = new CheckingKeyedBroadcastFunction(expectedFirstState, expectedSecondState);
    --- End diff --
    
    Actually, the only state this test should cover is the broadcast state, no?
    Why do we need to add all the others (apart from the sources)?


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188511081
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +	private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +			StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +		return Arrays.asList(
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +	}
    +
    +	private final MigrationVersion testMigrateVersion;
    +	private final String testStateBackend;
    +
    +	public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +		this.testStateBackend = testMigrateVersionAndBackend.f1;
    +	}
    +
    +	@Test
    +	public void testSavepoint() throws Exception {
    +
    +		final int parallelism = 4;
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +		switch (testStateBackend) {
    +			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +				break;
    +			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +				env.setStateBackend(new MemoryStateBackend());
    +				break;
    +			default:
    +				throw new UnsupportedOperationException();
    +		}
    +
    +		env.enableCheckpointing(500);
    +		env.setParallelism(parallelism);
    +		env.setMaxParallelism(parallelism);
    +
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +		SourceFunction<Tuple2<Long, Long>> parallelSource;
    +		SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +		final Map<Long, Long> expectedFirstState = new HashMap<>();
    +		expectedFirstState.put(0L, 0L);
    +		expectedFirstState.put(1L, 1L);
    +		expectedFirstState.put(2L, 2L);
    +		expectedFirstState.put(3L, 3L);
    +
    +		final Map<String, String> expectedSecondState = new HashMap<>();
    +		expectedSecondState.put("0", "0");
    +		expectedSecondState.put("1", "1");
    +		expectedSecondState.put("2", "2");
    +		expectedSecondState.put("3", "3");
    +
    +		final Map<String, String> expectedThirdState = new HashMap<>();
    +		expectedThirdState.put("0", "0");
    +		expectedThirdState.put("1", "1");
    +		expectedThirdState.put("2", "2");
    +		expectedThirdState.put("3", "3");
    +
    +		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    --- End diff --
    
    Just a separate suggestion that doesn't really need to be part of this PR:
    we could probably save quite a few lines of code by giving all the operators a single `isVerification` flag, instead of having separate `CheckpointingX` and `CheckingX` operator variants.


---

[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5955
  
    Hi @tzulitai! Thanks for the review. I integrated most of your comments. The only one I left out is the one about merging the checkpointing and checking variants. I am not against it, but as things stand the test is aligned with `StatefulJobSavepointMigrationITCase`. If we were to make the change, we should change both tests, and I would prefer to do that in a separate commit.
    
    Let me know what you think about the current changes; if you are OK with them, I can merge.


---

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5955#discussion_r188512804
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java ---
    @@ -0,0 +1,418 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing.utils;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.StateBackendLoader;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.BroadcastStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.util.migration.MigrationVersion;
    +import org.apache.flink.util.Collector;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially)
    + * cover migrating for multiple previous Flink versions, as well as for different state backends.
    + */
    +@RunWith(Parameterized.class)
    +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase {
    +
    +	private static final int NUM_SOURCE_ELEMENTS = 4;
    +
    +	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
    +	private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode =
    +			StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
    +
    +	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
    +	public static Collection<Tuple2<MigrationVersion, String>> parameters () {
    +		return Arrays.asList(
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
    +				Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
    +	}
    +
    +	private final MigrationVersion testMigrateVersion;
    +	private final String testStateBackend;
    +
    +	public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
    +		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
    +		this.testStateBackend = testMigrateVersionAndBackend.f1;
    +	}
    +
    +	@Test
    +	public void testSavepoint() throws Exception {
    +
    +		final int parallelism = 4;
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setRestartStrategy(RestartStrategies.noRestart());
    +		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +
    +		switch (testStateBackend) {
    +			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
    +				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
    +				break;
    +			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
    +				env.setStateBackend(new MemoryStateBackend());
    +				break;
    +			default:
    +				throw new UnsupportedOperationException();
    +		}
    +
    +		env.enableCheckpointing(500);
    +		env.setParallelism(parallelism);
    +		env.setMaxParallelism(parallelism);
    +
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
    +		SourceFunction<Tuple2<Long, Long>> nonParallelSourceB;
    +		SourceFunction<Tuple2<Long, Long>> parallelSource;
    +		SourceFunction<Tuple2<Long, Long>> parallelSourceB;
    +		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
    +		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> firstBroadcastFunction;
    +		KeyedBroadcastProcessFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> secondBroadcastFunction;
    +
    +		final Map<Long, Long> expectedFirstState = new HashMap<>();
    +		expectedFirstState.put(0L, 0L);
    +		expectedFirstState.put(1L, 1L);
    +		expectedFirstState.put(2L, 2L);
    +		expectedFirstState.put(3L, 3L);
    +
    +		final Map<String, String> expectedSecondState = new HashMap<>();
    +		expectedSecondState.put("0", "0");
    +		expectedSecondState.put("1", "1");
    +		expectedSecondState.put("2", "2");
    +		expectedSecondState.put("3", "3");
    +
    +		final Map<String, String> expectedThirdState = new HashMap<>();
    +		expectedThirdState.put("0", "0");
    +		expectedThirdState.put("1", "1");
    +		expectedThirdState.put("2", "2");
    +		expectedThirdState.put("3", "3");
    +
    +		if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.PERFORM_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			parallelSourceB = new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    +			flatMap = new MigrationTestUtils.CheckpointingKeyedStateFlatMap();
    +			timelyOperator = new MigrationTestUtils.CheckpointingTimelyStatefulOperator();
    +			firstBroadcastFunction = new KeyedBroadcastFunction();
    +			secondBroadcastFunction = new KeyedSingleBroadcastFunction();
    +		} else if (executionMode == StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT) {
    +			nonParallelSource = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			nonParallelSourceB = new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
    +			parallelSource = new MigrationTestUtils.CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
    --- End diff --
    
    Do we need union list state in this test?


---