You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/27 14:11:44 UTC

[3/3] flink git commit: [FLINK-8992] [e2e-tests] Integrate general DataStream test job with project structure

[FLINK-8992] [e2e-tests] Integrate general DataStream test job with project structure

This also includes minor cleanup of WIP code in the test job.

This closes #5925.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff62977d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff62977d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff62977d

Branch: refs/heads/master
Commit: ff62977d63e1551a7d0261fbe8e4dd3b2d124323
Parents: 31c7176
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Apr 25 17:05:24 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 27 16:09:43 2018 +0200

----------------------------------------------------------------------
 .../flink-datastream-allround-test/pom.xml      |  90 ++++++
 .../tests/DataStreamAllroundTestProgram.java    | 304 +++++++++++++++++++
 .../org/apache/flink/streaming/tests/Event.java |  63 ++++
 .../streaming/tests/SemanticsCheckMapper.java   |  82 +++++
 .../tests/SequenceGeneratorSource.java          | 233 ++++++++++++++
 .../ArtificialKeyedStateBuilder.java            |  56 ++++
 .../ArtificialKeyedStateMapper.java             |  82 +++++
 .../eventpayload/ArtificialMapStateBuilder.java |  70 +++++
 .../ArtificialValueStateBuilder.java            |  60 ++++
 .../eventpayload/ComplexPayload.java            |  76 +++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 .../flink/streaming/tests/general/Event.java    |  60 ----
 .../tests/general/GeneralPurposeJobTest.java    | 252 ---------------
 .../tests/general/SemanticsCheckMapper.java     |  87 ------
 .../tests/general/SequenceGeneratorSource.java  | 240 ---------------
 .../ArtificialKeyedStateBuilder.java            |  40 ---
 .../ArtificialKeyedStateMapper.java             |  76 -----
 .../eventpayload/ArtificialMapStateBuilder.java |  65 ----
 .../ArtificialValueStateBuilder.java            |  56 ----
 .../eventpayload/ComplexPayload.java            |  63 ----
 .../test-scripts/test_resume_savepoint.sh       |  61 ++--
 21 files changed, 1145 insertions(+), 972 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml b/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
new file mode 100644
index 0000000..d5f8913
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-datastream-allround-test</artifactId>
+	<name>flink-datastream-allround-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<!-- DataStreamAllroundTestProgram -->
+					<execution>
+						<id>DataStreamAllroundTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<finalName>DataStreamAllroundTestProgram</finalName>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.tests.DataStreamAllroundTestProgram</program-class>
+								</manifestEntries>
+							</archive>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
new file mode 100644
index 0000000..2059b99
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
+import org.apache.flink.streaming.tests.artificialstate.eventpayload.ArtificialValueStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A general purpose test for Flink's DataStream API operators and primitives.
+ *
+ * <p>It currrently covers the following aspects that are frequently present in Flink DataStream jobs:
+ * <ul>
+ *     <li>A generic Kryo input type.</li>
+ *     <li>A state type for which we register a {@link KryoSerializer}.</li>
+ *     <li>Operators with {@link ValueState}.</li>
+ * </ul>
+ *
+ * <p>The job allows to be configured for different state backends, including memory, file, and RocksDB
+ * state backends. It also allows specifying the processing guarantee semantics, which will also be verified
+ * by the job itself according to the specified semantic.
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
+ *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
+ *     <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
+ *     <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
+ *     <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
+ *     <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
+ *     <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
+ *     <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
+ *     <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FileStateBackend is selected.</li>
+ *     <li>sequence_generator_source.keyspace (int, default - 1000): Number of different keys for events emitted by the sequence generator.</li>
+ *     <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
+ *     <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
+ *     <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
+ *     <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
+ *     <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
+ * </ul>
+ */
+public class DataStreamAllroundTestProgram {
+
+	private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
+		.key("test.semantics")
+		.defaultValue("exactly-once")
+		.withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
+
+	private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
+		.key("environment.checkpoint_interval")
+		.defaultValue(1000L);
+
+	private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
+		.key("environment.parallelism")
+		.defaultValue(1);
+
+	private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
+		.key("environment.max_parallelism")
+		.defaultValue(128);
+
+	private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
+		.key("environment.restart_strategy.delay")
+		.defaultValue(0);
+
+	private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+		.key("state_backend")
+		.defaultValue("file")
+		.withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
+
+	private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
+		.key("state_backend.checkpoint_directory")
+		.noDefaultValue()
+		.withDescription("The checkpoint directory.");
+
+	private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
+		.key("state_backend.rocks.incremental")
+		.defaultValue(false)
+		.withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
+
+	private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
+		.key("state_backend.file.async")
+		.defaultValue(true)
+		.withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
+
+	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+		.key("sequence_generator_source.keyspace")
+		.defaultValue(200);
+
+	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
+		.key("sequence_generator_source.payload_size")
+		.defaultValue(20);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+		.key("sequence_generator_source.sleep_time")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+		.key("sequence_generator_source.sleep_after_elements")
+		.defaultValue(0L);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
+		.key("sequence_generator_source.event_time.max_out_of_order")
+		.defaultValue(500L);
+
+	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
+		.key("sequence_generator_source.event_time.clock_progress")
+		.defaultValue(100L);
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		setupEnvironment(env, pt);
+
+		env.addSource(createEventSource(pt))
+			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+			.keyBy(Event::getKey)
+			.map(createArtificialKeyedStateMapper(
+				// map function simply forwards the inputs
+				(MapFunction<Event, Event>) in -> in,
+				// state is updated per event as a wrapped ComplexPayload state object
+				(Event first, ComplexPayload second) -> new ComplexPayload(first), //
+				Arrays.asList(
+					new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+				)
+			)
+			.name("ArtificalKeyedStateMapper")
+			.returns(Event.class)
+			.keyBy(Event::getKey)
+			.flatMap(createSemanticsCheckMapper(pt))
+			.name("SemanticsCheckMapper")
+			.addSink(new PrintSinkFunction<>());
+
+		env.execute("General purpose test job");
+	}
+
+	public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+
+		// set checkpointing semantics
+		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+		long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
+		CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
+			? CheckpointingMode.EXACTLY_ONCE
+			: CheckpointingMode.AT_LEAST_ONCE;
+
+		env.enableCheckpointing(checkpointInterval, checkpointingMode);
+
+		// use event time
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		// parallelism
+		env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
+		env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
+
+		// restart strategy
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+			Integer.MAX_VALUE,
+			pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
+
+		// state backend
+		final String stateBackend = pt.get(
+			STATE_BACKEND.key(),
+			STATE_BACKEND.defaultValue());
+
+		final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
+
+		if ("file".equalsIgnoreCase(stateBackend)) {
+			boolean asyncCheckpoints = pt.getBoolean(
+				STATE_BACKEND_FILE_ASYNC.key(),
+				STATE_BACKEND_FILE_ASYNC.defaultValue());
+
+			env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+		} else if ("rocks".equalsIgnoreCase(stateBackend)) {
+			boolean incrementalCheckpoints = pt.getBoolean(
+				STATE_BACKEND_ROCKS_INCREMENTAL.key(),
+				STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
+
+			env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+		} else {
+			throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
+		}
+
+		// make parameters available in the web interface
+		env.getConfig().setGlobalJobParameters(pt);
+	}
+
+	private static SourceFunction<Event> createEventSource(ParameterTool pt) {
+		return new SequenceGeneratorSource(
+			pt.getInt(
+				SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
+				SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
+			pt.getInt(
+				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
+				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
+				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_SLEEP_TIME.key(),
+				SEQUENCE_GENERATOR_SRC_SLEEP_TIME.defaultValue()),
+			pt.getLong(
+				SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
+				SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
+	}
+
+	private static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
+		return new BoundedOutOfOrdernessTimestampExtractor<Event>(
+			Time.milliseconds(
+				pt.getLong(
+					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
+
+			@Override
+			public long extractTimestamp(Event element) {
+				return element.getEventTime();
+			}
+		};
+	}
+
+	private static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
+
+		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+
+		SemanticsCheckMapper.ValidatorFunction validatorFunction;
+
+		if (semantics.equalsIgnoreCase("exactly-once")) {
+			validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
+		} else if (semantics.equalsIgnoreCase("at-least-once")) {
+			validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
+		} else {
+			throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
+		}
+
+		return new SemanticsCheckMapper(validatorFunction);
+	}
+
+	private static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
+		MapFunction<IN, OUT> mapFunction,
+		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+		List<TypeSerializer<STATE>> stateSerializers) {
+
+		List<ArtificialKeyedStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
+		for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
+
+			String stateName = "valueState-" + typeSerializer.getClass().getSimpleName() + "-" + UUID.randomUUID();
+
+			ArtificialValueStateBuilder<IN, STATE> stateBuilder = new ArtificialValueStateBuilder<>(
+				stateName,
+				inputAndOldStateToNewState,
+				typeSerializer
+			);
+
+			artificialStateBuilders.add(stateBuilder);
+		}
+		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
new file mode 100644
index 0000000..8c219ed
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+/**
+ * The event type of records used in the {@link DataStreamAllroundTestProgram}.
+ */
+public class Event {
+
+	private final int key;
+	private final long eventTime;
+	private final long sequenceNumber;
+	private final String payload;
+
+	public Event(int key, long eventTime, long sequenceNumber, String payload) {
+		this.key = key;
+		this.eventTime = eventTime;
+		this.sequenceNumber = sequenceNumber;
+		this.payload = payload;
+	}
+
+	public int getKey() {
+		return key;
+	}
+
+	public long getEventTime() {
+		return eventTime;
+	}
+
+	public long getSequenceNumber() {
+		return sequenceNumber;
+	}
+
+	public String getPayload() {
+		return payload;
+	}
+
+	@Override
+	public String toString() {
+		return "Event{" +
+			"key=" + key +
+			", eventTime=" + eventTime +
+			", sequenceNumber=" + sequenceNumber +
+			", payload='" + payload + '\'' +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
new file mode 100644
index 0000000..53c3414
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * This mapper validates exactly-once and at-least-once semantics in connection with {@link SequenceGeneratorSource}.
+ */
+public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> {
+
+	private static final long serialVersionUID = -744070793650644485L;
+
+	/** This value state tracks the current sequence number per key. */
+	private volatile ValueState<Long> sequenceValue;
+
+	/** This defines how semantics are checked for each update. */
+	private final ValidatorFunction validator;
+
+	SemanticsCheckMapper(ValidatorFunction validator) {
+		this.validator = validator;
+	}
+
+	@Override
+	public void flatMap(Event event, Collector<String> out) throws Exception {
+
+		Long currentValue = sequenceValue.value();
+		if (currentValue == null) {
+			currentValue = 0L;
+		}
+
+		long nextValue = event.getSequenceNumber();
+
+		if (validator.check(currentValue, nextValue)) {
+			sequenceValue.update(nextValue);
+		} else {
+			out.collect("Alert: " + currentValue + " -> " + nextValue + " (" + event.getKey() + ")");
+		}
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		ValueStateDescriptor<Long> sequenceStateDescriptor =
+			new ValueStateDescriptor<>("sequenceState", Long.class);
+
+		sequenceValue = getRuntimeContext().getState(sequenceStateDescriptor);
+	}
+
+	interface ValidatorFunction extends Serializable {
+		boolean check(long current, long update);
+
+		static ValidatorFunction exactlyOnce() {
+			return (current, update) -> (update - current) == 1;
+		}
+
+		static ValidatorFunction atLeastOnce() {
+			return (current, update) -> (update - current) <= 1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
new file mode 100644
index 0000000..e641551
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This source function generates a sequence of long values per key.
+ */
+public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> implements CheckpointedFunction {
+
+	private static final long serialVersionUID = -3986989644799442178L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SequenceGeneratorSource.class);
+
+	/** Length of the artificial payload string generated for each event. */
+	private final int payloadLength;
+
+	/** The size of the total key space, i.e. the number of unique keys generated by all parallel sources. */
+	private final int totalKeySpaceSize;
+
+	/** This determines how much the event time progresses for each generated element. */
+	private final long eventTimeClockProgressPerEvent;
+
+	/** Maximum generated deviation in event time from the current event time clock. */
+	private final long maxOutOfOrder;
+
+	/** Time that a sleep takes in milliseconds. A value < 1 deactivates sleeping. */
+	private final long sleepTime;
+
+	/** This determines after how many generated events we sleep. A value < 1 deactivates sleeping. */
+	private final long sleepAfterElements;
+
+	/** This holds the key ranges for which this source generates events. */
+	private transient List<KeyRangeStates> keyRanges;
+
+	/** This is used to snapshot the state of this source, one entry per key range. */
+	private transient ListState<KeyRangeStates> snapshotKeyRanges;
+
+	/** Flag that determines if this source is running, i.e. generating events. */
+	private volatile boolean running;
+
+	SequenceGeneratorSource(
+		int totalKeySpaceSize,
+		int payloadLength,
+		long maxOutOfOrder,
+		long eventTimeClockProgressPerEvent,
+		long sleepTime,
+		long sleepAfterElements) {
+
+		this.totalKeySpaceSize = totalKeySpaceSize;
+		this.maxOutOfOrder = maxOutOfOrder;
+		this.payloadLength = payloadLength;
+		this.eventTimeClockProgressPerEvent = eventTimeClockProgressPerEvent;
+		this.sleepTime = sleepTime;
+		this.sleepAfterElements = sleepTime > 0 ? sleepAfterElements : 0;
+		this.running = true;
+	}
+
+	@Override
+	public void run(SourceContext<Event> ctx) throws Exception {
+
+		Random random = new Random();
+
+		// this holds the current event time, from which generated events can up to +/- (maxOutOfOrder).
+		long monotonousEventTime = 0L;
+		long elementsBeforeSleep = sleepAfterElements;
+
+		while (running) {
+
+			KeyRangeStates randomKeyRangeStates = keyRanges.get(random.nextInt(keyRanges.size()));
+			int randomKey = randomKeyRangeStates.getRandomKey(random);
+
+			long eventTime = Math.max(
+				0,
+				generateEventTimeWithOutOfOrderness(random, monotonousEventTime));
+
+			// uptick the event time clock
+			monotonousEventTime += eventTimeClockProgressPerEvent;
+
+			synchronized (ctx.getCheckpointLock()) {
+				long value = randomKeyRangeStates.incrementAndGet(randomKey);
+
+				Event event = new Event(
+					randomKey,
+					eventTime,
+					value,
+					StringUtils.getRandomString(random, payloadLength, payloadLength, 'A', 'z'));
+
+				ctx.collect(event);
+			}
+
+			if (sleepTime > 0) {
+				if (elementsBeforeSleep == 1) {
+					elementsBeforeSleep = sleepAfterElements;
+					Thread.sleep(sleepTime);
+				} else if (elementsBeforeSleep > 1) {
+					--elementsBeforeSleep;
+				}
+			}
+		}
+	}
+
+	private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) {
+		return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		snapshotKeyRanges.update(keyRanges);
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		final RuntimeContext runtimeContext = getRuntimeContext();
+		final int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+		final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
+		final int maxParallelism = runtimeContext.getMaxNumberOfParallelSubtasks();
+
+		ListStateDescriptor<KeyRangeStates> stateDescriptor =
+			new ListStateDescriptor<>("keyRanges", KeyRangeStates.class);
+
+		snapshotKeyRanges = context.getOperatorStateStore().getListState(stateDescriptor);
+		keyRanges = new ArrayList<>();
+
+		if (context.isRestored()) {
+			// restore key ranges from the snapshot
+			for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
+				keyRanges.add(keyRange);
+			}
+		} else {
+			// determine the key ranges that belong to the subtask
+			int rangeStartIdx = (subtaskIdx * maxParallelism) / parallelism;
+			int rangeEndIdx = ((subtaskIdx + 1) * maxParallelism) / parallelism;
+
+			for (int i = rangeStartIdx; i < rangeEndIdx; ++i) {
+
+				int start = ((i * totalKeySpaceSize + maxParallelism - 1) / maxParallelism);
+				int end = 1 + ((i + 1) * totalKeySpaceSize - 1) / maxParallelism;
+
+				if (end - start > 0) {
+					keyRanges.add(new KeyRangeStates(start, end));
+				}
+			}
+		}
+	}
+
+	/**
+	 * This defines the key-range and the current sequence numbers for all keys in the range.
+	 */
+	private static class KeyRangeStates {
+
+		/** Start key of the range (inclusive). */
+		final int startKey;
+
+		/** Start key of the range (exclusive). */
+		final int endKey;
+
+		/** This array contains the current sequence number for each key in the range. */
+		final long[] statesPerKey;
+
+		KeyRangeStates(int startKey, int endKey) {
+			this(startKey, endKey, new long[endKey - startKey]);
+		}
+
+		KeyRangeStates(int startKey, int endKey, long[] statesPerKey) {
+			Preconditions.checkArgument(statesPerKey.length == endKey - startKey);
+			this.startKey = startKey;
+			this.endKey = endKey;
+			this.statesPerKey = statesPerKey;
+		}
+
+		/**
+		 * Increments and returns the current sequence number for the given key.
+		 */
+		long incrementAndGet(int key) {
+			return ++statesPerKey[key - startKey];
+		}
+
+		/**
+		 * Returns a random key that belongs to this key range.
+		 */
+		int getRandomKey(Random random) {
+			return random.nextInt(endKey - startKey) + startKey;
+		}
+
+		@Override
+		public String toString() {
+			return "KeyRangeStates{" +
+				"start=" + startKey +
+				", end=" + endKey +
+				", statesPerKey=" + Arrays.toString(statesPerKey) +
+				'}';
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
new file mode 100644
index 0000000..3db18db
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.io.Serializable;
+
+/**
+ * The keyed state builder wraps the logic of registering state in user
+ * functions, as well as how state is updated per input element..
+ */
+public abstract class ArtificialKeyedStateBuilder<T> implements Serializable {
+
+	private static final long serialVersionUID = -5887676929924485788L;
+
+	protected final String stateName;
+
+	public ArtificialKeyedStateBuilder(String stateName) {
+		this.stateName = stateName;
+	}
+
+	public String getStateName() {
+		return stateName;
+	}
+
+	/**
+	 * Manipulate the state for an input element.
+	 *
+	 * @param element the current input element.
+	 */
+	public abstract void artificialStateForElement(T element) throws Exception;
+
+	/**
+	 * Registers the state.
+	 *
+	 * @param initializationContext the state initialization context, provided by the user function.
+	 */
+	public abstract void initialize(FunctionInitializationContext initializationContext);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
new file mode 100644
index 0000000..9697877
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A generic, stateful {@link MapFunction} that allows specifying what states to maintain
+ * based on a provided list of {@link ArtificialKeyedStateBuilder}s.
+ */
+public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction {
+
+	private static final long serialVersionUID = 513012258173556604L;
+
+	private final MapFunction<IN, OUT> mapFunction;
+	private final List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder;
+
+	public ArtificialKeyedStateMapper(
+		MapFunction<IN, OUT> mapFunction,
+		ArtificialKeyedStateBuilder<IN> artificialKeyedStateBuilder) {
+		this(mapFunction, Collections.singletonList(artificialKeyedStateBuilder));
+	}
+
+	public ArtificialKeyedStateMapper(
+		MapFunction<IN, OUT> mapFunction,
+		List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder) {
+
+		this.mapFunction = mapFunction;
+		this.artificialKeyedStateBuilder = artificialKeyedStateBuilder;
+		Set<String> stateNames = new HashSet<>(this.artificialKeyedStateBuilder.size());
+		for (ArtificialKeyedStateBuilder<IN> stateBuilder : this.artificialKeyedStateBuilder) {
+			if (!stateNames.add(stateBuilder.getStateName())) {
+				throw new IllegalArgumentException("Duplicated state name: " + stateBuilder.getStateName());
+			}
+		}
+	}
+
+	@Override
+	public OUT map(IN value) throws Exception {
+		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
+			stateBuilder.artificialStateForElement(value);
+		}
+
+		return mapFunction.map(value);
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
+			stateBuilder.initialize(context);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
new file mode 100644
index 0000000..a29b802
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.eventpayload;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * An {@link ArtificialKeyedStateBuilder} for user {@link MapState}s.
+ */
+public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialKeyedStateBuilder<IN> {
+
+	private static final long serialVersionUID = -143079058769306954L;
+
+	private transient MapState<K, V> mapState;
+	private final TypeSerializer<K> keySerializer;
+	private final TypeSerializer<V> valueSerializer;
+	private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
+
+	public ArtificialMapStateBuilder(
+		String stateName,
+		JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<V> valueSerializer) {
+
+		super(stateName);
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+		this.stateValueGenerator = stateValueGenerator;
+	}
+
+	@Override
+	public void artificialStateForElement(IN event) throws Exception {
+		Iterator<Map.Entry<K, V>> update = stateValueGenerator.join(event, mapState.iterator());
+		while (update.hasNext()) {
+			Map.Entry<K, V> updateEntry = update.next();
+			mapState.put(updateEntry.getKey(), updateEntry.getValue());
+		}
+	}
+
+	@Override
+	public void initialize(FunctionInitializationContext initializationContext) {
+		MapStateDescriptor<K, V> mapStateDescriptor =
+			new MapStateDescriptor<>(stateName, keySerializer, valueSerializer);
+		mapState = initializationContext.getKeyedStateStore().getMapState(mapStateDescriptor);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
new file mode 100644
index 0000000..1d2d5f8
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.eventpayload;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
+
+/**
+ * An {@link ArtificialKeyedStateBuilder} for user {@link ValueState}s.
+ */
+public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialKeyedStateBuilder<IN> {
+
+	private static final long serialVersionUID = -1205814329756790916L;
+
+	private transient ValueState<STATE> valueState;
+	private final TypeSerializer<STATE> typeSerializer;
+	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
+
+	public ArtificialValueStateBuilder(
+		String stateName,
+		JoinFunction<IN, STATE, STATE> stateValueGenerator,
+		TypeSerializer<STATE> typeSerializer) {
+
+		super(stateName);
+		this.typeSerializer = typeSerializer;
+		this.stateValueGenerator = stateValueGenerator;
+	}
+
+	@Override
+	public void artificialStateForElement(IN event) throws Exception {
+		valueState.update(stateValueGenerator.join(event, valueState.value()));
+	}
+
+	@Override
+	public void initialize(FunctionInitializationContext initializationContext) {
+		ValueStateDescriptor<STATE> valueStateDescriptor =
+			new ValueStateDescriptor<>(stateName, typeSerializer);
+		valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
new file mode 100644
index 0000000..8cba366
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.eventpayload;
+
+import org.apache.flink.streaming.tests.DataStreamAllroundTestProgram;
+import org.apache.flink.streaming.tests.Event;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A state type used in the {@link DataStreamAllroundTestProgram}.
+ * Wraps an {@link Event} as state.
+ */
+public class ComplexPayload implements Serializable {
+
+	private static final long serialVersionUID = 233624606545704853L;
+
+	public ComplexPayload(Event event) {
+		this.eventTime = event.getEventTime();
+		this.innerPayLoad = new InnerPayLoad(event.getSequenceNumber());
+		this.stringList = Arrays.asList(String.valueOf(event.getKey()), event.getPayload());
+	}
+
+	private final long eventTime;
+	private final List<String> stringList;
+	private final InnerPayLoad innerPayLoad;
+
+	/**
+	 * Nested class in state type. Wraps an {@link Event}'s sequence number.
+	 */
+	public static class InnerPayLoad implements Serializable {
+
+		private static final long serialVersionUID = 3986298180012117883L;
+
+		private final long sequenceNumber;
+
+		public InnerPayLoad(long sequenceNumber) {
+			this.sequenceNumber = sequenceNumber;
+		}
+
+		public long getSequenceNumber() {
+			return sequenceNumber;
+		}
+	}
+
+	public long getEventTime() {
+		return eventTime;
+	}
+
+	public List<String> getStringList() {
+		return stringList;
+	}
+
+	public InnerPayLoad getInnerPayLoad() {
+		return innerPayLoad;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index f7a6a70..34586ed 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -37,6 +37,7 @@ under the License.
 	<modules>
 		<module>flink-parent-child-classloading-test</module>
 		<module>flink-dataset-allround-test</module>
+		<module>flink-datastream-allround-test</module>
 		<module>flink-stream-sql-test</module>
 		<module>flink-bucketing-sink-test</module>
 	</modules>

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java
deleted file mode 100644
index 6bd6d18..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-public class Event {
-
-	private final int key;
-	private final long eventTime;
-	private final long sequenceNumber;
-	private final String payload;
-
-	public Event(int key, long eventTime, long sequenceNumber, String payload) {
-		this.key = key;
-		this.eventTime = eventTime;
-		this.sequenceNumber = sequenceNumber;
-		this.payload = payload;
-	}
-
-	public int getKey() {
-		return key;
-	}
-
-	public long getEventTime() {
-		return eventTime;
-	}
-
-	public long getSequenceNumber() {
-		return sequenceNumber;
-	}
-
-	public String getPayload() {
-		return payload;
-	}
-
-	@Override
-	public String toString() {
-		return "Event{" +
-			"key=" + key +
-			", eventTime=" + eventTime +
-			", sequenceNumber=" + sequenceNumber +
-			", payload='" + payload + '\'' +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java
deleted file mode 100644
index 40234f5..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateBuilder;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateMapper;
-import org.apache.flink.streaming.tests.general.artificialstate.eventpayload.ArtificialValueStateBuilder;
-import org.apache.flink.streaming.tests.general.artificialstate.eventpayload.ComplexPayload;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * A general purpose test for Flink.
- */
-public class GeneralPurposeJobTest {
-
-	private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
-		.key("test.semantics")
-		.defaultValue("exactly-once")
-		.withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
-
-	private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
-		.key("environment.parallelism")
-		.defaultValue(1);
-
-	private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
-		.key("environment.max_parallelism")
-		.defaultValue(128);
-
-	private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
-		.key("environment.restart_strategy.delay")
-		.defaultValue(0);
-
-	private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
-		.key("state_backend")
-		.defaultValue("file")
-		.withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
-
-	private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
-		.key("state_backend.checkpoint_directory")
-		.noDefaultValue()
-		.withDescription("The checkpoint directory.");
-
-	private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
-		.key("state_backend.rocks.incremental")
-		.defaultValue(false)
-		.withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
-
-	private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
-		.key("state_backend.file.async")
-		.defaultValue(true)
-		.withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
-
-	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
-		.key("sequence_generator_source.keyspace")
-		.defaultValue(1000);
-
-	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
-		.key("sequence_generator_source.payload_size")
-		.defaultValue(20);
-
-	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
-		.key("sequence_generator_source.event_time.max_out_of_order")
-		.defaultValue(500L);
-
-	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
-		.key("sequence_generator_source.event_time.clock_progress")
-		.defaultValue(100L);
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public static void main(String[] args) throws Exception {
-		final ParameterTool pt = ParameterTool.fromArgs(args);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		setupEnvironment(env, pt);
-
-		env.addSource(createEventSource(pt))
-			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
-			.keyBy(Event::getKey)
-			.map(createArtificialKeyedStateMapper(
-				(MapFunction<Event, Event>) in -> in,
-				(Event first, ComplexPayload second) -> new ComplexPayload(first),
-				Arrays.asList(
-					new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
-//					AvroUtils.getAvroUtils().createAvroSerializer(ComplexPayload.class))
-				)
-			)
-			.returns(Event.class)
-			.keyBy(Event::getKey)
-			.flatMap(createSemanticsCheckMapper(pt))
-//			.timeWindow(Time.seconds(10), Time.seconds(1))
-//			.apply(new WindowFunction<Event, Object, Integer, TimeWindow>() {
-//				@Override
-//				public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Collector<Object> out) throws Exception {
-//					System.out.println("------------ "+integer);
-//					for (Event event : input) {
-//						System.out.println(event);
-//					}
-//				}
-//			});
-			.addSink(new PrintSinkFunction<>());
-
-		env.execute("General purpose test job");
-	}
-
-	public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
-
-		// use event time
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-		// parallelism
-		env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
-		env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
-
-		// restart strategy
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-			Integer.MAX_VALUE,
-			pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
-
-		// state backend
-		final String stateBackend = pt.get(
-			STATE_BACKEND.key(),
-			STATE_BACKEND.defaultValue());
-
-		final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
-
-		if ("file".equalsIgnoreCase(stateBackend)) {
-			boolean asyncCheckpoints = pt.getBoolean(
-				STATE_BACKEND_FILE_ASYNC.key(),
-				STATE_BACKEND_FILE_ASYNC.defaultValue());
-
-			env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
-		} else if ("rocks".equalsIgnoreCase(stateBackend)) {
-			boolean incrementalCheckpoints = pt.getBoolean(
-				STATE_BACKEND_ROCKS_INCREMENTAL.key(),
-				STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
-
-			env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
-		} else {
-			throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
-		}
-
-		// make parameters available in the web interface
-		env.getConfig().setGlobalJobParameters(pt);
-	}
-
-	private static SourceFunction<Event> createEventSource(ParameterTool pt) {
-		return new SequenceGeneratorSource(
-			pt.getInt(
-				SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
-				SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
-			pt.getInt(
-				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
-				SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
-			pt.getLong(
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
-			pt.getLong(
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
-				SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()));
-	}
-
-	private static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
-		return new BoundedOutOfOrdernessTimestampExtractor<Event>(
-			Time.milliseconds(
-				pt.getLong(
-					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
-					SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
-
-			@Override
-			public long extractTimestamp(Event element) {
-				return element.getEventTime();
-			}
-		};
-	}
-
-	private static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
-
-		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
-
-		SemanticsCheckMapper.ValidatorFunction validatorFunction;
-
-		if (semantics.equalsIgnoreCase("exactly-once")) {
-			validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
-		} else if (semantics.equalsIgnoreCase("at-least-once")) {
-			validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
-		} else {
-			throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
-		}
-
-		return new SemanticsCheckMapper(validatorFunction);
-	}
-
-	private static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
-		MapFunction<IN, OUT> mapFunction,
-		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		List<TypeSerializer<STATE>> stateSerializers) {
-
-		List<ArtificialKeyedStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
-		for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-
-			String stateName = "valueState-" + typeSerializer.getClass().getSimpleName() + "-" + UUID.randomUUID();
-
-			ArtificialValueStateBuilder<IN, STATE> stateBuilder = new ArtificialValueStateBuilder<>(
-				stateName,
-				inputAndOldStateToNewState,
-				typeSerializer
-			);
-
-			artificialStateBuilders.add(stateBuilder);
-		}
-		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java
deleted file mode 100644
index 26c0465..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * This mapper validates exactly-once and at-least-once semantics in connection with {@link SequenceGeneratorSource}.
- */
-public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> implements CheckpointedFunction {
-
-	/** This value state tracks the current sequence number per key. */
-	private volatile ValueState<Long> sequenceValue;
-
-	/** This defines how semantics are checked for each update. */
-	private final ValidatorFunction validator;
-
-	SemanticsCheckMapper(ValidatorFunction validator) {
-		this.validator = validator;
-	}
-
-	@Override
-	public void flatMap(Event event, Collector<String> out) throws Exception {
-
-		Long currentValue = sequenceValue.value();
-		if (currentValue == null) {
-			currentValue = 0L;
-		}
-
-		long nextValue = event.getSequenceNumber();
-
-		if (validator.check(currentValue, nextValue)) {
-			sequenceValue.update(nextValue);
-		} else {
-			out.collect("Alert: " + currentValue + " -> " + nextValue);
-		}
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) {
-	}
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) {
-
-		ValueStateDescriptor<Long> sequenceStateDescriptor =
-			new ValueStateDescriptor<>("sequenceState", Long.class);
-
-		sequenceValue = context.getKeyedStateStore().getState(sequenceStateDescriptor);
-	}
-
-	public interface ValidatorFunction extends Serializable {
-		boolean check(long current, long update);
-
-		static ValidatorFunction exactlyOnce() {
-			return (current, update) -> (update - current) == 1;
-		}
-
-		static ValidatorFunction atLeastOnce() {
-			return (current, update) -> (update - current) <= 1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java
deleted file mode 100644
index ed95409..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-/**
- * This source function generates a sequence of long values per key.
- */
-public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> implements CheckpointedFunction {
-
-	private final static Logger LOG = LoggerFactory.getLogger(SequenceGeneratorSource.class);
-
-	/** Length of the artificial payload string generated for each event. */
-	private final int payloadLength;
-
-	/** The size of the total key space, i.e. the number of unique keys generated by all parallel sources. */
-	private final int totalKeySpaceSize;
-
-	/** This determines how much the event time progresses for each generated element. */
-	private final long eventTimeClockProgressPerEvent;
-
-	/** Maximum generated deviation in event time from the current event time clock. */
-	private final long maxOutOfOrder;
-
-	/** Time that a sleep takes in milliseconds. A value < 1 deactivates sleeping. */
-	private final long sleepTime;
-
-	/** This determines after how many generated events we sleep. A value < 1 deactivates sleeping. */
-	private final long sleepAfterElements;
-
-	/** This holds the key ranges for which this source generates events. */
-	private transient List<KeyRangeStates> keyRanges;
-
-	/** This is used to snapshot the state of this source, one entry per key range. */
-	private transient ListState<KeyRangeStates> snapshotKeyRanges;
-
-	/** Flag that determines if this source is running, i.e. generating events. */
-	private volatile boolean running;
-
-
-	SequenceGeneratorSource(
-		int totalKeySpaceSize,
-		int payloadLength,
-		long maxOutOfOrder,
-		long eventTimeClockProgressPerEvent) {
-
-		this(totalKeySpaceSize,
-			payloadLength,
-			maxOutOfOrder,
-			eventTimeClockProgressPerEvent,
-			0L,
-			0L);
-	}
-
-	SequenceGeneratorSource(
-		int totalKeySpaceSize,
-		int payloadLength,
-		long maxOutOfOrder,
-		long eventTimeClockProgressPerEvent,
-		long sleepTime,
-		long sleepAfterElements) {
-
-		this.totalKeySpaceSize = totalKeySpaceSize;
-		this.maxOutOfOrder = maxOutOfOrder;
-		this.payloadLength = payloadLength;
-		this.eventTimeClockProgressPerEvent = eventTimeClockProgressPerEvent;
-		this.sleepTime = sleepTime;
-		this.sleepAfterElements = sleepTime > 0 ? sleepAfterElements : 0;
-		this.running = true;
-	}
-
-	@Override
-	public void run(SourceContext<Event> ctx) throws Exception {
-
-		Random random = new Random();
-
-		// this holds the current event time, from which generated events can up to +/- (maxOutOfOrder).
-		long monotonousEventTime = 0L;
-		long elementsBeforeSleep = sleepAfterElements;
-
-		while (running) {
-
-			KeyRangeStates randomKeyRangeStates = keyRanges.get(random.nextInt(keyRanges.size()));
-			int randomKey = randomKeyRangeStates.getRandomKey(random);
-			long value = randomKeyRangeStates.incrementAndGet(randomKey);
-			long eventTime = Math.max(
-				0,
-				generateEventTimeWithOutOfOrderness(random, monotonousEventTime));
-
-			// uptick the event time clock
-			monotonousEventTime += eventTimeClockProgressPerEvent;
-
-			Event event = new Event(
-				randomKey,
-				eventTime,
-				value,
-				StringUtils.getRandomString(random, payloadLength, payloadLength, 'A', 'z'));
-
-			ctx.collect(event);
-
-			if (elementsBeforeSleep == 1) {
-				elementsBeforeSleep = sleepAfterElements;
-				Thread.sleep(sleepTime);
-			} else if (elementsBeforeSleep > 1) {
-				--elementsBeforeSleep;
-			}
-		}
-	}
-
-	private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) {
-		return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
-	}
-
-	@Override
-	public void cancel() {
-		running = false;
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-		snapshotKeyRanges.update(keyRanges);
-	}
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		final RuntimeContext runtimeContext = getRuntimeContext();
-		final int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
-		final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
-		final int maxParallelism = runtimeContext.getMaxNumberOfParallelSubtasks();
-
-		ListStateDescriptor<KeyRangeStates> stateDescriptor =
-			new ListStateDescriptor<>("keyRanges", KeyRangeStates.class);
-
-		snapshotKeyRanges = context.getOperatorStateStore().getListState(stateDescriptor);
-		keyRanges = new ArrayList<>();
-
-		if (context.isRestored()) {
-			// restore key ranges from the snapshot
-			for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
-				keyRanges.add(keyRange);
-			}
-		} else {
-			// determine the key ranges that belong to the subtask
-			int rangeStartIdx = (subtaskIdx * maxParallelism) / parallelism;
-			int rangeEndIdx = ((subtaskIdx + 1) * maxParallelism) / parallelism;
-
-			for (int i = rangeStartIdx; i < rangeEndIdx; ++i) {
-
-				int start = ((i * totalKeySpaceSize + maxParallelism - 1) / maxParallelism);
-				int end = 1 + ((i + 1) * totalKeySpaceSize - 1) / maxParallelism;
-
-				if (end - start > 0) {
-					keyRanges.add(new KeyRangeStates(start, end));
-				}
-			}
-		}
-	}
-
-	/**
-	 * This defines the key-range and the current sequence numbers for all keys in the range.
-	 */
-	private static class KeyRangeStates {
-
-		/** Start key of the range (inclusive). */
-		final int startKey;
-
-		/** Start key of the range (exclusive). */
-		final int endKey;
-
-		/** This array contains the current sequence number for each key in the range. */
-		final long[] statesPerKey;
-
-		KeyRangeStates(int startKey, int endKey) {
-			this(startKey, endKey, new long[endKey - startKey]);
-		}
-
-		KeyRangeStates(int startKey, int endKey, long[] statesPerKey) {
-			Preconditions.checkArgument(statesPerKey.length == endKey - startKey);
-			this.startKey = startKey;
-			this.endKey = endKey;
-			this.statesPerKey = statesPerKey;
-		}
-
-		/**
-		 * Increments and returns the current sequence number for the given key.
-		 */
-		long incrementAndGet(int key) {
-			return ++statesPerKey[key - startKey];
-		}
-
-		/**
-		 * Returns a random key that belongs to this key range.
-		 */
-		int getRandomKey(Random random) {
-			return random.nextInt(endKey - startKey) + startKey;
-		}
-
-		@Override
-		public String toString() {
-			return "KeyRangeStates{" +
-				"start=" + startKey +
-				", end=" + endKey +
-				", statesPerKey=" + Arrays.toString(statesPerKey) +
-				'}';
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java
deleted file mode 100644
index c39704d..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate;
-
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-
-import java.io.Serializable;
-
-public abstract class ArtificialKeyedStateBuilder<T> implements Serializable {
-
-	protected final String stateName;
-
-	public ArtificialKeyedStateBuilder(String stateName) {
-		this.stateName = stateName;
-	}
-
-	public String getStateName() {
-		return stateName;
-	}
-
-	public abstract void artificialStateForElement(T element) throws Exception;
-
-	public abstract void initialize(FunctionInitializationContext initializationContext);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java
deleted file mode 100644
index a49ee03..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction {
-
-	private final MapFunction<IN, OUT> mapFunction;
-	private final List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder;
-
-	public ArtificialKeyedStateMapper(
-		MapFunction<IN, OUT> mapFunction,
-		ArtificialKeyedStateBuilder<IN> artificialKeyedStateBuilder) {
-		this(mapFunction, Collections.singletonList(artificialKeyedStateBuilder));
-	}
-
-	public ArtificialKeyedStateMapper(
-		MapFunction<IN, OUT> mapFunction,
-		List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder) {
-
-		this.mapFunction = mapFunction;
-		this.artificialKeyedStateBuilder = artificialKeyedStateBuilder;
-		Set<String> stateNames = new HashSet<>(this.artificialKeyedStateBuilder.size());
-		for (ArtificialKeyedStateBuilder<IN> stateBuilder : this.artificialKeyedStateBuilder) {
-			if (!stateNames.add(stateBuilder.getStateName())) {
-				throw new IllegalArgumentException("Duplicated state name: " + stateBuilder.getStateName());
-			}
-		}
-	}
-
-	@Override
-	public OUT map(IN value) throws Exception {
-		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
-			stateBuilder.artificialStateForElement(value);
-		}
-
-		return mapFunction.map(value);
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-	}
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
-			stateBuilder.initialize(context);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java
deleted file mode 100644
index 205a4d6..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateBuilder;
-
-import java.util.Iterator;
-import java.util.Map;
-
-public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialKeyedStateBuilder<IN> {
-
-	private transient MapState<K, V> mapState;
-	private final TypeSerializer<K> keySerializer;
-	private final TypeSerializer<V> valueSerializer;
-	private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
-
-	public ArtificialMapStateBuilder(
-		String stateName,
-		JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
-		TypeSerializer<K> keySerializer,
-		TypeSerializer<V> valueSerializer) {
-
-		super(stateName);
-		this.keySerializer = keySerializer;
-		this.valueSerializer = valueSerializer;
-		this.stateValueGenerator = stateValueGenerator;
-	}
-
-	@Override
-	public void artificialStateForElement(IN event) throws Exception {
-		Iterator<Map.Entry<K, V>> update = stateValueGenerator.join(event, mapState.iterator());
-		while (update.hasNext()) {
-			Map.Entry<K, V> updateEntry = update.next();
-			mapState.put(updateEntry.getKey(), updateEntry.getValue());
-		}
-	}
-
-	@Override
-	public void initialize(FunctionInitializationContext initializationContext) {
-		MapStateDescriptor<K, V> mapStateDescriptor =
-			new MapStateDescriptor<>(stateName, keySerializer, valueSerializer);
-		mapState = initializationContext.getKeyedStateStore().getMapState(mapStateDescriptor);
-	}
-}