Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/04 14:05:09 UTC

[1/2] flink git commit: [FLINK-8978] Stateful generic stream job upgrade e2e test

Repository: flink
Updated Branches:
  refs/heads/master b50cebb65 -> 5ac4d2960


http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh b/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
new file mode 100755
index 0000000..44d9df0
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+ORIGINAL_DOP="${1:-2}"
+NEW_DOP="${2:-4}"
+
+if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+  NUM_SLOTS=${ORIGINAL_DOP}
+else
+  NUM_SLOTS=${NEW_DOP}
+fi
+
+backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
+setup_flink_slf4j_metric_reporter
+
+start_cluster
+
+# make sure to clean up the Flink cluster and revert our modifications at the end
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  # revert our modifications to the Flink distribution
+  rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+CHECKPOINT_DIR="file://${TEST_DATA_DIR}/savepoint-e2e-test-chckpt-dir"
+
+TEST_PROGRAM_JAR="${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/target/StatefulStreamJobUpgradeTestProgram.jar"
+
+function job() {
+    DOP=$1
+    CMD="${FLINK_DIR}/bin/flink run -d -p ${DOP} ${TEST_PROGRAM_JAR} \
+      --test.semantics exactly-once \
+      --environment.parallelism ${DOP} \
+      --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
+      --sequence_generator_source.sleep_time 15 \
+      --sequence_generator_source.sleep_after_elements 1"
+    echo "${CMD}"
+}
+
+JOB=$(job ${ORIGINAL_DOP})
+ORIGINAL_JOB=$(${JOB} --test.job.variant original \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running ${ORIGINAL_JOB}
+
+wait_oper_metric_num_in_records stateMap2.1 200
+
+# take a savepoint of the original job
+SAVEPOINT_PATH=$(take_savepoint ${ORIGINAL_JOB} ${TEST_DATA_DIR} \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job ${ORIGINAL_JOB}
+
+JOB=$(job ${NEW_DOP})
+UPGRADED_JOB=$(${JOB} --test.job.variant upgraded \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running ${UPGRADED_JOB}
+
+wait_oper_metric_num_in_records stateMap3.2 200
+
+# if state is erroneous and the job produces alerting output,
+# the output would be non-empty and the test would not pass


[2/2] flink git commit: [FLINK-8978] Stateful generic stream job upgrade e2e test

Posted by sr...@apache.org.
[FLINK-8978] Stateful generic stream job upgrade e2e test

This closes #5947.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ac4d296
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ac4d296
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ac4d296

Branch: refs/heads/master
Commit: 5ac4d29609842cb086b46fd56f6e49a4287cfa1b
Parents: b50cebb
Author: Andrey Zagrebin <an...@linkresearchtools.org>
Authored: Mon Apr 30 20:25:53 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 4 16:03:32 2018 +0200

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java | 281 +++++++++++++++++++
 .../tests/DataStreamAllroundTestProgram.java    | 267 ++----------------
 .../ArtificialKeyedStateBuilder.java            |  56 ----
 .../ArtificialKeyedStateMapper.java             |  21 +-
 .../tests/artificialstate/ComplexPayload.java   |  85 ++++++
 .../builder/ArtificialListStateBuilder.java     |  65 +++++
 .../builder/ArtificialMapStateBuilder.java      |  69 +++++
 .../builder/ArtificialStateBuilder.java         |  56 ++++
 .../builder/ArtificialValueStateBuilder.java    |  58 ++++
 .../eventpayload/ArtificialMapStateBuilder.java |  70 -----
 .../ArtificialValueStateBuilder.java            |  60 ----
 .../eventpayload/ComplexPayload.java            |  76 -----
 .../pom.xml                                     |  97 +++++++
 .../StatefulStreamJobUpgradeTestProgram.java    | 158 +++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   8 +
 flink-end-to-end-tests/test-scripts/common.sh   |  77 ++++-
 .../test-scripts/test_resume_savepoint.sh       |  58 +---
 .../test_stateful_stream_job_upgrade.sh         |  91 ++++++
 19 files changed, 1085 insertions(+), 569 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
new file mode 100644
index 0000000..e7b0bdc
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialListStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialValueStateBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A factory for components of general purpose test jobs for Flink's DataStream API operators and primitives.
+ *
+ * <p>The components can be configured for different state backends, including memory, file, and RocksDB
+ * state backends. It also allows specifying the processing guarantee semantics, which the job itself
+ * also verifies according to the specified semantics.
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
+ *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
+ *     <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
+ *     <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
+ *     <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
+ *     <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
+ *     <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
+ *     <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
+ *     <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FileStateBackend is selected.</li>
+ *     <li>sequence_generator_source.keyspace (int, default - 200): Number of different keys for events emitted by the sequence generator.</li>
+ *     <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
+ *     <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
+ *     <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
+ *     <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
+ *     <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
+ * </ul>
+ */
+class DataStreamAllroundTestJobFactory {
+	private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
+		.key("test.semantics")
+		.defaultValue("exactly-once")
+		.withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
+
+	private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
+		.key("environment.checkpoint_interval")
+		.defaultValue(1000L);
+
+	private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
+		.key("environment.parallelism")
+		.defaultValue(1);
+
+	private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
+		.key("environment.max_parallelism")
+		.defaultValue(128);
+
+	private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
+		.key("environment.restart_strategy.delay")
+		.defaultValue(0);
+
+	private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+		.key("state_backend")
+		.defaultValue("file")
+		.withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
+
+	private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
+		.key("state_backend.checkpoint_directory")
+		.noDefaultValue()
+		.withDescription("The checkpoint directory.");
+
+	private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
+		.key("state_backend.rocks.incremental")
+		.defaultValue(false)
+		.withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
+
+	private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
+		.key("state_backend.file.async")
+		.defaultValue(true)
+		.withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
+
+	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+		.key("sequence_generator_source.keyspace")
+		.defaultValue(200);
+
+	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
+		.key("sequence_generator_source.payload_size")
+		.defaultValue(20);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+		.key("sequence_generator_source.sleep_time")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+		.key("sequence_generator_source.sleep_after_elements")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
+		.key("sequence_generator_source.event_time.max_out_of_order")
+		.defaultValue(500L);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
+		.key("sequence_generator_source.event_time.clock_progress")
+		.defaultValue(100L);
+
+	static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+
+		// set checkpointing semantics
+		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+		long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
+		CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
+			? CheckpointingMode.EXACTLY_ONCE
+			: CheckpointingMode.AT_LEAST_ONCE;
+
+		env.enableCheckpointing(checkpointInterval, checkpointingMode);
+
+		// use event time
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		// parallelism
+		env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
+		env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
+
+		// restart strategy
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+			Integer.MAX_VALUE,
+			pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
+
+		// state backend
+		final String stateBackend = pt.get(
+			STATE_BACKEND.key(),
+			STATE_BACKEND.defaultValue());
+
+		final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
+
+		if ("file".equalsIgnoreCase(stateBackend)) {
+			boolean asyncCheckpoints = pt.getBoolean(
+				STATE_BACKEND_FILE_ASYNC.key(),
+				STATE_BACKEND_FILE_ASYNC.defaultValue());
+
+			env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+		} else if ("rocks".equalsIgnoreCase(stateBackend)) {
+			boolean incrementalCheckpoints = pt.getBoolean(
+				STATE_BACKEND_ROCKS_INCREMENTAL.key(),
+				STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
+
+			env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+		} else {
+			throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
+		}
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(pt);
+	}
+
+	static SourceFunction<Event> createEventSource(ParameterTool pt) {
+		return new SequenceGeneratorSource(
+			pt.getInt(
+				SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
+				SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
+			pt.getInt(
+				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
+				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_SLEEP_TIME.key(),
+				SEQUENCE_GENERATOR_SRC_SLEEP_TIME.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
+				SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
+	}
+
+	static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
+		return new BoundedOutOfOrdernessTimestampExtractor<Event>(
+			Time.milliseconds(
+				pt.getLong(
+					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
+
+			@Override
+			public long extractTimestamp(Event element) {
+				return element.getEventTime();
+			}
+		};
+	}
+
+	static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
+
+		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+
+		SemanticsCheckMapper.ValidatorFunction validatorFunction;
+
+		if (semantics.equalsIgnoreCase("exactly-once")) {
+			validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
+		} else if (semantics.equalsIgnoreCase("at-least-once")) {
+			validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
+		} else {
+			throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
+		}
+
+		return new SemanticsCheckMapper(validatorFunction);
+	}
+
+	static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
+		MapFunction<IN, OUT> mapFunction,
+		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+		List<TypeSerializer<STATE>> stateSerializers) {
+
+		List<ArtificialStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
+		for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
+			artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState, typeSerializer));
+			artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState, typeSerializer));
+		}
+		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
+	}
+
+	static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
+		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+		TypeSerializer<STATE> typeSerializer) {
+
+		return new ArtificialValueStateBuilder<>(
+			"valueState-" + typeSerializer.getClass().getSimpleName(),
+			inputAndOldStateToNewState,
+			typeSerializer);
+	}
+
+	static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
+		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+		TypeSerializer<STATE> typeSerializer) {
+
+		JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
+			List<STATE> newState = new ArrayList<>();
+			for (STATE s : second) {
+				newState.add(inputAndOldStateToNewState.join(first, s));
+			}
+			return newState;
+		};
+
+		return new ArtificialListStateBuilder<>(
+			"listState-" + typeSerializer.getClass().getSimpleName(),
+			listStateGenerator,
+			listStateGenerator,
+			typeSerializer);
+	}
+}
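
For reference, below is a minimal sketch of how the factory methods and the parameters documented
above might be wired into a job. It mirrors the rewritten DataStreamAllroundTestProgram further down
in this commit; the FactoryWiringExample class, the operator name and the print sink are illustrative
assumptions, not part of the commit. The example has to live in org.apache.flink.streaming.tests
because the factory and its methods are package-private.

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;

import java.util.Collections;

public class FactoryWiringExample {

	public static void main(String[] args) throws Exception {
		ParameterTool pt = ParameterTool.fromArgs(args);
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// checkpointing semantics, event time, parallelism, restart strategy and state backend
		// are all configured from the parameters documented in the factory's class Javadoc
		DataStreamAllroundTestJobFactory.setupEnvironment(env, pt);

		env.addSource(DataStreamAllroundTestJobFactory.createEventSource(pt))
			.assignTimestampsAndWatermarks(DataStreamAllroundTestJobFactory.createTimestampExtractor(pt))
			.keyBy(Event::getKey)
			.map(DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper(
				// the map function simply forwards the input event
				(MapFunction<Event, Event>) in -> in,
				// the keyed state is replaced with a payload derived from the current event
				(Event event, ComplexPayload oldState) -> new ComplexPayload(event, "FactoryWiringExample"),
				Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()))))
			.name("ExampleKeyedStateMapper")
			.returns(Event.class)
			.keyBy(Event::getKey)
			.flatMap(DataStreamAllroundTestJobFactory.createSemanticsCheckMapper(pt))
			.addSink(new PrintSinkFunction<>());

		env.execute("Factory wiring example");
	}
}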

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 2059b99..75c14e5 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -18,136 +18,38 @@
 
 package org.apache.flink.streaming.tests;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
-import org.apache.flink.streaming.tests.artificialstate.eventpayload.ArtificialValueStateBuilder;
-import org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
 
 /**
- * A general purpose test for Flink's DataStream API operators and primitives.
+ * A general purpose test job for Flink's DataStream API operators and primitives.
  *
- * <p>It currrently covers the following aspects that are frequently present in Flink DataStream jobs:
+ * <p>The job is constructed of generic components from {@link DataStreamAllroundTestJobFactory}.
+ * It currently covers the following aspects that are frequently present in Flink DataStream jobs:
  * <ul>
  *     <li>A generic Kryo input type.</li>
  *     <li>A state type for which we register a {@link KryoSerializer}.</li>
  *     <li>Operators with {@link ValueState}.</li>
  * </ul>
  *
- * <p>The job allows to be configured for different state backends, including memory, file, and RocksDB
- * state backends. It also allows specifying the processing guarantee semantics, which will also be verified
- * by the job itself according to the specified semantic.
+ * <p>The CLI job configuration options are described in {@link DataStreamAllroundTestJobFactory}.
  *
- * <p>Program parameters:
- * <ul>
- *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
- *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
- *     <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
- *     <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
- *     <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
- *     <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
- *     <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
- *     <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
- *     <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FileStateBackend is selected.</li>
- *     <li>sequence_generator_source.keyspace (int, default - 1000): Number of different keys for events emitted by the sequence generator.</li>
- *     <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
- *     <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
- *     <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
- *     <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
- *     <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
- * </ul>
  */
 public class DataStreamAllroundTestProgram {
-
-	private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
-		.key("test.semantics")
-		.defaultValue("exactly-once")
-		.withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
-
-	private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
-		.key("environment.checkpoint_interval")
-		.defaultValue(1000L);
-
-	private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
-		.key("environment.parallelism")
-		.defaultValue(1);
-
-	private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
-		.key("environment.max_parallelism")
-		.defaultValue(128);
-
-	private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
-		.key("environment.restart_strategy.delay")
-		.defaultValue(0);
-
-	private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
-		.key("state_backend")
-		.defaultValue("file")
-		.withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
-
-	private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
-		.key("state_backend.checkpoint_directory")
-		.noDefaultValue()
-		.withDescription("The checkpoint directory.");
-
-	private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
-		.key("state_backend.rocks.incremental")
-		.defaultValue(false)
-		.withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
-
-	private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
-		.key("state_backend.file.async")
-		.defaultValue(true)
-		.withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
-
-	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
-		.key("sequence_generator_source.keyspace")
-		.defaultValue(200);
-
-	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
-		.key("sequence_generator_source.payload_size")
-		.defaultValue(20);
-
-	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
-		.key("sequence_generator_source.sleep_time")
-		.defaultValue(0L);
-
-	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
-		.key("sequence_generator_source.sleep_after_elements")
-		.defaultValue(0L);
-
-	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
-		.key("sequence_generator_source.event_time.max_out_of_order")
-		.defaultValue(500L);
-
-	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
-		.key("sequence_generator_source.event_time.clock_progress")
-		.defaultValue(100L);
-
-	// -----------------------------------------------------------------------------------------------------------------
+	private static final String STATE_OPER_NAME = "ArtificalKeyedStateMapper";
 
 	public static void main(String[] args) throws Exception {
 		final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -160,15 +62,20 @@ public class DataStreamAllroundTestProgram {
 			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
 			.keyBy(Event::getKey)
 			.map(createArtificialKeyedStateMapper(
-				// map function simply forwards the inputs
-				(MapFunction<Event, Event>) in -> in,
-				// state is updated per event as a wrapped ComplexPayload state object
-				(Event first, ComplexPayload second) -> new ComplexPayload(first), //
-				Arrays.asList(
-					new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+					// map function simply forwards the inputs
+					(MapFunction<Event, Event>) in -> in,
+					// state is verified and updated per event as a wrapped ComplexPayload state object
+					(Event first, ComplexPayload second) -> {
+							if (second != null && !second.getStrPayload().equals(STATE_OPER_NAME)) {
+								System.out.println("State is set or restored incorrectly");
+							}
+							return new ComplexPayload(first, STATE_OPER_NAME);
+						},
+					Collections.singletonList(
+						new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
 				)
 			)
-			.name("ArtificalKeyedStateMapper")
+			.name(STATE_OPER_NAME)
 			.returns(Event.class)
 			.keyBy(Event::getKey)
 			.flatMap(createSemanticsCheckMapper(pt))
@@ -177,128 +84,4 @@ public class DataStreamAllroundTestProgram {
 
 		env.execute("General purpose test job");
 	}
-
-	public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
-
-		// set checkpointing semantics
-		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
-		long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
-		CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
-			? CheckpointingMode.EXACTLY_ONCE
-			: CheckpointingMode.AT_LEAST_ONCE;
-
-		env.enableCheckpointing(checkpointInterval, checkpointingMode);
-
-		// use event time
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-		// parallelism
-		env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
-		env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
-
-		// restart strategy
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-			Integer.MAX_VALUE,
-			pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
-
-		// state backend
-		final String stateBackend = pt.get(
-			STATE_BACKEND.key(),
-			STATE_BACKEND.defaultValue());
-
-		final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
-
-		if ("file".equalsIgnoreCase(stateBackend)) {
-			boolean asyncCheckpoints = pt.getBoolean(
-				STATE_BACKEND_FILE_ASYNC.key(),
-				STATE_BACKEND_FILE_ASYNC.defaultValue());
-
-			env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
-		} else if ("rocks".equalsIgnoreCase(stateBackend)) {
-			boolean incrementalCheckpoints = pt.getBoolean(
-				STATE_BACKEND_ROCKS_INCREMENTAL.key(),
-				STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
-
-			env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
-		} else {
-			throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
-		}
-
-		// make parameters available in the web interface
-		env.getConfig().setGlobalJobParameters(pt);
-	}
-
-	private static SourceFunction<Event> createEventSource(ParameterTool pt) {
-		return new SequenceGeneratorSource(
-			pt.getInt(
-				SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
-				SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
-			pt.getInt(
-				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
-				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
-			pt.getLong(
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
-			pt.getLong(
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()),
-			pt.getLong(
-				SEQUENCE_GENERATOR_SRC_SLEEP_TIME.key(),
-				SEQUENCE_GENERATOR_SRC_SLEEP_TIME.defaultValue()),
-			pt.getLong(
-				SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
-				SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
-	}
-
-	private static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
-		return new BoundedOutOfOrdernessTimestampExtractor<Event>(
-			Time.milliseconds(
-				pt.getLong(
-					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
-					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
-
-			@Override
-			public long extractTimestamp(Event element) {
-				return element.getEventTime();
-			}
-		};
-	}
-
-	private static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
-
-		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
-
-		SemanticsCheckMapper.ValidatorFunction validatorFunction;
-
-		if (semantics.equalsIgnoreCase("exactly-once")) {
-			validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
-		} else if (semantics.equalsIgnoreCase("at-least-once")) {
-			validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
-		} else {
-			throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
-		}
-
-		return new SemanticsCheckMapper(validatorFunction);
-	}
-
-	private static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
-		MapFunction<IN, OUT> mapFunction,
-		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		List<TypeSerializer<STATE>> stateSerializers) {
-
-		List<ArtificialKeyedStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
-		for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-
-			String stateName = "valueState-" + typeSerializer.getClass().getSimpleName() + "-" + UUID.randomUUID();
-
-			ArtificialValueStateBuilder<IN, STATE> stateBuilder = new ArtificialValueStateBuilder<>(
-				stateName,
-				inputAndOldStateToNewState,
-				typeSerializer
-			);
-
-			artificialStateBuilders.add(stateBuilder);
-		}
-		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
deleted file mode 100644
index 3db18db..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate;
-
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-
-import java.io.Serializable;
-
-/**
- * The keyed state builder wraps the logic of registering state in user
- * functions, as well as how state is updated per input element..
- */
-public abstract class ArtificialKeyedStateBuilder<T> implements Serializable {
-
-	private static final long serialVersionUID = -5887676929924485788L;
-
-	protected final String stateName;
-
-	public ArtificialKeyedStateBuilder(String stateName) {
-		this.stateName = stateName;
-	}
-
-	public String getStateName() {
-		return stateName;
-	}
-
-	/**
-	 * Manipulate the state for an input element.
-	 *
-	 * @param element the current input element.
-	 */
-	public abstract void artificialStateForElement(T element) throws Exception;
-
-	/**
-	 * Registers the state.
-	 *
-	 * @param initializationContext the state initialization context, provided by the user function.
-	 */
-	public abstract void initialize(FunctionInitializationContext initializationContext);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
index 9697877..c5857a2 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -31,29 +32,29 @@ import java.util.Set;
 
 /**
  * A generic, stateful {@link MapFunction} that allows specifying what states to maintain
- * based on a provided list of {@link ArtificialKeyedStateBuilder}s.
+ * based on a provided list of {@link ArtificialStateBuilder}s.
  */
 public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction {
 
 	private static final long serialVersionUID = 513012258173556604L;
 
 	private final MapFunction<IN, OUT> mapFunction;
-	private final List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder;
+	private final List<ArtificialStateBuilder<IN>> artificialStateBuilders;
 
 	public ArtificialKeyedStateMapper(
 		MapFunction<IN, OUT> mapFunction,
-		ArtificialKeyedStateBuilder<IN> artificialKeyedStateBuilder) {
-		this(mapFunction, Collections.singletonList(artificialKeyedStateBuilder));
+		ArtificialStateBuilder<IN> artificialStateBuilders) {
+		this(mapFunction, Collections.singletonList(artificialStateBuilders));
 	}
 
 	public ArtificialKeyedStateMapper(
 		MapFunction<IN, OUT> mapFunction,
-		List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder) {
+		List<ArtificialStateBuilder<IN>> artificialStateBuilders) {
 
 		this.mapFunction = mapFunction;
-		this.artificialKeyedStateBuilder = artificialKeyedStateBuilder;
-		Set<String> stateNames = new HashSet<>(this.artificialKeyedStateBuilder.size());
-		for (ArtificialKeyedStateBuilder<IN> stateBuilder : this.artificialKeyedStateBuilder) {
+		this.artificialStateBuilders = artificialStateBuilders;
+		Set<String> stateNames = new HashSet<>(this.artificialStateBuilders.size());
+		for (ArtificialStateBuilder<IN> stateBuilder : this.artificialStateBuilders) {
 			if (!stateNames.add(stateBuilder.getStateName())) {
 				throw new IllegalArgumentException("Duplicated state name: " + stateBuilder.getStateName());
 			}
@@ -62,7 +63,7 @@ public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT
 
 	@Override
 	public OUT map(IN value) throws Exception {
-		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
+		for (ArtificialStateBuilder<IN> stateBuilder : artificialStateBuilders) {
 			stateBuilder.artificialStateForElement(value);
 		}
 
@@ -75,7 +76,7 @@ public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
+		for (ArtificialStateBuilder<IN> stateBuilder : artificialStateBuilders) {
 			stateBuilder.initialize(context);
 		}
 	}
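
As a usage illustration of the renamed builder API, here is a minimal sketch of constructing an
ArtificialKeyedStateMapper directly with a single value-state builder. The ExampleMapperFactory
class, the state name and the String types are illustrative assumptions only.

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialValueStateBuilder;

public class ExampleMapperFactory {

	// builds a mapper that forwards its String input and keeps the latest element as keyed ValueState
	static ArtificialKeyedStateMapper<String, String> exampleMapper() {
		ArtificialValueStateBuilder<String, String> valueStateBuilder =
			new ArtificialValueStateBuilder<String, String>(
				"exampleValueState",
				// the new state value is simply the current element; the old state is ignored
				(element, oldState) -> element,
				StringSerializer.INSTANCE);

		return new ArtificialKeyedStateMapper<>(
			(MapFunction<String, String>) in -> in,
			valueStateBuilder);
	}
}

The resulting mapper can then be applied after a keyBy, exactly like the factory-built mapper in
DataStreamAllroundTestProgram.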

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java
new file mode 100644
index 0000000..98ec456
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.streaming.tests.DataStreamAllroundTestProgram;
+import org.apache.flink.streaming.tests.Event;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A state type used in the {@link DataStreamAllroundTestProgram}.
+ * Wraps an {@link Event} as state.
+ */
+public class ComplexPayload implements Serializable {
+	private static final long serialVersionUID = 233624606545704853L;
+
+	private final long eventTime;
+	private final List<String> stringList;
+	private final String strPayload;
+	private final InnerPayLoad innerPayLoad;
+
+	public ComplexPayload(Event event, String strPayload) {
+		this.eventTime = event.getEventTime();
+		this.innerPayLoad = new InnerPayLoad(event.getSequenceNumber());
+		this.strPayload = strPayload;
+		this.stringList = Arrays.asList(String.valueOf(event.getKey()), event.getPayload());
+	}
+
+	public ComplexPayload(Event event) {
+		this(event, event.getPayload());
+	}
+
+	/**
+	 * Nested class in state type. Wraps an {@link Event}'s sequence number.
+	 */
+	public static class InnerPayLoad implements Serializable {
+
+		private static final long serialVersionUID = 3986298180012117883L;
+
+		private final long sequenceNumber;
+
+		public InnerPayLoad(long sequenceNumber) {
+			this.sequenceNumber = sequenceNumber;
+		}
+
+		public long getSequenceNumber() {
+			return sequenceNumber;
+		}
+	}
+
+	public long getEventTime() {
+		return eventTime;
+	}
+
+	public List<String> getStringList() {
+		return stringList;
+	}
+
+	public String getStrPayload() {
+		return strPayload;
+	}
+
+	public InnerPayLoad getInnerPayLoad() {
+		return innerPayLoad;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
new file mode 100644
index 0000000..a2c6387
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.util.List;
+
+/**
+ * An {@link ArtificialStateBuilder} that maintains both operator and keyed {@link ListState}.
+ */
+public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilder<IN> {
+
+	private static final long serialVersionUID = -1205814329756790916L;
+
+	private transient ListState<STATE> listOperatorState;
+	private transient ListState<STATE> listKeyedState;
+	private final TypeSerializer<STATE> typeSerializer;
+	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator;
+	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator;
+
+	public ArtificialListStateBuilder(
+		String stateName,
+		JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
+		JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
+		TypeSerializer<STATE> typeSerializer) {
+		super(stateName);
+		this.typeSerializer = typeSerializer;
+		this.keyedStateGenerator = keyedStateGenerator;
+		this.operatorStateGenerator = operatorStateGenerator;
+	}
+
+	@Override
+	public void artificialStateForElement(IN event) throws Exception {
+		listOperatorState.update(operatorStateGenerator.join(event, listOperatorState.get()));
+		listKeyedState.update(keyedStateGenerator.join(event, listKeyedState.get()));
+	}
+
+	@Override
+	public void initialize(FunctionInitializationContext initializationContext) throws Exception {
+		ListStateDescriptor<STATE> listStateDescriptor = new ListStateDescriptor<>(stateName, typeSerializer);
+		listOperatorState = initializationContext.getOperatorStateStore().getListState(listStateDescriptor);
+		listKeyedState = initializationContext.getKeyedStateStore().getListState(listStateDescriptor);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java
new file mode 100644
index 0000000..c315e65
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * An {@link ArtificialStateBuilder} for user {@link MapState}s.
+ */
+public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialStateBuilder<IN> {
+
+	private static final long serialVersionUID = -143079058769306954L;
+
+	private transient MapState<K, V> mapState;
+	private final TypeSerializer<K> keySerializer;
+	private final TypeSerializer<V> valueSerializer;
+	private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
+
+	public ArtificialMapStateBuilder(
+		String stateName,
+		JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<V> valueSerializer) {
+
+		super(stateName);
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+		this.stateValueGenerator = stateValueGenerator;
+	}
+
+	@Override
+	public void artificialStateForElement(IN event) throws Exception {
+		Iterator<Map.Entry<K, V>> update = stateValueGenerator.join(event, mapState.iterator());
+		while (update.hasNext()) {
+			Map.Entry<K, V> updateEntry = update.next();
+			mapState.put(updateEntry.getKey(), updateEntry.getValue());
+		}
+	}
+
+	@Override
+	public void initialize(FunctionInitializationContext initializationContext) {
+		MapStateDescriptor<K, V> mapStateDescriptor =
+			new MapStateDescriptor<>(stateName, keySerializer, valueSerializer);
+		mapState = initializationContext.getKeyedStateStore().getMapState(mapStateDescriptor);
+	}
+}
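
The map-state builder is not exercised by the factory in this commit, so the following is only a
hedged sketch of how it could be constructed; the state name, the String/Long types and the
ExampleMapStateUsage class are illustrative assumptions.

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialMapStateBuilder;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

public class ExampleMapStateUsage {

	// builds a map-state builder that, for every input element, writes the entry (element -> 1L)
	static ArtificialMapStateBuilder<String, String, Long> exampleMapStateBuilder() {
		JoinFunction<String, Iterator<Map.Entry<String, Long>>, Iterator<Map.Entry<String, Long>>> generator =
			(element, currentEntries) -> Collections
				.<Map.Entry<String, Long>>singletonList(new AbstractMap.SimpleEntry<>(element, 1L))
				.iterator();

		return new ArtificialMapStateBuilder<String, String, Long>(
			"exampleMapState",
			generator,
			StringSerializer.INSTANCE,
			LongSerializer.INSTANCE);
	}
}

As with the other builders, the generator receives the input element together with the current state
(here the map entries) and returns the entries to write back.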

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java
new file mode 100644
index 0000000..aed94ba
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.io.Serializable;
+
+/**
+ * The state builder wraps the logic of registering state in user
+ * functions, as well as how state is updated per input element.
+ */
+public abstract class ArtificialStateBuilder<T> implements Serializable {
+
+	private static final long serialVersionUID = -5887676929924485788L;
+
+	protected final String stateName;
+
+	public ArtificialStateBuilder(String stateName) {
+		this.stateName = stateName;
+	}
+
+	public String getStateName() {
+		return stateName;
+	}
+
+	/**
+	 * Manipulate the state for an input element.
+	 *
+	 * @param element the current input element.
+	 */
+	public abstract void artificialStateForElement(T element) throws Exception;
+
+	/**
+	 * Registers the state.
+	 *
+	 * @param initializationContext the state initialization context, provided by the user function.
+	 */
+	public abstract void initialize(FunctionInitializationContext initializationContext) throws Exception;
+}
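
For orientation, a builder like this is typically driven from a checkpointed user function: state registration happens in initializeState(), and the per-element update happens in the processing method. Below is a minimal sketch under that assumption; the mapper class is illustrative only and not part of this commit.

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    import java.util.List;

    // Illustrative mapper that feeds every element into a list of artificial state builders.
    // Note: registering keyed state requires this mapper to run on a KeyedStream.
    public class ExampleArtificialStateMapper<T> extends RichMapFunction<T, T> implements CheckpointedFunction {

        private final List<ArtificialStateBuilder<T>> stateBuilders;

        public ExampleArtificialStateMapper(List<ArtificialStateBuilder<T>> stateBuilders) {
            this.stateBuilders = stateBuilders;
        }

        @Override
        public T map(T value) throws Exception {
            // update every registered artificial state with the current element
            for (ArtificialStateBuilder<T> builder : stateBuilders) {
                builder.artificialStateForElement(value);
            }
            return value;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // let each builder register its state against the provided context
            for (ArtificialStateBuilder<T> builder : stateBuilders) {
                builder.initialize(context);
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
            // the registered state is snapshotted by Flink itself
        }
    }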

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
new file mode 100644
index 0000000..6d74e09
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+/**
+ * An {@link ArtificialStateBuilder} for user {@link ValueState}s.
+ */
+public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuilder<IN> {
+
+	private static final long serialVersionUID = -1205814329756790916L;
+
+	private transient ValueState<STATE> valueState;
+	private final TypeSerializer<STATE> typeSerializer;
+	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
+
+	public ArtificialValueStateBuilder(
+		String stateName,
+		JoinFunction<IN, STATE, STATE> stateValueGenerator,
+		TypeSerializer<STATE> typeSerializer) {
+		super(stateName);
+		this.typeSerializer = typeSerializer;
+		this.stateValueGenerator = stateValueGenerator;
+	}
+
+	@Override
+	public void artificialStateForElement(IN event) throws Exception {
+		valueState.update(stateValueGenerator.join(event, valueState.value()));
+	}
+
+	@Override
+	public void initialize(FunctionInitializationContext initializationContext) {
+		ValueStateDescriptor<STATE> valueStateDescriptor =
+			new ValueStateDescriptor<>(stateName, typeSerializer);
+		valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
+	}
+}
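
As a usage illustration (the state name and update logic below are hypothetical, not taken from this commit), a value-state builder is constructed with a JoinFunction that merges the current element into the previous state value:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;

    // Hypothetical: keep a comma-separated history of elements as String-typed value state.
    JoinFunction<String, String, String> appendToHistory =
        (element, previous) -> previous == null ? element : previous + "," + element;

    ArtificialValueStateBuilder<String, String> valueStateBuilder =
        new ArtificialValueStateBuilder<>("exampleValueState", appendToHistory, StringSerializer.INSTANCE);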

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
deleted file mode 100644
index a29b802..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * An {@link ArtificialKeyedStateBuilder} for user {@link MapState}s.
- */
-public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialKeyedStateBuilder<IN> {
-
-	private static final long serialVersionUID = -143079058769306954L;
-
-	private transient MapState<K, V> mapState;
-	private final TypeSerializer<K> keySerializer;
-	private final TypeSerializer<V> valueSerializer;
-	private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
-
-	public ArtificialMapStateBuilder(
-		String stateName,
-		JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
-		TypeSerializer<K> keySerializer,
-		TypeSerializer<V> valueSerializer) {
-
-		super(stateName);
-		this.keySerializer = keySerializer;
-		this.valueSerializer = valueSerializer;
-		this.stateValueGenerator = stateValueGenerator;
-	}
-
-	@Override
-	public void artificialStateForElement(IN event) throws Exception {
-		Iterator<Map.Entry<K, V>> update = stateValueGenerator.join(event, mapState.iterator());
-		while (update.hasNext()) {
-			Map.Entry<K, V> updateEntry = update.next();
-			mapState.put(updateEntry.getKey(), updateEntry.getValue());
-		}
-	}
-
-	@Override
-	public void initialize(FunctionInitializationContext initializationContext) {
-		MapStateDescriptor<K, V> mapStateDescriptor =
-			new MapStateDescriptor<>(stateName, keySerializer, valueSerializer);
-		mapState = initializationContext.getKeyedStateStore().getMapState(mapStateDescriptor);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
deleted file mode 100644
index 1d2d5f8..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
-
-/**
- * An {@link ArtificialKeyedStateBuilder} for user {@link ValueState}s.
- */
-public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialKeyedStateBuilder<IN> {
-
-	private static final long serialVersionUID = -1205814329756790916L;
-
-	private transient ValueState<STATE> valueState;
-	private final TypeSerializer<STATE> typeSerializer;
-	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
-
-	public ArtificialValueStateBuilder(
-		String stateName,
-		JoinFunction<IN, STATE, STATE> stateValueGenerator,
-		TypeSerializer<STATE> typeSerializer) {
-
-		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.stateValueGenerator = stateValueGenerator;
-	}
-
-	@Override
-	public void artificialStateForElement(IN event) throws Exception {
-		valueState.update(stateValueGenerator.join(event, valueState.value()));
-	}
-
-	@Override
-	public void initialize(FunctionInitializationContext initializationContext) {
-		ValueStateDescriptor<STATE> valueStateDescriptor =
-			new ValueStateDescriptor<>(stateName, typeSerializer);
-		valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
deleted file mode 100644
index 8cba366..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate.eventpayload;
-
-import org.apache.flink.streaming.tests.DataStreamAllroundTestProgram;
-import org.apache.flink.streaming.tests.Event;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * A state type used in the {@link DataStreamAllroundTestProgram}.
- * Wraps an {@link Event} as state.
- */
-public class ComplexPayload implements Serializable {
-
-	private static final long serialVersionUID = 233624606545704853L;
-
-	public ComplexPayload(Event event) {
-		this.eventTime = event.getEventTime();
-		this.innerPayLoad = new InnerPayLoad(event.getSequenceNumber());
-		this.stringList = Arrays.asList(String.valueOf(event.getKey()), event.getPayload());
-	}
-
-	private final long eventTime;
-	private final List<String> stringList;
-	private final InnerPayLoad innerPayLoad;
-
-	/**
-	 * Nested class in state type. Wraps an {@link Event}'s sequence number.
-	 */
-	public static class InnerPayLoad implements Serializable {
-
-		private static final long serialVersionUID = 3986298180012117883L;
-
-		private final long sequenceNumber;
-
-		public InnerPayLoad(long sequenceNumber) {
-			this.sequenceNumber = sequenceNumber;
-		}
-
-		public long getSequenceNumber() {
-			return sequenceNumber;
-		}
-	}
-
-	public long getEventTime() {
-		return eventTime;
-	}
-
-	public List<String> getStringList() {
-		return stringList;
-	}
-
-	public InnerPayLoad getInnerPayLoad() {
-		return innerPayLoad;
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml
new file mode 100644
index 0000000..d1bb86c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-stream-stateful-job-upgrade-test</artifactId>
+	<name>flink-stream-stateful-job-upgrade-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-datastream-allround-test</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>StatefulStreamJobUpgradeTestProgram</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+									<exclude>org.slf4j:*</exclude>
+									<exclude>log4j:*</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.StatefulStreamJobUpgradeTestProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
new file mode 100644
index 0000000..0b3b5ed
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
+
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests the upgrade of a generic stateful job built from Flink's DataStream API operators and primitives.
+ *
+ * <p>The job is constructed from generic components provided by {@link DataStreamAllroundTestJobFactory}.
+ * The goal is to verify successful state restoration after taking a savepoint and recovering with a new job version.
+ * The job can be configured with '--test.job.variant' to run one of two variants:
+ * <ul>
+ *     <li><b>original:</b> includes two custom stateful map operators</li>
+ *     <li><b>upgraded:</b> changes the order of the two custom stateful map operators and adds a third one</li>
+ * </ul>
+ *
+ * <p>The CLI job configuration options are described in {@link DataStreamAllroundTestJobFactory}.
+ *
+ * <p>Job specific configuration options:
+ * <ul>
+ *     <li>test.job.variant (String, default - 'original'): This configures the job variant to test. Can be 'original' or 'upgraded'.</li>
+ * </ul>
+ *
+ */
+public class StatefulStreamJobUpgradeTestProgram {
+	private static final String TEST_JOB_VARIANT_ORIGINAL = "original";
+	private static final String TEST_JOB_VARIANT_UPGRADED = "upgraded";
+
+	private static final ConfigOption<String> TEST_JOB_VARIANT = ConfigOptions
+		.key("test.job.variant")
+		.defaultValue(TEST_JOB_VARIANT_ORIGINAL)
+		.withDescription(String.format("This configures the job variant to test. Can be '%s' or '%s'",
+			TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		setupEnvironment(env, pt);
+
+		KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
+			.name("EventSource")
+			.uid("EventSource")
+			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+			.keyBy(Event::getKey);
+
+		List<TypeSerializer<ComplexPayload>> stateSer =
+			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
+
+		KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
+			applyOriginalStatefulOperations(source, stateSer) :
+			applyUpgradedStatefulOperations(source, stateSer);
+
+		afterStatefulOperations
+			.flatMap(createSemanticsCheckMapper(pt))
+			.name("SemanticsCheckMapper")
+			.addSink(new PrintSinkFunction<>());
+
+		env.execute("General purpose test job");
+	}
+
+	private static boolean isOriginalJobVariant(final ParameterTool pt) {
+		switch (pt.get(TEST_JOB_VARIANT.key())) {
+			case TEST_JOB_VARIANT_ORIGINAL:
+				return true;
+			case TEST_JOB_VARIANT_UPGRADED:
+				return false;
+			default:
+				throw new IllegalArgumentException(String.format("'--test.job.variant' can be either '%s' or '%s'",
+					TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));
+		}
+	}
+
+	private static KeyedStream<Event, Integer> applyOriginalStatefulOperations(
+		KeyedStream<Event, Integer> source,
+		List<TypeSerializer<ComplexPayload>> stateSer) {
+		source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer);
+		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer);
+	}
+
+	private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
+		KeyedStream<Event, Integer> source,
+		List<TypeSerializer<ComplexPayload>> stateSer) {
+		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer);
+		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer);
+		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer);
+	}
+
+	private static KeyedStream<Event, Integer> applyTestStatefulOperator(
+		String name,
+		JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
+		KeyedStream<Event, Integer> source,
+		List<TypeSerializer<ComplexPayload>> stateSer) {
+		return source
+			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer))
+			.name(name)
+			.uid(name)
+			.returns(Event.class)
+			.keyBy(Event::getKey);
+	}
+
+	private static JoinFunction<Event, ComplexPayload, ComplexPayload> simpleStateUpdate(String strPayload) {
+		return (Event first, ComplexPayload second) -> {
+			verifyState(strPayload, second);
+			return new ComplexPayload(first, strPayload);
+		};
+	}
+
+	private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String strPayload) {
+		return (Event first, ComplexPayload second) -> {
+			verifyState(strPayload, second);
+			boolean isLastEvent = second != null && first.getEventTime() <= second.getEventTime();
+			return isLastEvent ? second : new ComplexPayload(first, strPayload);
+		};
+	}
+
+	private static void verifyState(String strPayload, ComplexPayload state) {
+		if (state != null && !state.getStrPayload().equals(strPayload)) {
+			System.out.println("State is set or restored incorrectly");
+		}
+	}
+}
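
Worth noting for readers of the two variants above: savepoint state is matched to operators by uid rather than by topological position, which is what allows the upgraded variant to reorder the stateful mappers and introduce a new one. A hedged sketch of that idea follows; the names are illustrative, and the actual wiring is done by applyTestStatefulOperator above.

    // Sketch: what must stay identical across the original and upgraded job is the uid
    // assigned to each stateful operator; its position in the topology and its name may change.
    // An operator with a previously unused uid (e.g. "stateMap3") simply starts with empty state.
    source
        .map(createArtificialKeyedStateMapper(e -> e, simpleStateUpdate("stateMap2"), stateSer))
        .name("stateMap2")
        .uid("stateMap2")   // must match the uid used by the original job variant
        .returns(Event.class)
        .keyBy(Event::getKey);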

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 898e4af..5cb75a8 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -42,6 +42,7 @@ under the License.
 		<module>flink-bucketing-sink-test</module>
 		<module>flink-distributed-cache-via-blob-test</module>
 		<module>flink-high-parallelism-iterations-test</module>
+		<module>flink-stream-stateful-job-upgrade-test</module>
 	</modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 29c53c2..f0fe938 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -112,5 +112,13 @@ if [ $EXIT_CODE == 0 ]; then
   EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running stateful stream job upgrade nightly end-to-end test\n"
+  printf "==============================================================================\n"
+  $END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4
+  EXIT_CODE=$?
+fi
+
 # Exit code for Travis build success/failure
 exit $EXIT_CODE

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index d8e0de7..c5ea4b8 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -39,6 +39,12 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function backup_config() {
+    # back up the masters and flink-conf.yaml
+    cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
+    cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+}
+
 function revert_default_config() {
 
     # revert our modifications to the masters file
@@ -52,11 +58,22 @@ function revert_default_config() {
     fi
 }
 
+function set_conf() {
+    CONF_NAME=$1
+    VAL=$2
+    echo "$CONF_NAME: $VAL" >> $FLINK_DIR/conf/flink-conf.yaml
+}
+
+function change_conf() {
+    CONF_NAME=$1
+    OLD_VAL=$2
+    NEW_VAL=$3
+    sed -i -e "s/${CONF_NAME}: ${OLD_VAL}/${CONF_NAME}: ${NEW_VAL}/" ${FLINK_DIR}/conf/flink-conf.yaml
+}
+
 function create_ha_config() {
 
-    # back up the masters and flink-conf.yaml
-    cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
-    cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+    backup_config
 
     # clean up the dir that will be used for zookeeper storage
     # (see high-availability.zookeeper.storageDir below)
@@ -309,6 +326,60 @@ function kill_random_taskmanager {
   echo "TaskManager $KILL_TM killed."
 }
 
+function setup_flink_slf4j_metric_reporter() {
+  INTERVAL="${1:-1 SECONDS}"
+  cp $FLINK_DIR/opt/flink-metrics-slf4j-*.jar $FLINK_DIR/lib/
+  set_conf "metrics.reporter.slf4j.class" "org.apache.flink.metrics.slf4j.Slf4jReporter"
+  set_conf "metrics.reporter.slf4j.interval" "${INTERVAL}"
+}
+
+function rollback_flink_slf4j_metric_reporter() {
+  rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
+}
+
+function get_metric_processed_records {
+  OPERATOR=$1
+  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
+  if [ -z $N ]; then
+    N=0
+  fi
+  echo $N
+}
+
+function get_num_metric_samples {
+  OPERATOR=$1
+  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
+  if [ -z $N ]; then
+    N=0
+  fi
+  echo $N
+}
+
+function wait_oper_metric_num_in_records {
+    OPERATOR=$1
+    MAX_NUM_METRICS="${2:-200}"
+    NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
+    OLD_NUM_METRICS=${3:-${NUM_METRICS}}
+    # monitor the numRecordsIn metric of the given operator; only metric samples that
+    # appear after the initial OLD_NUM_METRICS samples are counted, and the function
+    # returns once at least MAX_NUM_METRICS records have been processed
+    while : ; do
+      NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
+      NUM_RECORDS=$(get_metric_processed_records ${OPERATOR})
+
+      # only account for metrics that appeared in the second execution
+      if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
+        NUM_RECORDS=0
+      fi
+
+      if (( $NUM_RECORDS < $MAX_NUM_METRICS )); then
+        echo "Waiting for job to process up to ${MAX_NUM_METRICS} records, current progress: $NUM_RECORDS records ..."
+        sleep 1
+      else
+        break
+      fi
+    done
+}
+
 # make sure to clean up even in case of failures
 function cleanup {
   stop_cluster

http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index d3b3118..ae37f4a 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -33,14 +33,9 @@ else
   NUM_SLOTS=$NEW_DOP
 fi
 
-# modify configuration to have enough slots
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
-sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $NUM_SLOTS/" $FLINK_DIR/conf/flink-conf.yaml
-
-# modify configuration to use SLF4J reporter; we will be using this to monitor the state machine progress
-cp $FLINK_DIR/opt/flink-metrics-slf4j-*.jar $FLINK_DIR/lib/
-echo "metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter" >> $FLINK_DIR/conf/flink-conf.yaml
-echo "metrics.reporter.slf4j.interval: 1 SECONDS" >> $FLINK_DIR/conf/flink-conf.yaml
+backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
+setup_flink_slf4j_metric_reporter
 
 start_cluster
 
@@ -52,8 +47,7 @@ function test_cleanup {
   trap "" EXIT
 
   # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-  rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
+  rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
 
   # make sure to run regular cleanup as well
   cleanup
@@ -75,30 +69,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR
 
 wait_job_running $DATASTREAM_JOB
 
-function get_metric_processed_records {
-  grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
-}
-
-function get_num_metric_samples {
-  grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
-}
-
-# monitor the numRecordsIn metric of the state machine operator;
-# only proceed to savepoint when the operator has processed 200 records
-while : ; do
-  NUM_RECORDS=$(get_metric_processed_records)
-
-  if [ -z $NUM_RECORDS ]; then
-    NUM_RECORDS=0
-  fi
-
-  if (( $NUM_RECORDS < 200 )); then
-    echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
-    sleep 1
-  else
-    break
-  fi
-done
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
 
 # take a savepoint of the state machine job
 SAVEPOINT_PATH=$(take_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
@@ -121,24 +92,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TES
 
 wait_job_running $DATASTREAM_JOB
 
-# monitor the numRecordsIn metric of the state machine operator in the second execution
-# we let the test finish once the second restore execution has processed 200 records
-while : ; do
-  NUM_METRICS=$(get_num_metric_samples)
-  NUM_RECORDS=$(get_metric_processed_records)
-
-  # only account for metrics that appeared in the second execution
-  if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
-    NUM_RECORDS=0
-  fi
-
-  if (( $NUM_RECORDS < 200 )); then
-    echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
-    sleep 1
-  else
-    break
-  fi
-done
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
 
 # if state is erroneous and the state machine job produces alerting state transitions,
 # output would be non-empty and the test will not pass