You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/04 14:05:09 UTC
[1/2] flink git commit: [FLINK-8978] Stateful generic stream job
upgrade e2e test
Repository: flink
Updated Branches:
refs/heads/master b50cebb65 -> 5ac4d2960
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh b/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
new file mode 100755
index 0000000..44d9df0
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_stateful_stream_job_upgrade.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+ORIGINAL_DOP="${1:-2}"
+NEW_DOP="${2:-4}"
+
+if (( $ORIGINAL_DOP >= $NEW_DOP )); then
+ NUM_SLOTS=${ORIGINAL_DOP}
+else
+ NUM_SLOTS=${NEW_DOP}
+fi
+
+backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
+setup_flink_slf4j_metric_reporter
+
+start_cluster
+
+# make sure to stop Kafka and ZooKeeper at the end, as well as cleaning up the Flink cluster and our modifications
+function test_cleanup {
+ # don't call ourselves again for another signal interruption
+ trap "exit -1" INT
+ # don't call ourselves again for normal exit
+ trap "" EXIT
+
+ # revert our modifications to the Flink distribution
+ rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
+
+ # make sure to run regular cleanup as well
+ cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+CHECKPOINT_DIR="file://${TEST_DATA_DIR}/savepoint-e2e-test-chckpt-dir"
+
+TEST_PROGRAM_JAR="${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/target/StatefulStreamJobUpgradeTestProgram.jar"
+
+function job() {
+ DOP=$1
+ CMD="${FLINK_DIR}/bin/flink run -d -p ${DOP} ${TEST_PROGRAM_JAR} \
+ --test.semantics exactly-once \
+ --environment.parallelism ${DOP} \
+ --state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1"
+ echo "${CMD}"
+}
+
+JOB=$(job ${ORIGINAL_DOP})
+ORIGINAL_JOB=$(${JOB} --test.job.variant original \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running ${ORIGINAL_JOB}
+
+wait_oper_metric_num_in_records stateMap2.1 200
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint ${ORIGINAL_JOB} ${TEST_DATA_DIR} \
+ | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job ${ORIGINAL_JOB}
+
+JOB=$(job ${NEW_DOP})
+UPGRADED_JOB=$(${JOB} --test.job.variant upgraded \
+ | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running ${UPGRADED_JOB}
+
+wait_oper_metric_num_in_records stateMap3.2 200
+
+# if state is erroneous and the state machine job produces alerting state transitions,
+# output would be non-empty and the test will not pass
[2/2] flink git commit: [FLINK-8978] Stateful generic stream job
upgrade e2e test
Posted by sr...@apache.org.
[FLINK-8978] Stateful generic stream job upgrade e2e test
This closes #5947.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ac4d296
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ac4d296
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ac4d296
Branch: refs/heads/master
Commit: 5ac4d29609842cb086b46fd56f6e49a4287cfa1b
Parents: b50cebb
Author: Andrey Zagrebin <an...@linkresearchtools.org>
Authored: Mon Apr 30 20:25:53 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 4 16:03:32 2018 +0200
----------------------------------------------------------------------
.../tests/DataStreamAllroundTestJobFactory.java | 281 +++++++++++++++++++
.../tests/DataStreamAllroundTestProgram.java | 267 ++----------------
.../ArtificialKeyedStateBuilder.java | 56 ----
.../ArtificialKeyedStateMapper.java | 21 +-
.../tests/artificialstate/ComplexPayload.java | 85 ++++++
.../builder/ArtificialListStateBuilder.java | 65 +++++
.../builder/ArtificialMapStateBuilder.java | 69 +++++
.../builder/ArtificialStateBuilder.java | 56 ++++
.../builder/ArtificialValueStateBuilder.java | 58 ++++
.../eventpayload/ArtificialMapStateBuilder.java | 70 -----
.../ArtificialValueStateBuilder.java | 60 ----
.../eventpayload/ComplexPayload.java | 76 -----
.../pom.xml | 97 +++++++
.../StatefulStreamJobUpgradeTestProgram.java | 158 +++++++++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 8 +
flink-end-to-end-tests/test-scripts/common.sh | 77 ++++-
.../test-scripts/test_resume_savepoint.sh | 58 +---
.../test_stateful_stream_job_upgrade.sh | 91 ++++++
19 files changed, 1085 insertions(+), 569 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
new file mode 100644
index 0000000..e7b0bdc
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialListStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialValueStateBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A factory for components of general purpose test jobs for Flink's DataStream API operators and primitives.
+ *
+ * <p>The components can be configured for different state backends, including memory, file, and RocksDB
+ * state backends. It also allows specifying the processing guarantee semantics, which will also be verified
+ * by the job itself according to the specified semantic.
+ *
+ * <p>Program parameters:
+ * <ul>
+ * <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
+ * <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
+ * <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
+ * <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
+ * <li>environment.restart_strategy.delay (int, default - 0): delay between restart attempts, in milliseconds.</li>
+ * <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
+ * <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
+ * <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
+ * <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FileStateBackend is selected.</li>
+ * <li>sequence_generator_source.keyspace (int, default - 200): Number of different keys for events emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
+ * <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
+ * <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
+ * <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
+ * </ul>
+ */
+class DataStreamAllroundTestJobFactory {
+ private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
+ .key("test.semantics")
+ .defaultValue("exactly-once")
+ .withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
+
+ private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
+ .key("environment.checkpoint_interval")
+ .defaultValue(1000L);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
+ .key("environment.parallelism")
+ .defaultValue(1);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
+ .key("environment.max_parallelism")
+ .defaultValue(128);
+
+ private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
+ .key("environment.restart_strategy.delay")
+ .defaultValue(0);
+
+ private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+ .key("state_backend")
+ .defaultValue("file")
+ .withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
+
+ private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
+ .key("state_backend.checkpoint_directory")
+ .noDefaultValue()
+ .withDescription("The checkpoint directory.");
+
+ private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
+ .key("state_backend.rocks.incremental")
+ .defaultValue(false)
+ .withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
+
+ private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
+ .key("state_backend.file.async")
+ .defaultValue(true)
+ .withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
+
+ private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
+ .key("sequence_generator_source.keyspace")
+ .defaultValue(200);
+
+ private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
+ .key("sequence_generator_source.payload_size")
+ .defaultValue(20);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
+ .key("sequence_generator_source.sleep_time")
+ .defaultValue(0L);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
+ .key("sequence_generator_source.sleep_after_elements")
+ .defaultValue(0L);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
+ .key("sequence_generator_source.event_time.max_out_of_order")
+ .defaultValue(500L);
+
+ private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
+ .key("sequence_generator_source.event_time.clock_progress")
+ .defaultValue(100L);
+
+ static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+
+ // set checkpointing semantics
+ String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+ long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
+ CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
+ ? CheckpointingMode.EXACTLY_ONCE
+ : CheckpointingMode.AT_LEAST_ONCE;
+
+ env.enableCheckpointing(checkpointInterval, checkpointingMode);
+
+ // use event time
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ // parallelism
+ env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
+ env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
+
+ // restart strategy
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+ Integer.MAX_VALUE,
+ pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
+
+ // state backend
+ final String stateBackend = pt.get(
+ STATE_BACKEND.key(),
+ STATE_BACKEND.defaultValue());
+
+ final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
+
+ if ("file".equalsIgnoreCase(stateBackend)) {
+ boolean asyncCheckpoints = pt.getBoolean(
+ STATE_BACKEND_FILE_ASYNC.key(),
+ STATE_BACKEND_FILE_ASYNC.defaultValue());
+
+ env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
+ } else if ("rocks".equalsIgnoreCase(stateBackend)) {
+ boolean incrementalCheckpoints = pt.getBoolean(
+ STATE_BACKEND_ROCKS_INCREMENTAL.key(),
+ STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
+
+ env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
+ } else {
+ throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
+ }
+
+ // make parameters available in the web interface
+ env.getConfig().setGlobalJobParameters(pt);
+ }
+
+ static SourceFunction<Event> createEventSource(ParameterTool pt) {
+ return new SequenceGeneratorSource(
+ pt.getInt(
+ SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
+ SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
+ pt.getInt(
+ SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
+ SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_SLEEP_TIME.key(),
+ SEQUENCE_GENERATOR_SRC_SLEEP_TIME.defaultValue()),
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
+ SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
+ }
+
+ static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
+ return new BoundedOutOfOrdernessTimestampExtractor<Event>(
+ Time.milliseconds(
+ pt.getLong(
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
+ SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
+
+ @Override
+ public long extractTimestamp(Event element) {
+ return element.getEventTime();
+ }
+ };
+ }
+
+ static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
+
+ String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
+
+ SemanticsCheckMapper.ValidatorFunction validatorFunction;
+
+ if (semantics.equalsIgnoreCase("exactly-once")) {
+ validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
+ } else if (semantics.equalsIgnoreCase("at-least-once")) {
+ validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
+ } else {
+ throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
+ }
+
+ return new SemanticsCheckMapper(validatorFunction);
+ }
+
+ static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
+ MapFunction<IN, OUT> mapFunction,
+ JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+ List<TypeSerializer<STATE>> stateSerializers) {
+
+ List<ArtificialStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
+ for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
+ artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState, typeSerializer));
+ artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState, typeSerializer));
+ }
+ return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
+ }
+
+ static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
+ JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+ TypeSerializer<STATE> typeSerializer) {
+
+ return new ArtificialValueStateBuilder<>(
+ "valueState-" + typeSerializer.getClass().getSimpleName(),
+ inputAndOldStateToNewState,
+ typeSerializer);
+ }
+
+ static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
+ JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
+ TypeSerializer<STATE> typeSerializer) {
+
+ JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
+ List<STATE> newState = new ArrayList<>();
+ for (STATE s : second) {
+ newState.add(inputAndOldStateToNewState.join(first, s));
+ }
+ return newState;
+ };
+
+ return new ArtificialListStateBuilder<>(
+ "listState-" + typeSerializer.getClass().getSimpleName(),
+ listStateGenerator,
+ listStateGenerator,
+ typeSerializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index 2059b99..75c14e5 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -18,136 +18,38 @@
package org.apache.flink.streaming.tests;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
-import org.apache.flink.streaming.tests.artificialstate.eventpayload.ArtificialValueStateBuilder;
-import org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
+import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import java.util.Collections;
+
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
/**
- * A general purpose test for Flink's DataStream API operators and primitives.
+ * A general purpose test job for Flink's DataStream API operators and primitives.
*
- * <p>It currrently covers the following aspects that are frequently present in Flink DataStream jobs:
+ * <p>The job is constructed of generic components from {@link DataStreamAllroundTestJobFactory}.
+ * It currently covers the following aspects that are frequently present in Flink DataStream jobs:
* <ul>
* <li>A generic Kryo input type.</li>
* <li>A state type for which we register a {@link KryoSerializer}.</li>
* <li>Operators with {@link ValueState}.</li>
* </ul>
*
- * <p>The job allows to be configured for different state backends, including memory, file, and RocksDB
- * state backends. It also allows specifying the processing guarantee semantics, which will also be verified
- * by the job itself according to the specified semantic.
+ * <p>The cli job configuration options are described in {@link DataStreamAllroundTestJobFactory}.
*
- * <p>Program parameters:
- * <ul>
- * <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
- * <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
- * <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
- * <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
- * <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
- * <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
- * <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
- * <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
- * <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FileStateBackend is selected.</li>
- * <li>sequence_generator_source.keyspace (int, default - 1000): Number of different keys for events emitted by the sequence generator.</li>
- * <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
- * <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
- * <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
- * <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
- * <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
- * </ul>
*/
public class DataStreamAllroundTestProgram {
-
- private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
- .key("test.semantics")
- .defaultValue("exactly-once")
- .withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
-
- private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
- .key("environment.checkpoint_interval")
- .defaultValue(1000L);
-
- private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
- .key("environment.parallelism")
- .defaultValue(1);
-
- private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
- .key("environment.max_parallelism")
- .defaultValue(128);
-
- private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
- .key("environment.restart_strategy.delay")
- .defaultValue(0);
-
- private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
- .key("state_backend")
- .defaultValue("file")
- .withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
-
- private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
- .key("state_backend.checkpoint_directory")
- .noDefaultValue()
- .withDescription("The checkpoint directory.");
-
- private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
- .key("state_backend.rocks.incremental")
- .defaultValue(false)
- .withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
-
- private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
- .key("state_backend.file.async")
- .defaultValue(true)
- .withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
-
- private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
- .key("sequence_generator_source.keyspace")
- .defaultValue(200);
-
- private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
- .key("sequence_generator_source.payload_size")
- .defaultValue(20);
-
- private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
- .key("sequence_generator_source.sleep_time")
- .defaultValue(0L);
-
- private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
- .key("sequence_generator_source.sleep_after_elements")
- .defaultValue(0L);
-
- private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
- .key("sequence_generator_source.event_time.max_out_of_order")
- .defaultValue(500L);
-
- private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
- .key("sequence_generator_source.event_time.clock_progress")
- .defaultValue(100L);
-
- // -----------------------------------------------------------------------------------------------------------------
+ private static final String STATE_OPER_NAME = "ArtificalKeyedStateMapper";
public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);
@@ -160,15 +62,20 @@ public class DataStreamAllroundTestProgram {
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
.keyBy(Event::getKey)
.map(createArtificialKeyedStateMapper(
- // map function simply forwards the inputs
- (MapFunction<Event, Event>) in -> in,
- // state is updated per event as a wrapped ComplexPayload state object
- (Event first, ComplexPayload second) -> new ComplexPayload(first), //
- Arrays.asList(
- new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+ // map function simply forwards the inputs
+ (MapFunction<Event, Event>) in -> in,
+ // state is verified and updated per event as a wrapped ComplexPayload state object
+ (Event first, ComplexPayload second) -> {
+ if (second != null && !second.getStrPayload().equals(STATE_OPER_NAME)) {
+ System.out.println("State is set or restored incorrectly");
+ }
+ return new ComplexPayload(first, STATE_OPER_NAME);
+ },
+ Collections.singletonList(
+ new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
)
)
- .name("ArtificalKeyedStateMapper")
+ .name(STATE_OPER_NAME)
.returns(Event.class)
.keyBy(Event::getKey)
.flatMap(createSemanticsCheckMapper(pt))
@@ -177,128 +84,4 @@ public class DataStreamAllroundTestProgram {
env.execute("General purpose test job");
}
-
- public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
-
- // set checkpointing semantics
- String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
- long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
- CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
- ? CheckpointingMode.EXACTLY_ONCE
- : CheckpointingMode.AT_LEAST_ONCE;
-
- env.enableCheckpointing(checkpointInterval, checkpointingMode);
-
- // use event time
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
- // parallelism
- env.setParallelism(pt.getInt(ENVIRONMENT_PARALLELISM.key(), ENVIRONMENT_PARALLELISM.defaultValue()));
- env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
-
- // restart strategy
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
- Integer.MAX_VALUE,
- pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue())));
-
- // state backend
- final String stateBackend = pt.get(
- STATE_BACKEND.key(),
- STATE_BACKEND.defaultValue());
-
- final String checkpointDir = pt.getRequired(STATE_BACKEND_CHECKPOINT_DIR.key());
-
- if ("file".equalsIgnoreCase(stateBackend)) {
- boolean asyncCheckpoints = pt.getBoolean(
- STATE_BACKEND_FILE_ASYNC.key(),
- STATE_BACKEND_FILE_ASYNC.defaultValue());
-
- env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints));
- } else if ("rocks".equalsIgnoreCase(stateBackend)) {
- boolean incrementalCheckpoints = pt.getBoolean(
- STATE_BACKEND_ROCKS_INCREMENTAL.key(),
- STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue());
-
- env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
- } else {
- throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
- }
-
- // make parameters available in the web interface
- env.getConfig().setGlobalJobParameters(pt);
- }
-
- private static SourceFunction<Event> createEventSource(ParameterTool pt) {
- return new SequenceGeneratorSource(
- pt.getInt(
- SEQUENCE_GENERATOR_SRC_KEYSPACE.key(),
- SEQUENCE_GENERATOR_SRC_KEYSPACE.defaultValue()),
- pt.getInt(
- SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.key(),
- SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE.defaultValue()),
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()),
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.key(),
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS.defaultValue()),
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_SLEEP_TIME.key(),
- SEQUENCE_GENERATOR_SRC_SLEEP_TIME.defaultValue()),
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
- SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
- }
-
- private static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor(ParameterTool pt) {
- return new BoundedOutOfOrdernessTimestampExtractor<Event>(
- Time.milliseconds(
- pt.getLong(
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.key(),
- SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS.defaultValue()))) {
-
- @Override
- public long extractTimestamp(Event element) {
- return element.getEventTime();
- }
- };
- }
-
- private static FlatMapFunction<Event, String> createSemanticsCheckMapper(ParameterTool pt) {
-
- String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
-
- SemanticsCheckMapper.ValidatorFunction validatorFunction;
-
- if (semantics.equalsIgnoreCase("exactly-once")) {
- validatorFunction = SemanticsCheckMapper.ValidatorFunction.exactlyOnce();
- } else if (semantics.equalsIgnoreCase("at-least-once")) {
- validatorFunction = SemanticsCheckMapper.ValidatorFunction.atLeastOnce();
- } else {
- throw new IllegalArgumentException("Unknown semantics requested: " + semantics);
- }
-
- return new SemanticsCheckMapper(validatorFunction);
- }
-
- private static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
- MapFunction<IN, OUT> mapFunction,
- JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
- List<TypeSerializer<STATE>> stateSerializers) {
-
- List<ArtificialKeyedStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
- for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-
- String stateName = "valueState-" + typeSerializer.getClass().getSimpleName() + "-" + UUID.randomUUID();
-
- ArtificialValueStateBuilder<IN, STATE> stateBuilder = new ArtificialValueStateBuilder<>(
- stateName,
- inputAndOldStateToNewState,
- typeSerializer
- );
-
- artificialStateBuilders.add(stateBuilder);
- }
- return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
deleted file mode 100644
index 3db18db..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateBuilder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate;
-
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-
-import java.io.Serializable;
-
-/**
- * The keyed state builder wraps the logic of registering state in user
- * functions, as well as how state is updated per input element..
- */
-public abstract class ArtificialKeyedStateBuilder<T> implements Serializable {
-
- private static final long serialVersionUID = -5887676929924485788L;
-
- protected final String stateName;
-
- public ArtificialKeyedStateBuilder(String stateName) {
- this.stateName = stateName;
- }
-
- public String getStateName() {
- return stateName;
- }
-
- /**
- * Manipulate the state for an input element.
- *
- * @param element the current input element.
- */
- public abstract void artificialStateForElement(T element) throws Exception;
-
- /**
- * Registers the state.
- *
- * @param initializationContext the state initialization context, provided by the user function.
- */
- public abstract void initialize(FunctionInitializationContext initializationContext);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
index 9697877..c5857a2 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ArtificialKeyedStateMapper.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.tests.artificialstate.builder.ArtificialStateBuilder;
import java.util.Collections;
import java.util.HashSet;
@@ -31,29 +32,29 @@ import java.util.Set;
/**
* A generic, stateful {@link MapFunction} that allows specifying what states to maintain
- * based on a provided list of {@link ArtificialKeyedStateBuilder}s.
+ * based on a provided list of {@link ArtificialStateBuilder}s.
*/
public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT> implements CheckpointedFunction {
private static final long serialVersionUID = 513012258173556604L;
private final MapFunction<IN, OUT> mapFunction;
- private final List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder;
+ private final List<ArtificialStateBuilder<IN>> artificialStateBuilders;
public ArtificialKeyedStateMapper(
MapFunction<IN, OUT> mapFunction,
- ArtificialKeyedStateBuilder<IN> artificialKeyedStateBuilder) {
- this(mapFunction, Collections.singletonList(artificialKeyedStateBuilder));
+ ArtificialStateBuilder<IN> artificialStateBuilders) {
+ this(mapFunction, Collections.singletonList(artificialStateBuilders));
}
public ArtificialKeyedStateMapper(
MapFunction<IN, OUT> mapFunction,
- List<ArtificialKeyedStateBuilder<IN>> artificialKeyedStateBuilder) {
+ List<ArtificialStateBuilder<IN>> artificialStateBuilders) {
this.mapFunction = mapFunction;
- this.artificialKeyedStateBuilder = artificialKeyedStateBuilder;
- Set<String> stateNames = new HashSet<>(this.artificialKeyedStateBuilder.size());
- for (ArtificialKeyedStateBuilder<IN> stateBuilder : this.artificialKeyedStateBuilder) {
+ this.artificialStateBuilders = artificialStateBuilders;
+ Set<String> stateNames = new HashSet<>(this.artificialStateBuilders.size());
+ for (ArtificialStateBuilder<IN> stateBuilder : this.artificialStateBuilders) {
if (!stateNames.add(stateBuilder.getStateName())) {
throw new IllegalArgumentException("Duplicated state name: " + stateBuilder.getStateName());
}
@@ -62,7 +63,7 @@ public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT
@Override
public OUT map(IN value) throws Exception {
- for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
+ for (ArtificialStateBuilder<IN> stateBuilder : artificialStateBuilders) {
stateBuilder.artificialStateForElement(value);
}
@@ -75,7 +76,7 @@ public class ArtificialKeyedStateMapper<IN, OUT> extends RichMapFunction<IN, OUT
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- for (ArtificialKeyedStateBuilder<IN> stateBuilder : artificialKeyedStateBuilder) {
+ for (ArtificialStateBuilder<IN> stateBuilder : artificialStateBuilders) {
stateBuilder.initialize(context);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java
new file mode 100644
index 0000000..98ec456
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/ComplexPayload.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate;
+
+import org.apache.flink.streaming.tests.DataStreamAllroundTestProgram;
+import org.apache.flink.streaming.tests.Event;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A state type used in the {@link DataStreamAllroundTestProgram}.
+ * Captures an {@link Event}'s event time, key, payload and sequence number, plus a separate string payload.
+ */
+public class ComplexPayload implements Serializable {
+ private static final long serialVersionUID = 233624606545704853L;
+
+ private final long eventTime;
+ private final List<String> stringList;
+ private final String strPayload;
+ private final InnerPayLoad innerPayLoad;
+
+ public ComplexPayload(Event event, String strPayload) {
+ this.eventTime = event.getEventTime();
+ this.innerPayLoad = new InnerPayLoad(event.getSequenceNumber());
+ this.strPayload = strPayload;
+ this.stringList = Arrays.asList(String.valueOf(event.getKey()), event.getPayload());
+ }
+
+ public ComplexPayload(Event event) {
+ this(event, event.getPayload());
+ }
+
+ /**
+ * Nested class in state type. Wraps an {@link Event}'s sequence number.
+ */
+ public static class InnerPayLoad implements Serializable {
+
+ private static final long serialVersionUID = 3986298180012117883L;
+
+ private final long sequenceNumber;
+
+ public InnerPayLoad(long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
+ }
+
+ public long getEventTime() {
+ return eventTime;
+ }
+
+ public List<String> getStringList() {
+ return stringList;
+ }
+
+ public String getStrPayload() {
+ return strPayload;
+ }
+
+ public InnerPayLoad getInnerPayLoad() {
+ return innerPayLoad;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
new file mode 100644
index 0000000..a2c6387
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.util.List;
+
+/**
+ * An {@link ArtificialStateBuilder} that keeps one operator and one keyed {@link ListState}, each updated by its own generator.
+ */
+public class ArtificialListStateBuilder<IN, STATE> extends ArtificialStateBuilder<IN> {
+
+ private static final long serialVersionUID = -1205814329756790916L;
+
+ private transient ListState<STATE> listOperatorState;
+ private transient ListState<STATE> listKeyedState;
+ private final TypeSerializer<STATE> typeSerializer;
+ private final JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator;
+ private final JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator;
+
+ public ArtificialListStateBuilder(
+ String stateName,
+ JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
+ JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
+ TypeSerializer<STATE> typeSerializer) {
+ super(stateName);
+ this.typeSerializer = typeSerializer;
+ this.keyedStateGenerator = keyedStateGenerator;
+ this.operatorStateGenerator = operatorStateGenerator;
+ }
+
+ @Override
+ public void artificialStateForElement(IN event) throws Exception {
+ listOperatorState.update(operatorStateGenerator.join(event, listOperatorState.get())); // operator state <- operator generator
+ listKeyedState.update(keyedStateGenerator.join(event, listKeyedState.get())); // keyed state <- keyed generator
+ }
+
+ @Override
+ public void initialize(FunctionInitializationContext initializationContext) throws Exception {
+ ListStateDescriptor<STATE> listStateDescriptor = new ListStateDescriptor<>(stateName, typeSerializer);
+ listOperatorState = initializationContext.getOperatorStateStore().getListState(listStateDescriptor);
+ listKeyedState = initializationContext.getKeyedStateStore().getListState(listStateDescriptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java
new file mode 100644
index 0000000..c315e65
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialMapStateBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link ArtificialStateBuilder} backed by a keyed {@link MapState}; entries generated per element are put into it.
+ */
+public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialStateBuilder<IN> {
+
+ private static final long serialVersionUID = -143079058769306954L;
+
+ private transient MapState<K, V> mapState;
+ private final TypeSerializer<K> keySerializer;
+ private final TypeSerializer<V> valueSerializer;
+ private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
+
+ public ArtificialMapStateBuilder(
+ String stateName,
+ JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<V> valueSerializer) {
+
+ super(stateName);
+ this.stateValueGenerator = stateValueGenerator;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ }
+
+ @Override
+ public void artificialStateForElement(IN event) throws Exception {
+ // the generator sees the current entries and returns the entries to write back
+ for (Iterator<Map.Entry<K, V>> pending = stateValueGenerator.join(event, mapState.iterator()); pending.hasNext(); ) {
+ final Map.Entry<K, V> entry = pending.next();
+ mapState.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void initialize(FunctionInitializationContext initializationContext) {
+ // register the map state in the keyed state store under this builder's state name
+ mapState = initializationContext.getKeyedStateStore().getMapState(
+ new MapStateDescriptor<>(stateName, keySerializer, valueSerializer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java
new file mode 100644
index 0000000..aed94ba
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.io.Serializable;
+
+/**
+ * The state builder wraps the logic of registering state in user
+ * functions, as well as how state is updated per input element.
+ */
+public abstract class ArtificialStateBuilder<T> implements Serializable {
+
+ private static final long serialVersionUID = -5887676929924485788L;
+
+ protected final String stateName;
+
+ public ArtificialStateBuilder(String stateName) {
+ this.stateName = stateName;
+ }
+
+ public String getStateName() {
+ return stateName;
+ }
+
+ /**
+ * Manipulate the state for an input element.
+ *
+ * @param element the current input element.
+ */
+ public abstract void artificialStateForElement(T element) throws Exception;
+
+ /**
+ * Registers the state.
+ *
+ * @param initializationContext the state initialization context, provided by the user function.
+ */
+ public abstract void initialize(FunctionInitializationContext initializationContext) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
new file mode 100644
index 0000000..6d74e09
--- /dev/null
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.artificialstate.builder;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+/**
+ * {@link ArtificialStateBuilder} backed by a keyed {@link ValueState} whose value is regenerated for every element.
+ */
+public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialStateBuilder<IN> {
+
+ private static final long serialVersionUID = -1205814329756790916L;
+
+ private transient ValueState<STATE> valueState;
+ private final TypeSerializer<STATE> typeSerializer;
+ private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
+
+ public ArtificialValueStateBuilder(
+ String stateName,
+ JoinFunction<IN, STATE, STATE> stateValueGenerator,
+ TypeSerializer<STATE> typeSerializer) {
+ super(stateName);
+ this.stateValueGenerator = stateValueGenerator;
+ this.typeSerializer = typeSerializer;
+ }
+
+ @Override
+ public void artificialStateForElement(IN event) throws Exception {
+ final STATE previous = valueState.value();
+ valueState.update(stateValueGenerator.join(event, previous));
+ }
+
+ @Override
+ public void initialize(FunctionInitializationContext initializationContext) {
+ valueState = initializationContext.getKeyedStateStore().getState(
+ new ValueStateDescriptor<>(stateName, typeSerializer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
deleted file mode 100644
index a29b802..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialMapStateBuilder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * An {@link ArtificialKeyedStateBuilder} for user {@link MapState}s.
- */
-public class ArtificialMapStateBuilder<IN, K, V> extends ArtificialKeyedStateBuilder<IN> {
-
- private static final long serialVersionUID = -143079058769306954L;
-
- private transient MapState<K, V> mapState;
- private final TypeSerializer<K> keySerializer;
- private final TypeSerializer<V> valueSerializer;
- private final JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator;
-
- public ArtificialMapStateBuilder(
- String stateName,
- JoinFunction<IN, Iterator<Map.Entry<K, V>>, Iterator<Map.Entry<K, V>>> stateValueGenerator,
- TypeSerializer<K> keySerializer,
- TypeSerializer<V> valueSerializer) {
-
- super(stateName);
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- this.stateValueGenerator = stateValueGenerator;
- }
-
- @Override
- public void artificialStateForElement(IN event) throws Exception {
- Iterator<Map.Entry<K, V>> update = stateValueGenerator.join(event, mapState.iterator());
- while (update.hasNext()) {
- Map.Entry<K, V> updateEntry = update.next();
- mapState.put(updateEntry.getKey(), updateEntry.getValue());
- }
- }
-
- @Override
- public void initialize(FunctionInitializationContext initializationContext) {
- MapStateDescriptor<K, V> mapStateDescriptor =
- new MapStateDescriptor<>(stateName, keySerializer, valueSerializer);
- mapState = initializationContext.getKeyedStateStore().getMapState(mapStateDescriptor);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
deleted file mode 100644
index 1d2d5f8..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
-
-/**
- * An {@link ArtificialKeyedStateBuilder} for user {@link ValueState}s.
- */
-public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialKeyedStateBuilder<IN> {
-
- private static final long serialVersionUID = -1205814329756790916L;
-
- private transient ValueState<STATE> valueState;
- private final TypeSerializer<STATE> typeSerializer;
- private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
-
- public ArtificialValueStateBuilder(
- String stateName,
- JoinFunction<IN, STATE, STATE> stateValueGenerator,
- TypeSerializer<STATE> typeSerializer) {
-
- super(stateName);
- this.typeSerializer = typeSerializer;
- this.stateValueGenerator = stateValueGenerator;
- }
-
- @Override
- public void artificialStateForElement(IN event) throws Exception {
- valueState.update(stateValueGenerator.join(event, valueState.value()));
- }
-
- @Override
- public void initialize(FunctionInitializationContext initializationContext) {
- ValueStateDescriptor<STATE> valueStateDescriptor =
- new ValueStateDescriptor<>(stateName, typeSerializer);
- valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
deleted file mode 100644
index 8cba366..0000000
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ComplexPayload.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.artificialstate.eventpayload;
-
-import org.apache.flink.streaming.tests.DataStreamAllroundTestProgram;
-import org.apache.flink.streaming.tests.Event;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * A state type used in the {@link DataStreamAllroundTestProgram}.
- * Wraps an {@link Event} as state.
- */
-public class ComplexPayload implements Serializable {
-
- private static final long serialVersionUID = 233624606545704853L;
-
- public ComplexPayload(Event event) {
- this.eventTime = event.getEventTime();
- this.innerPayLoad = new InnerPayLoad(event.getSequenceNumber());
- this.stringList = Arrays.asList(String.valueOf(event.getKey()), event.getPayload());
- }
-
- private final long eventTime;
- private final List<String> stringList;
- private final InnerPayLoad innerPayLoad;
-
- /**
- * Nested class in state type. Wraps an {@link Event}'s sequence number.
- */
- public static class InnerPayLoad implements Serializable {
-
- private static final long serialVersionUID = 3986298180012117883L;
-
- private final long sequenceNumber;
-
- public InnerPayLoad(long sequenceNumber) {
- this.sequenceNumber = sequenceNumber;
- }
-
- public long getSequenceNumber() {
- return sequenceNumber;
- }
- }
-
- public long getEventTime() {
- return eventTime;
- }
-
- public List<String> getStringList() {
- return stringList;
- }
-
- public InnerPayLoad getInnerPayLoad() {
- return innerPayLoad;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml
new file mode 100644
index 0000000..d1bb86c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.6-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-stream-stateful-job-upgrade-test</artifactId>
+ <name>flink-stream-stateful-job-upgrade-test</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-datastream-allround-test</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>StatefulStreamJobUpgradeTestProgram</finalName>
+ <artifactSet>
+ <excludes>
+ <exclude>com.google.code.findbugs:jsr305</exclude>
+ <exclude>org.slf4j:*</exclude>
+ <exclude>log4j:*</exclude>
+ </excludes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.flink.streaming.tests.StatefulStreamJobUpgradeTestProgram</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
new file mode 100644
index 0000000..0b3b5ed
--- /dev/null
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
+
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test upgrade of generic stateful job for Flink's DataStream API operators and primitives.
+ *
+ * <p>The job is constructed of generic components from {@link DataStreamAllroundTestJobFactory}.
+ * The goal is to test successful state restoration after taking savepoint and recovery with new job version.
+ * It can be configured with '--test.job.variant' to run different variants of it:
+ * <ul>
+ *     <li><b>original:</b> includes 2 custom stateful map operators</li>
+ *     <li><b>upgraded:</b> changes order of 2 custom stateful map operators and adds one more</li>
+ * </ul>
+ *
+ * <p>The cli job configuration options are described in {@link DataStreamAllroundTestJobFactory}.
+ *
+ * <p>Job specific configuration options:
+ * <ul>
+ *     <li>test.job.variant (String, default - 'original'): This configures the job variant to test. Can be 'original' or 'upgraded'.</li>
+ * </ul>
+ *
+ */
+public class StatefulStreamJobUpgradeTestProgram {
+    private static final String TEST_JOB_VARIANT_ORIGINAL = "original";
+    private static final String TEST_JOB_VARIANT_UPGRADED = "upgraded";
+
+    private static final ConfigOption<String> TEST_JOB_VARIANT = ConfigOptions
+        .key("test.job.variant")
+        .defaultValue(TEST_JOB_VARIANT_ORIGINAL)
+        .withDescription(String.format("This configures the job variant to test. Can be '%s' or '%s'",
+            TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));
+
+    /**
+     * Builds and executes the test job: event source, timestamp/watermark assignment, keying by
+     * event key, the variant-specific chain of stateful map operators, the semantics check mapper
+     * and a print sink.
+     *
+     * @param args command line arguments, parsed with {@link ParameterTool#fromArgs(String[])}
+     */
+    public static void main(String[] args) throws Exception {
+        final ParameterTool pt = ParameterTool.fromArgs(args);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        setupEnvironment(env, pt);
+
+        KeyedStream<Event, Integer> source = env.addSource(createEventSource(pt))
+            .name("EventSource")
+            .uid("EventSource")
+            .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+            .keyBy(Event::getKey);
+
+        // every stateful operator below shares the same single Kryo serializer for its state
+        List<TypeSerializer<ComplexPayload>> stateSer =
+            Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
+
+        KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
+            applyOriginalStatefulOperations(source, stateSer) :
+            applyUpgradedStatefulOperations(source, stateSer);
+
+        afterStatefulOperations
+            .flatMap(createSemanticsCheckMapper(pt))
+            .name("SemanticsCheckMapper")
+            .addSink(new PrintSinkFunction<>());
+
+        env.execute("General purpose test job");
+    }
+
+    /**
+     * Resolves the configured job variant.
+     *
+     * <p>If '--test.job.variant' is not given, the default declared on {@link #TEST_JOB_VARIANT}
+     * ('original') is used. Without the explicit fallback {@code pt.get(key)} would return
+     * {@code null} and the {@code switch} would fail with a {@link NullPointerException}.
+     *
+     * @param pt parsed job parameters
+     * @return {@code true} for the 'original' variant, {@code false} for the 'upgraded' variant
+     * @throws IllegalArgumentException if the option has any other value
+     */
+    private static boolean isOriginalJobVariant(final ParameterTool pt) {
+        switch (pt.get(TEST_JOB_VARIANT.key(), TEST_JOB_VARIANT.defaultValue())) {
+            case TEST_JOB_VARIANT_ORIGINAL:
+                return true;
+            case TEST_JOB_VARIANT_UPGRADED:
+                return false;
+            default:
+                throw new IllegalArgumentException(String.format("'--test.job.variant' can be either '%s' or '%s'",
+                    TEST_JOB_VARIANT_ORIGINAL, TEST_JOB_VARIANT_UPGRADED));
+        }
+    }
+
+    /**
+     * Original job variant: 'stateMap1' (simple state update) followed by 'stateMap2' (last state update).
+     */
+    private static KeyedStream<Event, Integer> applyOriginalStatefulOperations(
+        KeyedStream<Event, Integer> source,
+        List<TypeSerializer<ComplexPayload>> stateSer) {
+        source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer);
+        return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer);
+    }
+
+    /**
+     * Upgraded job variant: 'stateMap2' (simple state update), then 'stateMap1' (last state update),
+     * then the newly added 'stateMap3' (simple state update). Compared to the original variant the
+     * two existing operators change position and swap their update functions.
+     */
+    private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
+        KeyedStream<Event, Integer> source,
+        List<TypeSerializer<ComplexPayload>> stateSer) {
+        source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer);
+        source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer);
+        return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer);
+    }
+
+    /**
+     * Appends one artificial keyed-state map operator to the stream and re-keys the result by event key.
+     *
+     * @param name used both as the operator name and as its uid
+     * @param stateFunc computes the new state from the incoming event and the current state
+     * @param source keyed input stream
+     * @param stateSer serializers for the operator's artificial state
+     * @return the mapped stream, keyed by {@link Event#getKey()}
+     */
+    private static KeyedStream<Event, Integer> applyTestStatefulOperator(
+        String name,
+        JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
+        KeyedStream<Event, Integer> source,
+        List<TypeSerializer<ComplexPayload>> stateSer) {
+        return source
+            .map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer))
+            .name(name)
+            .uid(name)
+            .returns(Event.class)
+            .keyBy(Event::getKey);
+    }
+
+    /**
+     * State update that verifies the current state and then always replaces it with a payload
+     * built from the incoming event and {@code strPayload}.
+     */
+    private static JoinFunction<Event, ComplexPayload, ComplexPayload> simpleStateUpdate(String strPayload) {
+        return (Event first, ComplexPayload second) -> {
+            verifyState(strPayload, second);
+            return new ComplexPayload(first, strPayload);
+        };
+    }
+
+    /**
+     * State update that verifies the current state and keeps it when the incoming event's time is
+     * not later than the event time stored in the state; otherwise the state is replaced with a
+     * payload built from the incoming event and {@code strPayload}.
+     */
+    private static JoinFunction<Event, ComplexPayload, ComplexPayload> lastStateUpdate(String strPayload) {
+        return (Event first, ComplexPayload second) -> {
+            verifyState(strPayload, second);
+            boolean isLastEvent = second != null && first.getEventTime() <= second.getEventTime();
+            return isLastEvent ? second : new ComplexPayload(first, strPayload);
+        };
+    }
+
+    /**
+     * Prints a message to stdout if existing state does not carry the expected string payload.
+     * Absent ({@code null}) state is accepted.
+     *
+     * <p>NOTE(review): the mismatch is only printed, not thrown - presumably the driving test
+     * script inspects the job output; confirm against the script.
+     */
+    private static void verifyState(String strPayload, ComplexPayload state) {
+        if (state != null && !state.getStrPayload().equals(strPayload)) {
+            System.out.println("State is set or restored incorrectly");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 898e4af..5cb75a8 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -42,6 +42,7 @@ under the License.
<module>flink-bucketing-sink-test</module>
<module>flink-distributed-cache-via-blob-test</module>
<module>flink-high-parallelism-iterations-test</module>
+ <module>flink-stream-stateful-job-upgrade-test</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 29c53c2..f0fe938 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -112,5 +112,13 @@ if [ $EXIT_CODE == 0 ]; then
EXIT_CODE=$?
fi
+if [ $EXIT_CODE == 0 ]; then
+ printf "\n==============================================================================\n"
+ printf "Running stateful stream job upgrade nightly end-to-end test\n"
+ printf "==============================================================================\n"
+ $END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4
+ EXIT_CODE=$?
+fi
+
# Exit code for Travis build success/failure
exit $EXIT_CODE
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index d8e0de7..c5ea4b8 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -39,6 +39,12 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function backup_config() {
+  # Copy conf/masters and conf/flink-conf.yaml to *.bak files next to the
+  # originals before a test modifies them.
+  # Paths are quoted so that a FLINK_DIR containing whitespace still works.
+  cp "$FLINK_DIR/conf/masters" "$FLINK_DIR/conf/masters.bak"
+  cp "$FLINK_DIR/conf/flink-conf.yaml" "$FLINK_DIR/conf/flink-conf.yaml.bak"
+}
+
function revert_default_config() {
# revert our modifications to the masters file
@@ -52,11 +58,22 @@ function revert_default_config() {
fi
}
+function set_conf() {
+  # Append a "<key>: <value>" line to conf/flink-conf.yaml.
+  #   $1 - configuration key
+  #   $2 - configuration value
+  # The line is always appended; an existing entry for the same key is not edited.
+  local CONF_NAME=$1
+  local VAL=$2
+  # the redirect target is quoted: an unquoted path with spaces is an "ambiguous redirect"
+  echo "$CONF_NAME: $VAL" >> "$FLINK_DIR/conf/flink-conf.yaml"
+}
+
+function change_conf() {
+  # Rewrite "<key>: <old>" to "<key>: <new>" in conf/flink-conf.yaml, in place.
+  #   $1 - configuration key
+  #   $2 - value currently present in the file
+  #   $3 - replacement value
+  # Lines that do not contain "<key>: <old>" are left untouched, so this is a
+  # no-op when the key is absent or currently has a different value.
+  # NOTE: the arguments are interpolated into the sed expression unescaped:
+  # they must not contain '/', regex metacharacters (e.g. '.') keep their
+  # meaning, and the match is not anchored at the end of the value.
+  local CONF_NAME=$1
+  local OLD_VAL=$2
+  local NEW_VAL=$3
+  sed -i -e "s/${CONF_NAME}: ${OLD_VAL}/${CONF_NAME}: ${NEW_VAL}/" "${FLINK_DIR}/conf/flink-conf.yaml"
+}
+
function create_ha_config() {
- # back up the masters and flink-conf.yaml
- cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
- cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+ backup_config
# clean up the dir that will be used for zookeeper storage
# (see high-availability.zookeeper.storageDir below)
@@ -309,6 +326,60 @@ function kill_random_taskmanager {
echo "TaskManager $KILL_TM killed."
}
+function setup_flink_slf4j_metric_reporter() {
+  # Enable the SLF4J metrics reporter: copy its jar from opt/ into lib/ and
+  # append the reporter class and interval to flink-conf.yaml via set_conf.
+  #   $1 - reporting interval (optional, default "1 SECONDS")
+  local INTERVAL="${1:-1 SECONDS}"
+  # only the directory part is quoted so that the jar glob still expands
+  cp "$FLINK_DIR"/opt/flink-metrics-slf4j-*.jar "$FLINK_DIR/lib/"
+  set_conf "metrics.reporter.slf4j.class" "org.apache.flink.metrics.slf4j.Slf4jReporter"
+  set_conf "metrics.reporter.slf4j.interval" "${INTERVAL}"
+}
+
+function rollback_flink_slf4j_metric_reporter() {
+  # Remove the SLF4J reporter jar(s) from lib/. This does not touch
+  # flink-conf.yaml; the configuration has to be reverted separately.
+  # Only the directory part is quoted so that the jar glob still expands.
+  rm "$FLINK_DIR"/lib/flink-metrics-slf4j-*.jar
+}
+
+function get_metric_processed_records {
+  # Print the numRecordsIn value from the last matching metric line found in
+  # the TaskExecutor logs for an operator of the "General purpose test job".
+  # Prints 0 if no matching line exists.
+  #   $1 - operator name as it appears in the logged metric identifier
+  local OPERATOR=$1
+  local N
+  # sed keeps only the text after the last space of each matching line
+  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" "$FLINK_DIR"/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
+  if [ -z "$N" ]; then
+    N=0
+  fi
+  echo "$N"
+}
+
+function get_num_metric_samples {
+  # Print how many numRecordsIn metric lines have been logged so far in the
+  # TaskExecutor logs for an operator of the "General purpose test job".
+  #   $1 - operator name as it appears in the logged metric identifier
+  local OPERATOR=$1
+  local N
+  # 'grep | wc -l' is intentional: with several log files 'grep -c' would
+  # print one count per file instead of a single total
+  N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" "$FLINK_DIR"/log/*taskexecutor*.log | wc -l)
+  if [ -z "$N" ]; then
+    N=0
+  fi
+  # intentionally unquoted: strips the padding some wc implementations emit
+  echo $N
+}
+
+function wait_oper_metric_num_in_records {
+  # Block until the logged numRecordsIn metric of an operator reaches a threshold.
+  #   $1 - operator name as it appears in the logged metric identifier
+  #   $2 - number of records to wait for (optional, default 200)
+  #   $3 - number of metric samples that were already logged and must be
+  #        ignored (optional, default: the sample count at the time of the call)
+  # Polls the logs once per second; there is no timeout.
+  # All working variables are local so that caller variables with the same
+  # names (e.g. OLD_NUM_METRICS) are not overwritten.
+  local OPERATOR=$1
+  local MAX_NUM_METRICS="${2:-200}"
+  local NUM_METRICS
+  NUM_METRICS=$(get_num_metric_samples "${OPERATOR}")
+  local OLD_NUM_METRICS=${3:-${NUM_METRICS}}
+  local NUM_RECORDS
+  while : ; do
+    NUM_METRICS=$(get_num_metric_samples "${OPERATOR}")
+    NUM_RECORDS=$(get_metric_processed_records "${OPERATOR}")
+
+    # only account for metrics that appeared after the baseline sample count,
+    # i.e. ignore values logged before this wait started (or by an earlier execution)
+    if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
+      NUM_RECORDS=0
+    fi
+
+    if (( $NUM_RECORDS < $MAX_NUM_METRICS )); then
+      # report the actual threshold instead of a hard-coded 200
+      echo "Waiting for job to process up to ${MAX_NUM_METRICS} records, current progress: $NUM_RECORDS records ..."
+      sleep 1
+    else
+      break
+    fi
+  done
+}
+
# make sure to clean up even in case of failures
function cleanup {
stop_cluster
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac4d296/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index d3b3118..ae37f4a 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -33,14 +33,9 @@ else
NUM_SLOTS=$NEW_DOP
fi
-# modify configuration to have enough slots
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
-sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $NUM_SLOTS/" $FLINK_DIR/conf/flink-conf.yaml
-
-# modify configuration to use SLF4J reporter; we will be using this to monitor the state machine progress
-cp $FLINK_DIR/opt/flink-metrics-slf4j-*.jar $FLINK_DIR/lib/
-echo "metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter" >> $FLINK_DIR/conf/flink-conf.yaml
-echo "metrics.reporter.slf4j.interval: 1 SECONDS" >> $FLINK_DIR/conf/flink-conf.yaml
+backup_config
+change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
+setup_flink_slf4j_metric_reporter
start_cluster
@@ -52,8 +47,7 @@ function test_cleanup {
trap "" EXIT
# revert our modifications to the Flink distribution
- mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
- rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
+ rm ${FLINK_DIR}/lib/flink-metrics-slf4j-*.jar
# make sure to run regular cleanup as well
cleanup
@@ -75,30 +69,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR
wait_job_running $DATASTREAM_JOB
-function get_metric_processed_records {
- grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
-}
-
-function get_num_metric_samples {
- grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
-}
-
-# monitor the numRecordsIn metric of the state machine operator;
-# only proceed to savepoint when the operator has processed 200 records
-while : ; do
- NUM_RECORDS=$(get_metric_processed_records)
-
- if [ -z $NUM_RECORDS ]; then
- NUM_RECORDS=0
- fi
-
- if (( $NUM_RECORDS < 200 )); then
- echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
- sleep 1
- else
- break
- fi
-done
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
# take a savepoint of the state machine job
SAVEPOINT_PATH=$(take_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
@@ -121,24 +92,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TES
wait_job_running $DATASTREAM_JOB
-# monitor the numRecordsIn metric of the state machine operator in the second execution
-# we let the test finish once the second restore execution has processed 200 records
-while : ; do
- NUM_METRICS=$(get_num_metric_samples)
- NUM_RECORDS=$(get_metric_processed_records)
-
- # only account for metrics that appeared in the second execution
- if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
- NUM_RECORDS=0
- fi
-
- if (( $NUM_RECORDS < 200 )); then
- echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
- sleep 1
- else
- break
- fi
-done
+wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
# if state is erroneous and the state machine job produces alerting state transitions,
# output would be non-empty and the test will not pass