You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/08/03 19:20:24 UTC
[flink] branch release-1.6 updated: [FLINK-9861][tests] Add StreamingFileSink E2E test
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 0c227d1 [FLINK-9861][tests] Add StreamingFileSink E2E test
0c227d1 is described below
commit 0c227d1f3833220827d9808be447a7e41d734257
Author: Chesnay <ch...@apache.org>
AuthorDate: Fri Aug 3 14:54:16 2018 +0200
[FLINK-9861][tests] Add StreamingFileSink E2E test
---
.../flink-streaming-file-sink-test/pom.xml | 67 +++++++++
.../src/main/java/StreamingFileSinkProgram.java | 155 +++++++++++++++++++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 1 +
flink-end-to-end-tests/test-scripts/common.sh | 10 ++
.../test-scripts/test_streaming_file_sink.sh | 150 ++++++++++++++++++++
6 files changed, 384 insertions(+)
diff --git a/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
new file mode 100644
index 0000000..50b290c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-streaming-file-sink-test</artifactId>
+
+	<dependencies>
+		<!-- provided scope: expected on the runtime classpath, so it is not bundled into the shaded job jar -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>StreamingFileSinkTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<!-- test_streaming_file_sink.sh submits target/StreamingFileSinkProgram.jar -->
+							<finalName>StreamingFileSinkProgram</finalName>
+							<transformers>
+								<!-- sets Main-Class so the jar can be submitted without an explicit entry class -->
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>StreamingFileSinkProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
new file mode 100644
index 0000000..161ed69
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test program for the {@link StreamingFileSink}.
+ *
+ * <p>Uses a source that steadily emits a deterministic set of records over 60 seconds,
+ * after which it idles and waits for job cancellation. Every record has a unique index that is
+ * written to the file.
+ *
+ * <p>The sink rolls on each checkpoint, with each part file containing a sequence of integers.
+ * Adding all committed part files together, and numerically sorting the contents, should
+ * result in a complete sequence from 0 (inclusive) to 60000 (exclusive).
+ *
+ * <p>Required program argument: {@code --outputPath}, the base directory of the sink.
+ */
+public enum StreamingFileSinkProgram {
+	;
+
+	public static void main(final String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final String outputPath = params.getRequired("outputPath");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(4);
+		// part files are rolled on every checkpoint (OnCheckpointRollingPolicy below), so checkpoint often
+		env.enableCheckpointing(5000L);
+		// allow up to 3 restarts, 10 seconds apart, so the job survives the TaskManager kills of the E2E test
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
+
+		// each record is written as its unique index (field f1) followed by a line separator
+		final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+			.forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+				PrintStream out = new PrintStream(stream);
+				out.println(element.f1);
+			})
+			.withBucketAssigner(new KeyBucketAssigner())
+			.withRollingPolicy(OnCheckpointRollingPolicy.build())
+			.build();
+
+		// generate data, shuffle, sink
+		env.addSource(new Generator(10, 10, 60))
+			.keyBy(0)
+			.addSink(sink);
+
+		env.execute("StreamingFileSinkProgram");
+	}
+
+	/**
+	 * Use first field for buckets: every key (field {@code f0}) gets its own bucket directory.
+	 */
+	public static final class KeyBucketAssigner implements BucketAssigner<Tuple2<Integer, Integer>, String> {
+
+		private static final long serialVersionUID = 987325769970523326L;
+
+		@Override
+		public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) {
+			return String.valueOf(element.f0);
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+
+	/**
+	 * Data-generating source function.
+	 *
+	 * <p>Emits bursts of {@code numKeys} records {@code (key, index)}, one per key, pausing
+	 * {@code idlenessMs} between bursts, where {@code index} is a running counter that is unique
+	 * across all records. After {@code recordsToEmit} records it idles until canceled. The counter
+	 * is checkpointed so that a restored source continues the sequence instead of restarting it.
+	 */
+	public static final class Generator implements SourceFunction<Tuple2<Integer, Integer>>, ListCheckpointed<Integer> {
+
+		private static final long serialVersionUID = -2819385275681175792L;
+
+		private final int numKeys;
+		private final int idlenessMs;
+		private final int recordsToEmit;
+
+		private volatile int numRecordsEmitted = 0;
+		private volatile boolean canceled = false;
+
+		/**
+		 * Creates a generator emitting {@code numKeys} records every {@code idlenessMs} milliseconds
+		 * for roughly {@code durationSeconds} seconds.
+		 *
+		 * @param numKeys number of distinct keys, i.e. records per burst; must be positive
+		 * @param idlenessMs pause between two bursts in milliseconds; must be positive
+		 * @param durationSeconds nominal emission duration in seconds; must not be negative
+		 * @throws IllegalArgumentException if any argument is out of range
+		 */
+		Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
+			if (numKeys <= 0) {
+				throw new IllegalArgumentException("numKeys must be positive, but was " + numKeys);
+			}
+			if (idlenessMs <= 0) {
+				throw new IllegalArgumentException("idlenessMs must be positive, but was " + idlenessMs);
+			}
+			if (durationSeconds < 0) {
+				throw new IllegalArgumentException("durationSeconds must not be negative, but was " + durationSeconds);
+			}
+			this.numKeys = numKeys;
+			this.idlenessMs = idlenessMs;
+
+			// one burst of numKeys records per idlenessMs; the total is therefore a multiple of numKeys,
+			// which lets the emission loop below stop exactly at recordsToEmit
+			this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
+		}
+
+		@Override
+		public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+			// emission phase; also leave early if the job is canceled while data is still being produced
+			while (!canceled && numRecordsEmitted < recordsToEmit) {
+				// emit a whole burst and advance the counter while holding the checkpoint lock,
+				// keeping emission and the checkpointed counter consistent with each other
+				synchronized (ctx.getCheckpointLock()) {
+					for (int i = 0; i < numKeys; i++) {
+						ctx.collect(Tuple2.of(i, numRecordsEmitted));
+						numRecordsEmitted++;
+					}
+				}
+				Thread.sleep(idlenessMs);
+			}
+
+			// idle phase: do not return from run() until cancel() has been called
+			while (!canceled) {
+				Thread.sleep(50);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			canceled = true;
+		}
+
+		/** Stores the number of records emitted so far. */
+		@Override
+		public List<Integer> snapshotState(final long checkpointId, final long timestamp) {
+			return Collections.singletonList(numRecordsEmitted);
+		}
+
+		/**
+		 * Restores the emitted-records counter. The restored entries are summed onto the initial 0;
+		 * since this is a plain (non-parallel) {@link SourceFunction}, a single entry is expected.
+		 */
+		@Override
+		public void restoreState(final List<Integer> states) {
+			for (final Integer state : states) {
+				numRecordsEmitted += state;
+			}
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 6d759e6..3c3ad90 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -53,6 +53,7 @@ under the License.
<module>flink-confluent-schema-registry</module>
<module>flink-stream-state-ttl-test</module>
<module>flink-sql-client-test</module>
+ <module>flink-streaming-file-sink-test</module>
</modules>
<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 8439bad..4693149 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -89,6 +89,7 @@ run_test "Resuming Externalized Checkpoint after terminal failure (rocks, increm
run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh"
+run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh"
run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 5430d91..46dda70 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -540,6 +540,16 @@ function rollback_flink_slf4j_metric_reporter() {
rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
}
+function get_job_metric {
+ local job_id=$1
+ local metric_name=$2
+
+ local json=$(curl -s http://localhost:8081/jobs/${job_id}/metrics?get=${metric_name})
+ local metric_value=$(echo ${json} | sed -n 's/.*"value":"\(.*\)".*/\1/p')
+
+ echo ${metric_value}
+}
+
function get_metric_processed_records {
OPERATOR=$1
JOB_NAME="${2:-General purpose test job}"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
new file mode 100755
index 0000000..8652d72
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -0,0 +1,150 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")/common.sh"
+
+# shaded job jar built by the flink-streaming-file-sink-test module
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
+
+# base directory handed to the job as --outputPath
+OUTPUT_PATH="${TEST_DATA_DIR}/out"
+
+function get_num_output_files {
+ local num_files=$(find ${OUTPUT_PATH} -type f | wc -l)
+ echo ${num_files}
+}
+
+###################################
+# Wait until the job has performed at least one more full
+# restart than the given baseline.
+#
+# Globals:
+#   JOB_ID
+# Arguments:
+#   $1: number of full restarts observed before the awaited one
+# Returns:
+#   None
+###################################
+function wait_for_restart {
+    local base_num_restarts=$1
+
+    local current_num_restarts=${base_num_restarts}
+    local expected_num_restarts=$((base_num_restarts + 1))
+
+    echo "Waiting for restart to happen"
+    # -lt instead of -eq: if more than one restart happens between two polls the counter
+    # skips past the expected value, and an equality check would then never terminate
+    while [[ ${current_num_restarts} -lt ${expected_num_restarts} ]]; do
+        sleep 5
+        current_num_restarts=$(get_job_metric ${JOB_ID} "fullRestarts")
+        # metric missing or not a number: fall back to the baseline and poll again
+        if ! [[ ${current_num_restarts} =~ ^[0-9]+$ ]]; then
+            current_num_restarts=${base_num_restarts}
+        fi
+    done
+}
+
+###################################
+# Wait a specific number of successful checkpoints
+# to have happened
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the job id
+#   $2: the number of expected successful checkpoints
+#   $3: timeout in seconds (approximate; the REST API is polled every 2 seconds)
+# Returns:
+#   None; exits the script with status 1 on timeout
+###################################
+function wait_for_number_of_checkpoints {
+    local job_id=$1
+    local expected_num_checkpoints=$2
+    local timeout=$3
+    local count=0
+    local current_num_checkpoints
+
+    echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
+    while true; do
+        current_num_checkpoints=$(get_completed_number_of_checkpoints ${job_id})
+        # An empty/unparsable response (e.g. a failed curl) must count as 0 checkpoints:
+        # passing it to (( )) directly is a syntax error, which would end the wait prematurely.
+        if ! [[ ${current_num_checkpoints} =~ ^[0-9]+$ ]]; then
+            current_num_checkpoints=0
+        fi
+
+        if (( current_num_checkpoints >= expected_num_checkpoints )); then
+            break
+        fi
+
+        if [[ ${count} -gt ${timeout} ]]; then
+            echo "A timeout occurred waiting for successful checkpoints"
+            exit 1
+        fi
+        ((count+=2))
+
+        echo "${current_num_checkpoints}/${expected_num_checkpoints} completed checkpoints"
+        sleep 2
+    done
+}
+
+###################################
+# Print the number of completed checkpoints of a job,
+# or nothing if it cannot be determined.
+#
+# Arguments:
+#   $1: the job id
+###################################
+function get_completed_number_of_checkpoints {
+    local job_id=$1
+    local json_res=$(curl -s "http://localhost:8081/jobs/${job_id}/checkpoints")
+
+    # Extract "completed" from the "counts" object by key instead of by field position,
+    # so the result does not depend on the order of the keys in the REST response.
+    echo "${json_res}" |                                  # {"counts":{"restored":0,"total":25,"in_progress":1,"completed":24,"failed":0} ...
+        grep -o '"counts":{[^}]*}' |                      # {"counts":{...,"completed":24,"failed":0}
+        sed -n 's/.*"completed":\([0-9][0-9]*\).*/\1/p'  # 24
+}
+
+start_cluster
+
+"${FLINK_DIR}/bin/taskmanager.sh" start
+"${FLINK_DIR}/bin/taskmanager.sh" start
+"${FLINK_DIR}/bin/taskmanager.sh" start
+
+echo "Submitting job."
+CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${OUTPUT_PATH}")
+JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+if [[ -z $JOB_ID ]]; then
+ echo "Job could not be submitted."
+ echo "${CLIENT_OUTPUT}"
+ exit 1
+fi
+
+wait_job_running ${JOB_ID}
+
+wait_num_checkpoints "${JOB_ID}" 3
+
+echo "Killing TM"
+kill_random_taskmanager
+
+echo "Starting TM"
+"$FLINK_DIR/bin/taskmanager.sh" start
+
+wait_for_restart 0
+
+echo "Killing 2 TMs"
+kill_random_taskmanager
+kill_random_taskmanager
+
+echo "Starting 2 TMs"
+"$FLINK_DIR/bin/taskmanager.sh" start
+"$FLINK_DIR/bin/taskmanager.sh" start
+
+wait_for_restart 1
+
+echo "Waiting until no new files are being created"
+OLD_COUNT=0
+NEW_COUNT=$(get_num_output_files)
+while ! [[ ${OLD_COUNT} -eq ${NEW_COUNT} ]]; do
+ echo "More output files were created. previous=${OLD_COUNT} now=${NEW_COUNT}"
+ # so long as there is data to process new files should be created for each checkpoint
+ CURRENT_NUM_CHECKPOINTS=$(get_completed_number_of_checkpoints ${JOB_ID})
+ EXPECTED_NUM_CHECKPOINTS=$((CURRENT_NUM_CHECKPOINTS + 1))
+ wait_for_number_of_checkpoints ${JOB_ID} ${EXPECTED_NUM_CHECKPOINTS} 60
+
+ OLD_COUNT=${NEW_COUNT}
+ NEW_COUNT=$(get_num_output_files)
+done
+
+cancel_job "${JOB_ID}"
+
+wait_job_terminal_state "${JOB_ID}" "CANCELED"
+
+# get all lines in part files and sort them numerically
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g > "${TEST_DATA_DIR}/complete_result"
+
+check_result_hash "File Streaming Sink" "$TEST_DATA_DIR/complete_result" "6727342fdd3aae2129e61fc8f433fb6f"