Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/04/27 07:18:35 UTC

[GitHub] flink pull request #5925: [FLINK-8992] [e2e-tests] General purpose DataStrea...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5925

    [FLINK-8992] [e2e-tests] General purpose DataStream e2e test

    ## What is the purpose of the change
    
    This PR adds a general purpose Flink DataStream end-to-end test job. It is meant to cover the main DataStream APIs and primitives, and to be reusable across several different end-to-end tests.
    
    The initial version covers the following characteristics:
    - A generic Kryo input type.
    - A state type for which we register a `KryoSerializer`.
    - Operators with `ValueState`.
    - Allows verifying exactly-once / at-least-once semantics.
    - Allows configuring different state backends.
    
    For a full list of what we plan to add over time, please see the description in the umbrella JIRA FLINK-8971.
    
    ## Brief change log
    
    - a936aaa initial version of the general purpose job by @StefanRRichter 
    - c7127a9 integrate job with end-to-end tests' current Maven project structure
    - fa82e2c Fix exactly-once bug in source, and allow enabling checkpointing
    - 3182f0c Adapt savepoint e2e test to use the general purpose job.
    
    ## Verifying this change
    
    The savepoint e2e test serves as a good, generic e2e test script for the general purpose job.
    It runs the job, takes a savepoint, and resumes again (potentially with a different parallelism).
    Metrics monitoring is done in the test script to ensure that the job has made progress.
    The job itself verifies exactly-once semantics; if there are no guarantee violations, there will be no output from the job. Whether or not the job produced any output is checked at the end by the test script (by checking the .out files).
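
To illustrate the "no output unless a guarantee is violated" idea described above, here is a minimal, Flink-free sketch of a per-key sequence check. It is not the PR's `SemanticsCheckMapper`: the class name `SequenceCheckSketch`, the assumption that each key's sequence numbers start at 1, and the in-memory map (standing in for keyed `ValueState`) are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free sketch of a per-key sequence check; not the PR's SemanticsCheckMapper. */
final class SequenceCheckSketch {

    private final boolean exactlyOnce;
    // Highest sequence number seen per key; a real job would keep this in keyed ValueState.
    private final Map<Integer, Long> lastSeen = new HashMap<>();
    // Alerts are the only "output"; the list stays empty while the guarantee holds.
    private final List<String> alerts = new ArrayList<>();

    SequenceCheckSketch(boolean exactlyOnce) {
        this.exactlyOnce = exactlyOnce;
    }

    /** Records one event and appends an alert only if the configured guarantee is violated. */
    void onEvent(int key, long sequenceNumber) {
        long previous = lastSeen.getOrDefault(key, 0L); // assumption: sequences start at 1
        long expected = previous + 1;
        boolean gap = sequenceNumber > expected;        // lost element: violates both semantics
        boolean duplicate = sequenceNumber < expected;  // replayed element: violates exactly-once only
        if (gap || (exactlyOnce && duplicate)) {
            alerts.add("key=" + key + " expected=" + expected + " got=" + sequenceNumber);
        }
        lastSeen.put(key, Math.max(previous, sequenceNumber));
    }

    List<String> alerts() {
        return alerts;
    }

    public static void main(String[] args) {
        SequenceCheckSketch check = new SequenceCheckSketch(true);
        check.onEvent(1, 1);
        check.onEvent(1, 2);
        check.onEvent(1, 2); // replayed element, reported under exactly-once
        check.alerts().forEach(System.out::println);
    }
}
```

Under this sketch a test script only has to assert that the alert list (or, in the real job, the .out file) is empty.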
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8992

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5925
    
----
commit a936aaa65b6583cabc8c4ae7269a4a55ac48dd84
Author: Stefan Richter <s....@...>
Date:   2018-03-15T19:20:45Z

    [FLINK-8992] [e2e-tests] Initial general purpose DataStream job

commit c7127a9b746c1c51b384fea5050f5a041df30954
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-04-25T09:05:24Z

    [FLINK-8992] [e2e-tests] Integrate general DataStream test job with project structure
    
    This also includes minor cleanup of WIP code in the test job.

commit 253039b49d16f4971f237e6f808080bd7a3599a2
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-04-26T09:22:59Z

    [FLINK-8992] [e2e-tests] Add Javadocs for DataStreamAllroundTestProgram

commit fa82e2c56025a28cc7238b67b9595aa58690bd09
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-04-27T06:44:44Z

    [FLINK-8992] [e2e-tests] Ensure exactly-once in general purpose DataStream job

commit e71c5374f56a514332dc9a0a5716eddbfb8c6b62
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-04-27T07:01:13Z

    [FLINK-8992] [e2e-tests] Configurable source throttling for general purpose DataStream job

commit 3182f0ce8048923a652f7b5dd453b35e92740906
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-04-27T06:46:09Z

    [FLINK-8992] [e2e-tests] Let savepoint e2e test use general purpose DataStream job

----


---

[GitHub] flink pull request #5925: [FLINK-8992] [e2e-tests] General purpose DataStrea...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5925#discussion_r184647536
  
    --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests;
    +
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueState;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.ConfigOption;
    +import org.apache.flink.configuration.ConfigOptions;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.streaming.api.CheckpointingMode;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateBuilder;
    +import org.apache.flink.streaming.tests.artificialstate.ArtificialKeyedStateMapper;
    +import org.apache.flink.streaming.tests.artificialstate.eventpayload.ArtificialValueStateBuilder;
    +import org.apache.flink.streaming.tests.artificialstate.eventpayload.ComplexPayload;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.UUID;
    +
    +/**
    + * A general purpose test for Flink's DataStream API operators and primitives.
    + *
    + * <p>It currently covers the following aspects that are frequently present in Flink DataStream jobs:
    + * <ul>
    + *     <li>A generic Kryo input type.</li>
    + *     <li>A state type for which we register a {@link KryoSerializer}.</li>
    + *     <li>Operators with {@link ValueState}.</li>
    + * </ul>
    + *
    + * <p>The job can be configured for different state backends, including memory, file, and RocksDB
    + * state backends. It also allows specifying the processing guarantee semantics, which will also be verified
    + * by the job itself according to the specified semantic.
    + *
    + * <p>Program parameters:
    + * <ul>
    + *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
    + *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
    + *     <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
    + *     <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
    + *     <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
    + *     <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
    + *     <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li>
    + *     <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li>
    + *     <li>state_backend.file.async (boolean, default - true): Activate or deactivate asynchronous snapshots if FileStateBackend is selected.</li>
    + *     <li>sequence_generator_source.keyspace (int, default - 1000): Number of different keys for events emitted by the sequence generator.</li>
    + *     <li>sequence_generator_source.payload_size (int, default - 20): Length of message payloads emitted by the sequence generator.</li>
    + *     <li>sequence_generator_source.sleep_time (long, default - 0): Milliseconds to sleep after emitting events in the sequence generator. Set to 0 to disable sleeping.</li>
    + *     <li>sequence_generator_source.sleep_after_elements (long, default - 0): Number of elements to emit before sleeping in the sequence generator. Set to 0 to disable sleeping.</li>
    + *     <li>sequence_generator_source.event_time.max_out_of_order (long, default - 500): Max event time out-of-orderness for events emitted by the sequence generator.</li>
    + *     <li>sequence_generator_source.event_time.clock_progress (long, default - 100): The amount of event time to progress per event generated by the sequence generator.</li>
    + * </ul>
    + */
    +public class DataStreamAllroundTestProgram {
    +
    +	private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
    +		.key("test.semantics")
    +		.defaultValue("exactly-once")
    +		.withDescription("This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'");
    +
    +	private static final ConfigOption<Long> ENVIRONMENT_CHECKPOINT_INTERVAL = ConfigOptions
    +		.key("environment.checkpoint_interval")
    +		.defaultValue(1000L);
    +
    +	private static final ConfigOption<Integer> ENVIRONMENT_PARALLELISM = ConfigOptions
    +		.key("environment.parallelism")
    +		.defaultValue(1);
    +
    +	private static final ConfigOption<Integer> ENVIRONMENT_MAX_PARALLELISM = ConfigOptions
    +		.key("environment.max_parallelism")
    +		.defaultValue(128);
    +
    +	private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions
    +		.key("environment.restart_strategy.delay")
    +		.defaultValue(0);
    +
    +	private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
    +		.key("state_backend")
    +		.defaultValue("file")
    +		.withDescription("Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.");
    +
    +	private static final ConfigOption<String> STATE_BACKEND_CHECKPOINT_DIR = ConfigOptions
    +		.key("state_backend.checkpoint_directory")
    +		.noDefaultValue()
    +		.withDescription("The checkpoint directory.");
    +
    +	private static final ConfigOption<Boolean> STATE_BACKEND_ROCKS_INCREMENTAL = ConfigOptions
    +		.key("state_backend.rocks.incremental")
    +		.defaultValue(false)
    +		.withDescription("Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");
    +
    +	private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC = ConfigOptions
    +		.key("state_backend.file.async")
    +		.defaultValue(true)
    +		.withDescription("Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");
    +
    +	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE = ConfigOptions
    +		.key("sequence_generator_source.keyspace")
    +		.defaultValue(1000);
    +
    +	private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_PAYLOAD_SIZE = ConfigOptions
    +		.key("sequence_generator_source.payload_size")
    +		.defaultValue(20);
    +
    +	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
    +		.key("sequence_generator_source.sleep_time")
    +		.defaultValue(0L);
    +
    +	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
    +		.key("sequence_generator_source.sleep_after_elements")
    +		.defaultValue(0L);
    +
    +	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
    +		.key("sequence_generator_source.event_time.max_out_of_order")
    +		.defaultValue(500L);
    +
    +	private static final ConfigOption<Long> SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
    +		.key("sequence_generator_source.event_time.clock_progress")
    +		.defaultValue(100L);
    +
    +	// -----------------------------------------------------------------------------------------------------------------
    +
    +	public static void main(String[] args) throws Exception {
    +		final ParameterTool pt = ParameterTool.fromArgs(args);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		setupEnvironment(env, pt);
    +
    +		env.addSource(createEventSource(pt))
    +			.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
    +			.keyBy(Event::getKey)
    +			.map(createArtificialKeyedStateMapper(
    +				// map function simply forwards the inputs
    +				(MapFunction<Event, Event>) in -> in,
    +				// state is updated per event as a wrapped ComplexPayload state object
    +				(Event first, ComplexPayload second) -> new ComplexPayload(first), //
    +				Arrays.asList(
    +					new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
    +				)
    +			)
    +			.name("ArtificalKeyedStateMapper")
    +			.returns(Event.class)
    +			.keyBy(Event::getKey)
    +			.flatMap(createSemanticsCheckMapper(pt))
    +			.name("SemanticsCheckMapper")
    +			.addSink(new PrintSinkFunction<>());
    +
    +		env.execute("General purpose test job");
    +	}
    +
    +	public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
    +
    +		// set checkpointing semantics
    +		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
    +		long checkpointInterval = pt.getLong(ENVIRONMENT_CHECKPOINT_INTERVAL.key(), ENVIRONMENT_CHECKPOINT_INTERVAL.defaultValue());
    +		CheckpointingMode checkpointingMode = semantics.equalsIgnoreCase("exactly-once")
    --- End diff --
    
    Maybe we could already catch any random string that does not represent a checkpointing semantic and throw an `IllegalArgumentException`?
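
One way the suggested check could look, as a rough sketch only: the helper below rejects any string that is not one of the two documented values. The class `SemanticsParser` and its nested `Mode` enum are hypothetical; `Mode` merely stands in for Flink's `CheckpointingMode` so that the example compiles without Flink on the classpath. Matching is case-insensitive, as with the `equalsIgnoreCase` call in the diff.

```java
import java.util.Locale;

/** Hypothetical helper, not part of the PR: strictly parses the test.semantics parameter. */
final class SemanticsParser {

    /** Stand-in for Flink's CheckpointingMode so the sketch compiles without Flink. */
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Returns the matching mode, or throws if the string names no known semantic. */
    static Mode parse(String semantics) {
        if (semantics == null) {
            throw new IllegalArgumentException("test.semantics must not be null");
        }
        switch (semantics.toLowerCase(Locale.ROOT)) {
            case "exactly-once":
                return Mode.EXACTLY_ONCE;
            case "at-least-once":
                return Mode.AT_LEAST_ONCE;
            default:
                throw new IllegalArgumentException(
                    "Unknown semantics '" + semantics
                        + "'; expected 'exactly-once' or 'at-least-once'");
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("exactly-once"));
        try {
            parse("exactly-twice"); // a typo is rejected instead of silently picking a mode
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast here means a mistyped parameter aborts the test run instead of silently falling through to one of the two modes.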


---

[GitHub] flink pull request #5925: [FLINK-8992] [e2e-tests] General purpose DataStrea...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5925#discussion_r184619113
  
    --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java ---
    @@ -21,17 +21,15 @@
     import org.apache.flink.api.common.functions.RichFlatMapFunction;
     import org.apache.flink.api.common.state.ValueState;
     import org.apache.flink.api.common.state.ValueStateDescriptor;
    -import org.apache.flink.runtime.state.FunctionInitializationContext;
    -import org.apache.flink.runtime.state.FunctionSnapshotContext;
    -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.configuration.Configuration;
     import org.apache.flink.util.Collector;
     
     import java.io.Serializable;
     
     /**
      * This mapper validates exactly-once and at-least-once semantics in connection with {@link SequenceGeneratorSource}.
      */
    -public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> implements CheckpointedFunction {
    +public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> {
    --- End diff --
    
    Actually the changes here in this file are not strictly required for exactly-once.
    I only changed this as part of trying some things out.
    Can revert this if you prefer, @StefanRRichter.


---

[GitHub] flink pull request #5925: [FLINK-8992] [e2e-tests] General purpose DataStrea...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5925


---

[GitHub] flink issue #5925: [FLINK-8992] [e2e-tests] General purpose DataStream e2e t...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5925
  
    @StefanRRichter since you worked on this in the beginning, do you want to take a look at my additional (small) changes?
    I did a review of the code already, and have addressed my own comments as followup commits on top of yours.


---

[GitHub] flink issue #5925: [FLINK-8992] [e2e-tests] General purpose DataStream e2e t...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5925
  
    LGTM 👍 Will merge this.


---