You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/27 14:11:44 UTC
[3/3] flink git commit: [FLINK-8992] [e2e-tests] Integrate general DataStream test job with project structure
[FLINK-8992] [e2e-tests] Integrate general DataStream test job with project structure
This also includes minor cleanup of WIP code in the test job.
This closes #5925.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff62977d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff62977d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff62977d
Branch: refs/heads/master
Commit: ff62977d63e1551a7d0261fbe8e4dd3b2d124323
Parents: 31c7176
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Apr 25 17:05:24 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 27 16:09:43 2018 +0200
----------------------------------------------------------------------
.../flink-datastream-allround-test/pom.xml | 90 ++++++
.../tests/DataStreamAllroundTestProgram.java | 304 +++++++++++++++++++
.../org/apache/flink/streaming/tests/Event.java | 63 ++++
.../streaming/tests/SemanticsCheckMapper.java | 82 +++++
.../tests/SequenceGeneratorSource.java | 233 ++++++++++++++
.../ArtificialKeyedStateBuilder.java | 56 ++++
.../ArtificialKeyedStateMapper.java | 82 +++++
.../eventpayload/ArtificialMapStateBuilder.java | 70 +++++
.../ArtificialValueStateBuilder.java | 60 ++++
.../eventpayload/ComplexPayload.java | 76 +++++
flink-end-to-end-tests/pom.xml | 1 +
.../flink/streaming/tests/general/Event.java | 60 ----
.../tests/general/GeneralPurposeJobTest.java | 252 ---------------
.../tests/general/SemanticsCheckMapper.java | 87 ------
.../tests/general/SequenceGeneratorSource.java | 240 ---------------
.../ArtificialKeyedStateBuilder.java | 40 ---
.../ArtificialKeyedStateMapper.java | 76 -----
.../eventpayload/ArtificialMapStateBuilder.java | 65 ----
.../ArtificialValueStateBuilder.java | 56 ----
.../eventpayload/ComplexPayload.java | 63 ----
.../test-scripts/test_resume_savepoint.sh | 61 ++--
21 files changed, 1145 insertions(+), 972 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml b/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
new file mode 100644
index 0000000..d5f8913
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <version>1.6-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-datastream-allround-test</artifactId>
+ <name>flink-datastream-allround-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <!-- use the Scala suffix of the current build instead of hard-coding one -->
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <!-- DataStreamAllroundTestProgram -->
+ <execution>
+ <id>DataStreamAllroundTestProgram</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <finalName>DataStreamAllroundTestProgram</finalName>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.tests.DataStreamAllroundTestProgram</program-class>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
new file mode 100644
index 0000000..2059b99
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
+import org.apache.flink.streaming.tests.artificialstate.eventpayload.ArtificialValueStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A general purpose test for Flink's DataStream API operators and primitives.
+ *
+ * <p>It currently covers the following aspects that are frequently present in Flink DataStream jobs:
+ * <ul>
+ * <li>A generic Kryo input type.</li>
+ * <li>A state type for which we register a {@link KryoSerializer}.</li>
+ * <li>Operators with {@link ValueState}.</li>
+ * </ul>
+ *
+ * <p>The job allows to be configured for different state backends, namely the file and the RocksDB
+ * state backend. It also allows specifying the processing guarantee semantics, which will also be verified
+ * by the job itself according to the specified semantic.
+ *
+ * <p>Program parameters:
+ * <ul>
+ * <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
+ * <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
+ * <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
+ * <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
+ * <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
+ * <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
+ * <li>state_backend.checkpoint_directory (String, required): The checkpoint directory.</li>
+ * <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
+ * <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FsStateBackend is selected.</li>
+ * <li>sequence_generator_source.keyspace (int, default - 200): Number of different keys for events emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
+ * <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
+ * <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
+ * </ul>
+ */
+public class DataStreamAllroundTestProgram {
+
+ private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
+ .key("test.semantics")
+ .defaultValue("exactly-once")
+ .withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
+
+ private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
+ .key("environment.checkpoint_interval")
+ .defaultValue(1000L);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
+ .key("environment.parallelism")
+ .defaultValue(1);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
+ .key("environment.max_parallelism")
+ .defaultValue(128);
+
+ /** Delay between restart attempts in milliseconds; long-typed to match the restart strategy API. */
+ private static final ConfigOption<Long> ENVIRONMENT_RESTART_DELAY = ConfigOptions
+ .key("environment.restart_strategy.delay")
+ .defaultValue(0L);
+
+ private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+ .key("state_backend")
+ .defaultValue("file")
+ .withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
+
+ private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
+ .key("state_backend.checkpoint_directory")
+ .noDefaultValue()
+ .withDescription("The checkpoint directory.");
+
+ private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
+ .key("state_backend.rocks.incremental")
+ .defaultValue(false)
+ .withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
+
+ private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
+ .key("state_backend.file.async")
+ .defaultValue(true)
+ .withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
+
+ private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+ .key("sequence_generator_source.keyspace")
+ .defaultValue(200);
+
+ private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
+ .key("sequence_generator_source.payload_size")
+ .defaultValue(20);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+ .key("sequence_generator_source.sleep_time")
+ .defaultValue(0L);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+ .key("sequence_generator_source.sleep_after_elements")
+ .defaultValue(0L);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
+ .key("sequence_generator_source.event_time.max_out_of_order")
+ .defaultValue(500L);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
+ .key("sequence_generator_source.event_time.clock_progress")
+ .defaultValue(100L);
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+ final ParameterTool pt = ParameterTool.fromArgs(args);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ setupEnvironment(env, pt);
+
+ env.addSource(createEventSource(pt))
+ .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+ .keyBy(Event::getKey)
+ .map(createArtificialKeyedStateMapper(
+ // map function simply forwards the inputs
+ (MapFunction<Event, Event>) in -> in,
+ // state is updated per event as a wrapped ComplexPayload state object
+ (Event first, ComplexPayload second) -> new ComplexPayload(first),
+ Arrays.asList(
+ new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+ )
+ )
+ .name("ArtificalKeyedStateMapper")
+ .returns(Event.class)
+ .keyBy(Event::getKey)
+ .flatMap(createSemanticsCheckMapper(pt))
+ .name("SemanticsCheckMapper")
+ .addSink(new PrintSinkFunction<>());
+
+ env.execute("General purpose test job");
+ }
+
+ /**
+ * Configures checkpointing, time characteristic, parallelism, restart strategy and state backend of the
+ * given environment from the program parameters.
+ *
+ * @throws IllegalArgumentException if an unknown state backend is requested.
+ */
+ public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+
+ // set checkpointing semantics
+ String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+ long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
+ CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
+ ? CheckpointingMode.EXACTLY_ONCE
+ : CheckpointingMode.AT_LEAST_ONCE;
+
+ env.enableCheckpointing(checkpointInterval, checkpointingMode);
+
+ // use event time
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ // parallelism
+ env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
+ env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
+
+ // restart strategy: restart indefinitely with the configured delay
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+ Integer.MAX_VALUE,
+ pt.getLong(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
+
+ // state backend
+ final String stateBackend = pt.get(
+ STATE_BACKEND.key(),
+ STATE_BACKEND.defaultValue());
+
+ final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
+
+ if ("file".equalsIgnoreCase(stateBackend)) {
+ boolean asyncCheckpoints = pt.getBoolean(
+ STATE_BACKEND_FILE_ASYNC.key(),
+ STATE_BACKEND_FILE_ASYNC.defaultValue());
+
+ env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+ } else if ("rocks".equalsIgnoreCase(stateBackend)) {
+ boolean incrementalCheckpoints = pt.getBoolean(
+ STATE_BACKEND_ROCKS_INCREMENTAL.key(),
+ STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
+
+ env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+ } else {
+ throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
+ }
+
+ // make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(pt);
+ }
+
+ /** Creates the {@link SequenceGeneratorSource} configured from the program parameters. */
+ private static SourceFunction<Event> createEventSource(ParameterTool pt) {
+ return new SequenceGeneratorSource(
+ pt.getInt(
+ SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
+ SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
+ pt.getInt(
+ SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
+ SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_SLEEP_TIME.key(),
+ SEQUENCE_GENERATOR_SRC_SLEEP_TIME.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
+ SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
+ }
+
+ /** Creates a timestamp extractor whose out-of-orderness bound matches the one used by the source. */
+ private static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
+ return new BoundedOutOfOrdernessTimestampExtractor<Event>(
+ Time.milliseconds(
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
+
+ @Override
+ public long extractTimestamp(Event element) {
+ return element.getEventTime();
+ }
+ };
+ }
+
+ /**
+ * Creates the {@link SemanticsCheckMapper} for the requested test semantics.
+ *
+ * @throws IllegalArgumentException if the requested semantics are neither 'exactly-once' nor 'at-least-once'.
+ */
+ private static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
+
+ String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+
+ SemanticsCheckMapper.ValidatorFunction validatorFunction;
+
+ if (semantics.equalsIgnoreCase("exactly-once")) {
+ validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
+ } else if (semantics.equalsIgnoreCase("at-least-once")) {
+ validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
+ } else {
+ throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
+ }
+
+ return new SemanticsCheckMapper(validatorFunction);
+ }
+
+ /**
+ * Creates an {@link ArtificialKeyedStateMapper} that applies the given map function and maintains one
+ * artificial value state per given serializer.
+ *
+ * @param mapFunction the function applied to every input element.
+ * @param inputAndOldStateToNewState computes the new state value from the input and the previous state value.
+ * @param stateSerializers one value state is created per serializer in this list.
+ */
+ private static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
+ MapFunction<IN, OUT> mapFunction,
+ JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+ List<TypeSerializer<STATE>> stateSerializers) {
+
+ List<ArtificialKeyedStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
+ for (int i = 0; i < stateSerializers.size(); ++i) {
+ TypeSerializer<STATE> typeSerializer = stateSerializers.get(i);
+
+ // Derive the state name deterministically (not from a random UUID), so that a job resumed from a
+ // savepoint registers the same state names as the job that took the savepoint. The index keeps the
+ // names unique if several serializers of the same class are given.
+ String stateName = "valueState-" + typeSerializer.getClass().getSimpleName() + "-" + i;
+
+ ArtificialValueStateBuilder<IN, STATE> stateBuilder = new ArtificialValueStateBuilder<>(
+ stateName,
+ inputAndOldStateToNewState,
+ typeSerializer
+ );
+
+ artificialStateBuilders.add(stateBuilder);
+ }
+ return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
new file mode 100644
index 0000000..8c219ed
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/Event.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+/**
+ * A single record flowing through the {@link DataStreamAllroundTestProgram}.
+ *
+ * <p>Instances are immutable: a key, the event timestamp, the per-key sequence number assigned by the
+ * source, and an artificial string payload.
+ */
+public class Event {
+
+ private final int key;
+ private final long eventTime;
+ private final long sequenceNumber;
+ private final String payload;
+
+ public Event(int key, long eventTime, long sequenceNumber, String payload) {
+ this.payload = payload;
+ this.sequenceNumber = sequenceNumber;
+ this.eventTime = eventTime;
+ this.key = key;
+ }
+
+ /** Returns the key of this event. */
+ public int getKey() {
+ return this.key;
+ }
+
+ /** Returns the event-time timestamp of this event. */
+ public long getEventTime() {
+ return this.eventTime;
+ }
+
+ /** Returns the sequence number of this event within its key. */
+ public long getSequenceNumber() {
+ return this.sequenceNumber;
+ }
+
+ /** Returns the artificial payload of this event. */
+ public String getPayload() {
+ return this.payload;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder("Event{")
+ .append("key=").append(key)
+ .append(", eventTime=").append(eventTime)
+ .append(", sequenceNumber=").append(sequenceNumber)
+ .append(", payload='").append(payload).append('\'')
+ .append('}')
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
new file mode 100644
index 0000000..53c3414
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * This mapper validates exactly-once and at-least-once semantics in connection with {@link SequenceGeneratorSource}.
+ *
+ * <p>For every key, the last accepted sequence number is kept in keyed state. Each incoming sequence number
+ * is checked against it with a {@link ValidatorFunction}: accepted numbers replace the stored value, while
+ * rejected numbers emit an alert string and leave the stored value unchanged.
+ */
+public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> {
+
+ private static final long serialVersionUID = -744070793650644485L;
+
+ /**
+ * This value state tracks the current sequence number per key. It is obtained from the runtime context in
+ * {@link #open(Configuration)} and therefore must not be serialized with the function.
+ */
+ private transient ValueState<Long> sequenceValue;
+
+ /** This defines how semantics are checked for each update. */
+ private final ValidatorFunction validator;
+
+ SemanticsCheckMapper(ValidatorFunction validator) {
+ this.validator = validator;
+ }
+
+ @Override
+ public void flatMap(Event event, Collector<String> out) throws Exception {
+
+ Long currentValue = sequenceValue.value();
+ if (currentValue == null) {
+ // no number accepted yet for this key
+ currentValue = 0L;
+ }
+
+ long nextValue = event.getSequenceNumber();
+
+ if (validator.check(currentValue, nextValue)) {
+ sequenceValue.update(nextValue);
+ } else {
+ out.collect("Alert: " + currentValue + " -> " + nextValue + " (" + event.getKey() + ")");
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ ValueStateDescriptor<Long> sequenceStateDescriptor =
+ new ValueStateDescriptor<>("sequenceState", Long.class);
+
+ sequenceValue = getRuntimeContext().getState(sequenceStateDescriptor);
+ }
+
+ /**
+ * Decides whether moving from the current to the next sequence number of a key is valid.
+ */
+ @FunctionalInterface
+ interface ValidatorFunction extends Serializable {
+
+ /**
+ * Checks a single transition.
+ *
+ * @param current the last accepted sequence number of the key, or 0 if none was accepted yet.
+ * @param update the sequence number of the incoming event.
+ * @return true if the transition is valid.
+ */
+ boolean check(long current, long update);
+
+ /** Accepts only the direct successor of the current number. */
+ static ValidatorFunction exactlyOnce() {
+ return (current, update) -> (update - current) == 1;
+ }
+
+ /** Accepts the direct successor and any number not larger than it (replays), but no gaps. */
+ static ValidatorFunction atLeastOnce() {
+ return (current, update) -> (update - current) <= 1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
new file mode 100644
index 0000000..e641551
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This source function generates a sequence of long values per key.
+ */
+public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = -3986989644799442178L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(SequenceGeneratorSource.class);
+
+ /** Length of the artificial payload string generated for each event. */
+ private final int payloadLength;
+
+ /** The size of the total key space, i.e. the number of unique keys generated by all parallel sources. */
+ private final int totalKeySpaceSize;
+
+ /** This determines how much the event time progresses for each generated element. */
+ private final long eventTimeClockProgressPerEvent;
+
+ /** Maximum generated deviation in event time from the current event time clock. */
+ private final long maxOutOfOrder;
+
+ /** Time that a sleep takes in milliseconds. A value < 1 deactivates sleeping. */
+ private final long sleepTime;
+
+ /** This determines after how many generated events we sleep. A value < 1 deactivates sleeping. */
+ private final long sleepAfterElements;
+
+ /** This holds the key ranges for which this source generates events. */
+ private transient List<KeyRangeStates> keyRanges;
+
+ /** This is used to snapshot the state of this source, one entry per key range. */
+ private transient ListState<KeyRangeStates> snapshotKeyRanges;
+
+ /** Flag that determines if this source is running, i.e. generating events. */
+ private volatile boolean running;
+
+ SequenceGeneratorSource(
+ int totalKeySpaceSize,
+ int payloadLength,
+ long maxOutOfOrder,
+ long eventTimeClockProgressPerEvent,
+ long sleepTime,
+ long sleepAfterElements) {
+
+ this.totalKeySpaceSize = totalKeySpaceSize;
+ this.maxOutOfOrder = maxOutOfOrder;
+ this.payloadLength = payloadLength;
+ this.eventTimeClockProgressPerEvent = eventTimeClockProgressPerEvent;
+ this.sleepTime = sleepTime;
+ this.sleepAfterElements = sleepTime > 0 ? sleepAfterElements : 0;
+ this.running = true;
+ }
+
+ @Override
+ public void run(SourceContext<Event> ctx) throws Exception {
+
+ Random random = new Random();
+
+ // this holds the current event time, from which generated events can up to +/- (maxOutOfOrder).
+ long monotonousEventTime = 0L;
+ long elementsBeforeSleep = sleepAfterElements;
+
+ while (running) {
+
+ KeyRangeStates randomKeyRangeStates = keyRanges.get(random.nextInt(keyRanges.size()));
+ int randomKey = randomKeyRangeStates.getRandomKey(random);
+
+ long eventTime = Math.max(
+ 0,
+ generateEventTimeWithOutOfOrderness(random, monotonousEventTime));
+
+ // uptick the event time clock
+ monotonousEventTime += eventTimeClockProgressPerEvent;
+
+ synchronized (ctx.getCheckpointLock()) {
+ long value = randomKeyRangeStates.incrementAndGet(randomKey);
+
+ Event event = new Event(
+ randomKey,
+ eventTime,
+ value,
+ StringUtils.getRandomString(random, payloadLength, payloadLength, 'A', 'z'));
+
+ ctx.collect(event);
+ }
+
+ if (sleepTime > 0) {
+ if (elementsBeforeSleep == 1) {
+ elementsBeforeSleep = sleepAfterElements;
+ Thread.sleep(sleepTime);
+ } else if (elementsBeforeSleep > 1) {
+ --elementsBeforeSleep;
+ }
+ }
+ }
+ }
+
+ private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) {
+ return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ snapshotKeyRanges.update(keyRanges);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ final RuntimeContext runtimeContext = getRuntimeContext();
+ final int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
+ final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
+ final int maxParallelism = runtimeContext.getMaxNumberOfParallelSubtasks();
+
+ ListStateDescriptor<KeyRangeStates> stateDescriptor =
+ new ListStateDescriptor<>("keyRanges", KeyRangeStates.class);
+
+ snapshotKeyRanges = context.getOperatorStateStore().getListState(stateDescriptor);
+ keyRanges = new ArrayList<>();
+
+ if (context.isRestored()) {
+ // restore key ranges from the snapshot
+ for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
+ keyRanges.add(keyRange);
+ }
+ } else {
+ // determine the key ranges that belong to the subtask
+ int rangeStartIdx = (subtaskIdx * maxParallelism) / parallelism;
+ int rangeEndIdx = ((subtaskIdx + 1) * maxParallelism) / parallelism;
+
+ for (int i = rangeStartIdx; i < rangeEndIdx; ++i) {
+
+ int start = ((i * totalKeySpaceSize + maxParallelism - 1) / maxParallelism);
+ int end = 1 + ((i + 1) * totalKeySpaceSize - 1) / maxParallelism;
+
+ if (end - start > 0) {
+ keyRanges.add(new KeyRangeStates(start, end));
+ }
+ }
+ }
+ }
+
+ /**
+ * This defines the key-range and the current sequence numbers for all keys in the range.
+ */
+ private static class KeyRangeStates {
+
+ /** Start key of the range (inclusive). */
+ final int startKey;
+
+ /** Start key of the range (exclusive). */
+ final int endKey;
+
+ /** This array contains the current sequence number for each key in the range. */
+ final long[] statesPerKey;
+
+ KeyRangeStates(int startKey, int endKey) {
+ this(startKey, endKey, new long[endKey - startKey]);
+ }
+
+ KeyRangeStates(int startKey, int endKey, long[] statesPerKey) {
+ Preconditions.checkArgument(statesPerKey.length == endKey - startKey);
+ this.startKey = startKey;
+ this.endKey = endKey;
+ this.statesPerKey = statesPerKey;
+ }
+
+ /**
+ * Increments and returns the current sequence number for the given key.
+ */
+ long incrementAndGet(int key) {
+ return ++statesPerKey[key - startKey];
+ }
+
+ /**
+ * Returns a random key that belongs to this key range.
+ */
+ int getRandomKey(Random random) {
+ return random.nextInt(endKey - startKey) + startKey;
+ }
+
+ @Override
+ public String toString() {
+ return "KeyRangeStates{" +
+ "start=" + startKey +
+ ", end=" + endKey +
+ ", statesPerKey=" + Arrays.toString(statesPerKey) +
+ '}';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
new file mode 100644
index 0000000..3db18db
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.io.Serializable;
+
+/**
+ * The keyed state builder wraps the logic of registering state in user
+ * functions, as well as how state is updated per input element.
+ */
+public abstract class ArtificialKeyedStateBuilder<T> implements Serializable {
+
+	private static final long serialVersionUID = -5887676929924485788L;
+
+	/** Name under which subclasses register the state they maintain. */
+	protected final String stateName;
+
+	/**
+	 * Creates a builder for a state with the given name.
+	 *
+	 * @param name the name of the state to register.
+	 */
+	public ArtificialKeyedStateBuilder(String name) {
+		this.stateName = name;
+	}
+
+	/**
+	 * Returns the name of the state maintained by this builder.
+	 *
+	 * @return the state name passed to the constructor.
+	 */
+	public String getStateName() {
+		return this.stateName;
+	}
+
+	/**
+	 * Registers the state with the function's state store.
+	 *
+	 * @param initializationContext the initialization context handed to the user function.
+	 */
+	public abstract void initialize(FunctionInitializationContext initializationContext);
+
+	/**
+	 * Reads and/or updates the registered state for a single input element.
+	 *
+	 * @param element the element currently being processed.
+	 * @throws Exception if the state cannot be accessed or updated.
+	 */
+	public abstract void artificialStateForElement(T element) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
new file mode 100644
index 0000000..9697877
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A generic, stateful {@link MapFunction} that allows specifying what states to maintain
+ * based on a provided list of {@link ArtificialKeyedStateBuilder}s.
+ */
+public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction {
+
+	private static final long serialVersionUID = 513012258173556604L;
+
+	/** Computes the output for each input element. */
+	private final MapFunction<IN, OUT> mapFunction;
+
+	/**
+	 * Builders whose states are registered and updated for every element.
+	 *
+	 * <p>This is a private copy of the list given to the constructor. Later changes to the
+	 * caller's list therefore cannot bypass the duplicate-name check. The copy is also always
+	 * a serializable {@link ArrayList}, since it is shipped together with this function.
+	 */
+	private final List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilders;
+
+	/**
+	 * Creates a mapper that maintains a single artificial keyed state.
+	 *
+	 * @param mapFunction the function that computes the output for each input element.
+	 * @param artificialKeyedStateBuilder the builder of the state to maintain.
+	 */
+	public ArtificialKeyedStateMapper(
+			MapFunction<IN, OUT> mapFunction,
+			ArtificialKeyedStateBuilder<IN> artificialKeyedStateBuilder) {
+		this(mapFunction, Collections.singletonList(artificialKeyedStateBuilder));
+	}
+
+	/**
+	 * Creates a mapper that maintains the states of all given builders.
+	 *
+	 * @param mapFunction the function that computes the output for each input element.
+	 * @param artificialKeyedStateBuilder the builders of the states to maintain; the list is copied.
+	 * @throws IllegalArgumentException if two builders use the same state name.
+	 */
+	public ArtificialKeyedStateMapper(
+			MapFunction<IN, OUT> mapFunction,
+			List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder) {
+
+		this.mapFunction = mapFunction;
+		// Defensive copy: validate and store exactly the same, serializable, list instance.
+		this.artificialKeyedStateBuilders = new ArrayList<>(artificialKeyedStateBuilder);
+
+		final Set<String> stateNames = new HashSet<>(this.artificialKeyedStateBuilders.size());
+		for (ArtificialKeyedStateBuilder<IN> stateBuilder : this.artificialKeyedStateBuilders) {
+			if (!stateNames.add(stateBuilder.getStateName())) {
+				throw new IllegalArgumentException("Duplicated state name: " + stateBuilder.getStateName());
+			}
+		}
+	}
+
+	/**
+	 * Updates every artificial state for the element, then applies the wrapped map function.
+	 */
+	@Override
+	public OUT map(IN value) throws Exception {
+		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilders) {
+			stateBuilder.artificialStateForElement(value);
+		}
+
+		return mapFunction.map(value);
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		// Intentionally empty: all state is registered by the builders in initializeState,
+		// and this function keeps no additional state to write here.
+	}
+
+	/**
+	 * Lets every builder register its state with the given context.
+	 */
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilders) {
+			stateBuilder.initialize(context);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
new file mode 100644
index 0000000..a29b802
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.eventpayload;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * An {@link ArtificialKeyedStateBuilder} for user {@link MapState}s.
+ */
+public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialKeyedStateBuilder<IN> {
+
+	private static final long serialVersionUID = -143079058769306954L;
+
+	/** Produces the entries to write, given the incoming element and the current state entries. */
+	private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
+
+	/** Serializer for the keys of the map state. */
+	private final TypeSerializer<K> keySerializer;
+
+	/** Serializer for the values of the map state. */
+	private final TypeSerializer<V> valueSerializer;
+
+	/** Handle to the registered map state; assigned in {@link #initialize}, never serialized. */
+	private transient MapState<K, V> mapState;
+
+	/**
+	 * Creates a builder for a map state.
+	 *
+	 * @param stateName the name under which the map state is registered.
+	 * @param stateValueGenerator computes the entries to put, from the element and the existing entries.
+	 * @param keySerializer serializer for the map keys.
+	 * @param valueSerializer serializer for the map values.
+	 */
+	public ArtificialMapStateBuilder(
+			String stateName,
+			JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<V> valueSerializer) {
+
+		super(stateName);
+		this.stateValueGenerator = stateValueGenerator;
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+	}
+
+	/**
+	 * Puts every entry returned by the generator into the map state.
+	 */
+	@Override
+	public void artificialStateForElement(IN event) throws Exception {
+		final Iterator<Map.Entry<K, V>> existingEntries = mapState.iterator();
+		for (Iterator<Map.Entry<K, V>> updates = stateValueGenerator.join(event, existingEntries); updates.hasNext(); ) {
+			final Map.Entry<K, V> entry = updates.next();
+			mapState.put(entry.getKey(), entry.getValue());
+		}
+	}
+
+	/**
+	 * Registers the map state in the keyed state store of the given context.
+	 */
+	@Override
+	public void initialize(FunctionInitializationContext initializationContext) {
+		final MapStateDescriptor<K, V> descriptor =
+			new MapStateDescriptor<>(stateName, keySerializer, valueSerializer);
+		this.mapState = initializationContext.getKeyedStateStore().getMapState(descriptor);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
new file mode 100644
index 0000000..1d2d5f8
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.eventpayload;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
+
+/**
+ * An {@link ArtificialKeyedStateBuilder} for user {@link ValueState}s.
+ */
+public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialKeyedStateBuilder<IN> {
+
+	private static final long serialVersionUID = -1205814329756790916L;
+
+	/** Computes the new state value from the incoming element and the previous value. */
+	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
+
+	/** Serializer for the state value. */
+	private final TypeSerializer<STATE> typeSerializer;
+
+	/** Handle to the registered value state; assigned in {@link #initialize}, never serialized. */
+	private transient ValueState<STATE> valueState;
+
+	/**
+	 * Creates a builder for a value state.
+	 *
+	 * @param stateName the name under which the value state is registered.
+	 * @param stateValueGenerator computes the new value from the element and the previous value.
+	 * @param typeSerializer serializer for the state value.
+	 */
+	public ArtificialValueStateBuilder(
+			String stateName,
+			JoinFunction<IN, STATE, STATE> stateValueGenerator,
+			TypeSerializer<STATE> typeSerializer) {
+
+		super(stateName);
+		this.stateValueGenerator = stateValueGenerator;
+		this.typeSerializer = typeSerializer;
+	}
+
+	/**
+	 * Replaces the value state with the value computed by the generator.
+	 */
+	@Override
+	public void artificialStateForElement(IN event) throws Exception {
+		final STATE previous = valueState.value();
+		final STATE next = stateValueGenerator.join(event, previous);
+		valueState.update(next);
+	}
+
+	/**
+	 * Registers the value state in the keyed state store of the given context.
+	 */
+	@Override
+	public void initialize(FunctionInitializationContext initializationContext) {
+		final ValueStateDescriptor<STATE> descriptor =
+			new ValueStateDescriptor<>(stateName, typeSerializer);
+		this.valueState = initializationContext.getKeyedStateStore().getState(descriptor);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
new file mode 100644
index 0000000..8cba366
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.eventpayload;
+
+import org.apache.flink.streaming.tests.DataStreamAllroundTestProgram;
+import org.apache.flink.streaming.tests.Event;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A state type used in the {@link DataStreamAllroundTestProgram}.
+ * Wraps an {@link Event} as state.
+ */
+public class ComplexPayload implements Serializable {
+
+	private static final long serialVersionUID = 233624606545704853L;
+
+	/** Event time of the wrapped event. */
+	private final long eventTime;
+
+	/** The wrapped event's key (as a string), followed by its payload. */
+	private final List<String> stringList;
+
+	/** Holds the wrapped event's sequence number. */
+	private final InnerPayLoad innerPayLoad;
+
+	/**
+	 * Captures the event time, sequence number, key and payload of the given event.
+	 *
+	 * @param event the event to wrap.
+	 */
+	public ComplexPayload(Event event) {
+		final long time = event.getEventTime();
+		final InnerPayLoad inner = new InnerPayLoad(event.getSequenceNumber());
+		final String keyAsString = String.valueOf(event.getKey());
+		final List<String> strings = Arrays.asList(keyAsString, event.getPayload());
+
+		this.eventTime = time;
+		this.innerPayLoad = inner;
+		this.stringList = strings;
+	}
+
+	/** Returns the event time of the wrapped event. */
+	public long getEventTime() {
+		return eventTime;
+	}
+
+	/** Returns the list holding the wrapped event's key (as a string) and its payload. */
+	public List<String> getStringList() {
+		return stringList;
+	}
+
+	/** Returns the nested object holding the wrapped event's sequence number. */
+	public InnerPayLoad getInnerPayLoad() {
+		return innerPayLoad;
+	}
+
+	/**
+	 * Nested class in state type. Wraps an {@link Event}'s sequence number.
+	 */
+	public static class InnerPayLoad implements Serializable {
+
+		private static final long serialVersionUID = 3986298180012117883L;
+
+		/** The wrapped sequence number. */
+		private final long sequenceNumber;
+
+		public InnerPayLoad(long sequenceNumber) {
+			this.sequenceNumber = sequenceNumber;
+		}
+
+		/** Returns the wrapped sequence number. */
+		public long getSequenceNumber() {
+			return sequenceNumber;
+		}
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index f7a6a70..34586ed 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -37,6 +37,7 @@ under the License.
<modules>
<module>flink-parent-child-classloading-test</module>
<module>flink-dataset-allround-test</module>
+ <module>flink-datastream-allround-test</module>
<module>flink-stream-sql-test</module>
<module>flink-bucketing-sink-test</module>
</modules>
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java
deleted file mode 100644
index 6bd6d18..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/Event.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-public class Event {
-
- private final int key;
- private final long eventTime;
- private final long sequenceNumber;
- private final String payload;
-
- public Event(int key, long eventTime, long sequenceNumber, String payload) {
- this.key = key;
- this.eventTime = eventTime;
- this.sequenceNumber = sequenceNumber;
- this.payload = payload;
- }
-
- public int getKey() {
- return key;
- }
-
- public long getEventTime() {
- return eventTime;
- }
-
- public long getSequenceNumber() {
- return sequenceNumber;
- }
-
- public String getPayload() {
- return payload;
- }
-
- @Override
- public String toString() {
- return "Event{" +
- "key=" + key +
- ", eventTime=" + eventTime +
- ", sequenceNumber=" + sequenceNumber +
- ", payload='" + payload + '\'' +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java
deleted file mode 100644
index 40234f5..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/GeneralPurposeJobTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateBuilder;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateMapper;
-import org.apache.flink.streaming.tests.general.artificialstate.eventpayload.ArtificialValueStateBuilder;
-import org.apache.flink.streaming.tests.general.artificialstate.eventpayload.ComplexPayload;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * A general purpose test for Flink.
- */
-public class GeneralPurposeJobTest {
-
- private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
- .key("test.semantics")
- .defaultValue("exactly-once")
- .withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
-
- private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
- .key("environment.parallelism")
- .defaultValue(1);
-
- private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
- .key("environment.max_parallelism")
- .defaultValue(128);
-
- private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
- .key("environment.restart_strategy.delay")
- .defaultValue(0);
-
- private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
- .key("state_backend")
- .defaultValue("file")
- .withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
-
- private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
- .key("state_backend.checkpoint_directory")
- .noDefaultValue()
- .withDescription("The checkpoint directory.");
-
- private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
- .key("state_backend.rocks.incremental")
- .defaultValue(false)
- .withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
-
- private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
- .key("state_backend.file.async")
- .defaultValue(true)
- .withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
-
- private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
- .key("sequence_generator_source.keyspace")
- .defaultValue(1000);
-
- private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
- .key("sequence_generator_source.payload_size")
- .defaultValue(20);
-
- private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
- .key("sequence_generator_source.event_time.max_out_of_order")
- .defaultValue(500L);
-
- private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
- .key("sequence_generator_source.event_time.clock_progress")
- .defaultValue(100L);
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
- final ParameterTool pt = ParameterTool.fromArgs(args);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- setupEnvironment(env, pt);
-
- env.addSource(createEventSource(pt))
- .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
- .keyBy(Event::getKey)
- .map(createArtificialKeyedStateMapper(
- (MapFunction<Event, Event>) in -> in,
- (Event first, ComplexPayload second) -> new ComplexPayload(first),
- Arrays.asList(
- new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
-// AvroUtils.getAvroUtils().createAvroSerializer(ComplexPayload.class))
- )
- )
- .returns(Event.class)
- .keyBy(Event::getKey)
- .flatMap(createSemanticsCheckMapper(pt))
-// .timeWindow(Time.seconds(10), Time.seconds(1))
-// .apply(new WindowFunction<Event, Object, Integer, TimeWindow>() {
-// @Override
-// public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Collector<Object> out) throws Exception {
-// System.out.println("------------ "+integer);
-// for (Event event : input) {
-// System.out.println(event);
-// }
-// }
-// });
- .addSink(new PrintSinkFunction<>());
-
- env.execute("General purpose test job");
- }
-
- public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
-
- // use event time
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- // parallelism
- env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
- env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
-
- // restart strategy
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
- Integer.MAX_VALUE,
- pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
-
- // state backend
- final String stateBackend = pt.get(
- STATE_BACKEND.key(),
- STATE_BACKEND.defaultValue());
-
- final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
-
- if ("file".equalsIgnoreCase(stateBackend)) {
- boolean asyncCheckpoints = pt.getBoolean(
- STATE_BACKEND_FILE_ASYNC.key(),
- STATE_BACKEND_FILE_ASYNC.defaultValue());
-
- env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
- } else if ("rocks".equalsIgnoreCase(stateBackend)) {
- boolean incrementalCheckpoints = pt.getBoolean(
- STATE_BACKEND_ROCKS_INCREMENTAL.key(),
- STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
-
- env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
- } else {
- throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
- }
-
- // make parameters available in the web interface
- env.getConfig().setGlobalJobParameters(pt);
- }
-
- private static SourceFunction<Event> createEventSource(ParameterTool pt) {
- return new SequenceGeneratorSource(
- pt.getInt(
- SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
- SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
- pt.getInt(
- SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
- SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()));
- }
-
- private static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
- return new BoundedOutOfOrdernessTimestampExtractor<Event>(
- Time.milliseconds(
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
-
- @Override
- public long extractTimestamp(Event element) {
- return element.getEventTime();
- }
- };
- }
-
- private static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
-
- String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
-
- SemanticsCheckMapper.ValidatorFunction validatorFunction;
-
- if (semantics.equalsIgnoreCase("exactly-once")) {
- validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
- } else if (semantics.equalsIgnoreCase("at-least-once")) {
- validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
- } else {
- throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
- }
-
- return new SemanticsCheckMapper(validatorFunction);
- }
-
- private static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
- MapFunction<IN, OUT> mapFunction,
- JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- List<TypeSerializer<STATE>> stateSerializers) {
-
- List<ArtificialKeyedStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
- for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-
- String stateName = "valueState-" + typeSerializer.getClass().getSimpleName() + "-" + UUID.randomUUID();
-
- ArtificialValueStateBuilder<IN, STATE> stateBuilder = new ArtificialValueStateBuilder<>(
- stateName,
- inputAndOldStateToNewState,
- typeSerializer
- );
-
- artificialStateBuilders.add(stateBuilder);
- }
- return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java
deleted file mode 100644
index 26c0465..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SemanticsCheckMapper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * This mapper validates exactly-once and at-least-once semantics in connection with {@link SequenceGeneratorSource}.
- */
-public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> implements CheckpointedFunction {
-
- /** This value state tracks the current sequence number per key. */
- private volatile ValueState<Long> sequenceValue;
-
- /** This defines how semantics are checked for each update. */
- private final ValidatorFunction validator;
-
- SemanticsCheckMapper(ValidatorFunction validator) {
- this.validator = validator;
- }
-
- @Override
- public void flatMap(Event event, Collector<String> out) throws Exception {
-
- Long currentValue = sequenceValue.value();
- if (currentValue == null) {
- currentValue = 0L;
- }
-
- long nextValue = event.getSequenceNumber();
-
- if (validator.check(currentValue, nextValue)) {
- sequenceValue.update(nextValue);
- } else {
- out.collect("Alert: " + currentValue + " -> " + nextValue);
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) {
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) {
-
- ValueStateDescriptor<Long> sequenceStateDescriptor =
- new ValueStateDescriptor<>("sequenceState", Long.class);
-
- sequenceValue = context.getKeyedStateStore().getState(sequenceStateDescriptor);
- }
-
- public interface ValidatorFunction extends Serializable {
- boolean check(long current, long update);
-
- static ValidatorFunction exactlyOnce() {
- return (current, update) -> (update - current) == 1;
- }
-
- static ValidatorFunction atLeastOnce() {
- return (current, update) -> (update - current) <= 1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java
deleted file mode 100644
index ed95409..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/SequenceGeneratorSource.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-/**
- * This source function generates a sequence of long values per key.
- */
-public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> implements CheckpointedFunction {
-
- private final static Logger LOG = LoggerFactory.getLogger(SequenceGeneratorSource.class);
-
- /** Length of the artificial payload string generated for each event. */
- private final int payloadLength;
-
- /** The size of the total key space, i.e. the number of unique keys generated by all parallel sources. */
- private final int totalKeySpaceSize;
-
- /** This determines how much the event time progresses for each generated element. */
- private final long eventTimeClockProgressPerEvent;
-
- /** Maximum generated deviation in event time from the current event time clock. */
- private final long maxOutOfOrder;
-
- /** Time that a sleep takes in milliseconds. A value < 1 deactivates sleeping. */
- private final long sleepTime;
-
- /** This determines after how many generated events we sleep. A value < 1 deactivates sleeping. */
- private final long sleepAfterElements;
-
- /** This holds the key ranges for which this source generates events. */
- private transient List<KeyRangeStates> keyRanges;
-
- /** This is used to snapshot the state of this source, one entry per key range. */
- private transient ListState<KeyRangeStates> snapshotKeyRanges;
-
- /** Flag that determines if this source is running, i.e. generating events. */
- private volatile boolean running;
-
-
- SequenceGeneratorSource(
- int totalKeySpaceSize,
- int payloadLength,
- long maxOutOfOrder,
- long eventTimeClockProgressPerEvent) {
-
- this(totalKeySpaceSize,
- payloadLength,
- maxOutOfOrder,
- eventTimeClockProgressPerEvent,
- 0L,
- 0L);
- }
-
- SequenceGeneratorSource(
- int totalKeySpaceSize,
- int payloadLength,
- long maxOutOfOrder,
- long eventTimeClockProgressPerEvent,
- long sleepTime,
- long sleepAfterElements) {
-
- this.totalKeySpaceSize = totalKeySpaceSize;
- this.maxOutOfOrder = maxOutOfOrder;
- this.payloadLength = payloadLength;
- this.eventTimeClockProgressPerEvent = eventTimeClockProgressPerEvent;
- this.sleepTime = sleepTime;
- this.sleepAfterElements = sleepTime > 0 ? sleepAfterElements : 0;
- this.running = true;
- }
-
- @Override
- public void run(SourceContext<Event> ctx) throws Exception {
-
- Random random = new Random();
-
- // this holds the current event time, from which generated events can up to +/- (maxOutOfOrder).
- long monotonousEventTime = 0L;
- long elementsBeforeSleep = sleepAfterElements;
-
- while (running) {
-
- KeyRangeStates randomKeyRangeStates = keyRanges.get(random.nextInt(keyRanges.size()));
- int randomKey = randomKeyRangeStates.getRandomKey(random);
- long value = randomKeyRangeStates.incrementAndGet(randomKey);
- long eventTime = Math.max(
- 0,
- generateEventTimeWithOutOfOrderness(random, monotonousEventTime));
-
- // uptick the event time clock
- monotonousEventTime += eventTimeClockProgressPerEvent;
-
- Event event = new Event(
- randomKey,
- eventTime,
- value,
- StringUtils.getRandomString(random, payloadLength, payloadLength, 'A', 'z'));
-
- ctx.collect(event);
-
- if (elementsBeforeSleep == 1) {
- elementsBeforeSleep = sleepAfterElements;
- Thread.sleep(sleepTime);
- } else if (elementsBeforeSleep > 1) {
- --elementsBeforeSleep;
- }
- }
- }
-
- private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) {
- return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- snapshotKeyRanges.update(keyRanges);
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- final RuntimeContext runtimeContext = getRuntimeContext();
- final int subtaskIdx = runtimeContext.getIndexOfThisSubtask();
- final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
- final int maxParallelism = runtimeContext.getMaxNumberOfParallelSubtasks();
-
- ListStateDescriptor<KeyRangeStates> stateDescriptor =
- new ListStateDescriptor<>("keyRanges", KeyRangeStates.class);
-
- snapshotKeyRanges = context.getOperatorStateStore().getListState(stateDescriptor);
- keyRanges = new ArrayList<>();
-
- if (context.isRestored()) {
- // restore key ranges from the snapshot
- for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
- keyRanges.add(keyRange);
- }
- } else {
- // determine the key ranges that belong to the subtask
- int rangeStartIdx = (subtaskIdx * maxParallelism) / parallelism;
- int rangeEndIdx = ((subtaskIdx + 1) * maxParallelism) / parallelism;
-
- for (int i = rangeStartIdx; i < rangeEndIdx; ++i) {
-
- int start = ((i * totalKeySpaceSize + maxParallelism - 1) / maxParallelism);
- int end = 1 + ((i + 1) * totalKeySpaceSize - 1) / maxParallelism;
-
- if (end - start > 0) {
- keyRanges.add(new KeyRangeStates(start, end));
- }
- }
- }
- }
-
- /**
- * This defines the key-range and the current sequence numbers for all keys in the range.
- */
- private static class KeyRangeStates {
-
- /** Start key of the range (inclusive). */
- final int startKey;
-
- /** Start key of the range (exclusive). */
- final int endKey;
-
- /** This array contains the current sequence number for each key in the range. */
- final long[] statesPerKey;
-
- KeyRangeStates(int startKey, int endKey) {
- this(startKey, endKey, new long[endKey - startKey]);
- }
-
- KeyRangeStates(int startKey, int endKey, long[] statesPerKey) {
- Preconditions.checkArgument(statesPerKey.length == endKey - startKey);
- this.startKey = startKey;
- this.endKey = endKey;
- this.statesPerKey = statesPerKey;
- }
-
- /**
- * Increments and returns the current sequence number for the given key.
- */
- long incrementAndGet(int key) {
- return ++statesPerKey[key - startKey];
- }
-
- /**
- * Returns a random key that belongs to this key range.
- */
- int getRandomKey(Random random) {
- return random.nextInt(endKey - startKey) + startKey;
- }
-
- @Override
- public String toString() {
- return "KeyRangeStates{" +
- "start=" + startKey +
- ", end=" + endKey +
- ", statesPerKey=" + Arrays.toString(statesPerKey) +
- '}';
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java
deleted file mode 100644
index c39704d..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate;
-
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-
-import java.io.Serializable;
-
-public abstract class ArtificialKeyedStateBuilder<T> implements Serializable {
-
- protected final String stateName;
-
- public ArtificialKeyedStateBuilder(String stateName) {
- this.stateName = stateName;
- }
-
- public String getStateName() {
- return stateName;
- }
-
- public abstract void artificialStateForElement(T element) throws Exception;
-
- public abstract void initialize(FunctionInitializationContext initializationContext);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java
deleted file mode 100644
index a49ee03..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/ArtificialKeyedStateMapper.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction {
-
- private final MapFunction<IN, OUT> mapFunction;
- private final List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder;
-
- public ArtificialKeyedStateMapper(
- MapFunction<IN, OUT> mapFunction,
- ArtificialKeyedStateBuilder<IN> artificialKeyedStateBuilder) {
- this(mapFunction, Collections.singletonList(artificialKeyedStateBuilder));
- }
-
- public ArtificialKeyedStateMapper(
- MapFunction<IN, OUT> mapFunction,
- List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder) {
-
- this.mapFunction = mapFunction;
- this.artificialKeyedStateBuilder = artificialKeyedStateBuilder;
- Set<String> stateNames = new HashSet<>(this.artificialKeyedStateBuilder.size());
- for (ArtificialKeyedStateBuilder<IN> stateBuilder : this.artificialKeyedStateBuilder) {
- if (!stateNames.add(stateBuilder.getStateName())) {
- throw new IllegalArgumentException("Duplicated state name: " + stateBuilder.getStateName());
- }
- }
- }
-
- @Override
- public OUT map(IN value) throws Exception {
- for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
- stateBuilder.artificialStateForElement(value);
- }
-
- return mapFunction.map(value);
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
- stateBuilder.initialize(context);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java
deleted file mode 100644
index 205a4d6..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialMapStateBuilder.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateBuilder;
-
-import java.util.Iterator;
-import java.util.Map;
-
-public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialKeyedStateBuilder<IN> {
-
- private transient MapState<K, V> mapState;
- private final TypeSerializer<K> keySerializer;
- private final TypeSerializer<V> valueSerializer;
- private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
-
- public ArtificialMapStateBuilder(
- String stateName,
- JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
- TypeSerializer<K> keySerializer,
- TypeSerializer<V> valueSerializer) {
-
- super(stateName);
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- this.stateValueGenerator = stateValueGenerator;
- }
-
- @Override
- public void artificialStateForElement(IN event) throws Exception {
- Iterator<Map.Entry<K, V>> update = stateValueGenerator.join(event, mapState.iterator());
- while (update.hasNext()) {
- Map.Entry<K, V> updateEntry = update.next();
- mapState.put(updateEntry.getKey(), updateEntry.getValue());
- }
- }
-
- @Override
- public void initialize(FunctionInitializationContext initializationContext) {
- MapStateDescriptor<K, V> mapStateDescriptor =
- new MapStateDescriptor<>(stateName, keySerializer, valueSerializer);
- mapState = initializationContext.getKeyedStateStore().getMapState(mapStateDescriptor);
- }
-}