Posted to commits@flink.apache.org by ch...@apache.org on 2018/08/03 12:54:21 UTC

[flink] branch master updated: [FLINK-9861][tests] Add StreamingFileSink E2E test

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d623ca  [FLINK-9861][tests] Add StreamingFileSink E2E test
0d623ca is described below

commit 0d623ca4a683a7cb16a1ddd7cf23db9ec5d3a693
Author: Chesnay <ch...@apache.org>
AuthorDate: Fri Aug 3 14:54:16 2018 +0200

    [FLINK-9861][tests] Add StreamingFileSink E2E test
---
 .../flink-streaming-file-sink-test/pom.xml         |  68 +++++++++
 .../src/main/java/StreamingFileSinkProgram.java    | 155 +++++++++++++++++++++
 flink-end-to-end-tests/pom.xml                     |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   1 +
 flink-end-to-end-tests/test-scripts/common.sh      |  10 ++
 .../test-scripts/test_streaming_file_sink.sh       | 150 ++++++++++++++++++++
 6 files changed, 385 insertions(+)

diff --git a/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
new file mode 100644
index 0000000..0b033f2
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-streaming-file-sink-test</artifactId>
+	<version>1.7-SNAPSHOT</version>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>StreamingFileSinkTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>StreamingFileSinkProgram</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>StreamingFileSinkProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
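The shade plugin above packages the test program into the self-contained StreamingFileSinkProgram.jar that the test script later picks up from this module's target/ directory. One way to build just this module, assuming a standard reactor build from the repository root (the -pl/-am flags and -DskipTests are illustrative, not part of this commit):

    mvn clean package -pl flink-end-to-end-tests/flink-streaming-file-sink-test -am -DskipTests
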
diff --git a/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
new file mode 100644
index 0000000..161ed69
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test program for the {@link StreamingFileSink}.
+ *
+ * <p>Uses a source that steadily emits a deterministic set of records over 60 seconds,
+ * after which it idles and waits for job cancellation. Every record has a unique index that is
+ * written to the file.
+ *
+ * <p>The sink rolls on each checkpoint, so each part file contains a slice of the overall
+ * sequence of integers. Concatenating all committed part files and sorting the contents
+ * numerically should yield the complete sequence from 0 (inclusive) to 60000 (exclusive).
+ */
+public enum StreamingFileSinkProgram {
+	;
+
+	public static void main(final String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final String outputPath = params.getRequired("outputPath");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(4);
+		env.enableCheckpointing(5000L);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
+
+		final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+			.forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+				PrintStream out = new PrintStream(stream);
+				out.println(element.f1);
+			})
+			.withBucketAssigner(new KeyBucketAssigner())
+			.withRollingPolicy(OnCheckpointRollingPolicy.build())
+			.build();
+
+		// generate data, shuffle, sink
+		env.addSource(new Generator(10, 10, 60))
+			.keyBy(0)
+			.addSink(sink);
+
+		env.execute("StreamingFileSinkProgram");
+	}
+
+
+	/**
+	 * Use first field for buckets.
+	 */
+	public static final class KeyBucketAssigner implements BucketAssigner<Tuple2<Integer, Integer>, String> {
+
+		private static final long serialVersionUID = 987325769970523326L;
+
+		@Override
+		public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) {
+			return String.valueOf(element.f0);
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+
+	/**
+	 * Data-generating source function.
+	 */
+	public static final class Generator implements SourceFunction<Tuple2<Integer, Integer>>, ListCheckpointed<Integer> {
+
+		private static final long serialVersionUID = -2819385275681175792L;
+
+		private final int numKeys;
+		private final int idlenessMs;
+		private final int recordsToEmit;
+
+		private volatile int numRecordsEmitted = 0;
+		private volatile boolean canceled = false;
+
+		Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
+			this.numKeys = numKeys;
+			this.idlenessMs = idlenessMs;
+
+			this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
+		}
+
+		@Override
+		public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+			while (numRecordsEmitted < recordsToEmit) {
+				synchronized (ctx.getCheckpointLock()) {
+					for (int i = 0; i < numKeys; i++) {
+						ctx.collect(Tuple2.of(i, numRecordsEmitted));
+						numRecordsEmitted++;
+					}
+				}
+				Thread.sleep(idlenessMs);
+			}
+
+			while (!canceled) {
+				Thread.sleep(50);
+			}
+
+		}
+
+		@Override
+		public void cancel() {
+			canceled = true;
+		}
+
+		@Override
+		public List<Integer> snapshotState(final long checkpointId, final long timestamp) {
+			return Collections.singletonList(numRecordsEmitted);
+		}
+
+		@Override
+		public void restoreState(final List<Integer> states) {
+			for (final Integer state : states) {
+				numRecordsEmitted += state;
+			}
+		}
+	}
+}
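As a sanity check on the numbers above: with the parameters used in main() (numKeys=10, idlenessMs=10, durationSeconds=60), the Generator's recordsToEmit formula yields exactly the 60000 records the class comment promises:

    # ((durationSeconds * 1000) / idlenessMs) * numKeys
    echo $(( (60 * 1000 / 10) * 10 ))   # prints 60000
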
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index fea91d5..2e060da 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -53,6 +53,7 @@ under the License.
 		<module>flink-confluent-schema-registry</module>
 		<module>flink-stream-state-ttl-test</module>
 		<module>flink-sql-client-test</module>
+		<module>flink-streaming-file-sink-test</module>
 	</modules>
 
 	<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 8439bad..4693149 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -89,6 +89,7 @@ run_test "Resuming Externalized Checkpoint after terminal failure (rocks, increm
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
 run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
 run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh"
+run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh"
 run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
 run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 5430d91..46dda70 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -540,6 +540,16 @@ function rollback_flink_slf4j_metric_reporter() {
   rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
 }
 
+function get_job_metric {
+  local job_id=$1
+  local metric_name=$2
+
+  local json=$(curl -s http://localhost:8081/jobs/${job_id}/metrics?get=${metric_name})
+  local metric_value=$(echo ${json} | sed -n 's/.*"value":"\(.*\)".*/\1/p')
+
+  echo ${metric_value}
+}
+
 function get_metric_processed_records {
   OPERATOR=$1
   JOB_NAME="${2:-General purpose test job}"
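The new get_job_metric helper queries the job metrics endpoint of the REST API on localhost:8081 and extracts the "value" field with sed. The test script below uses it to poll the fullRestarts metric, for example:

    num_restarts=$(get_job_metric ${JOB_ID} "fullRestarts")
    echo "fullRestarts=${num_restarts}"
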
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
new file mode 100755
index 0000000..8652d72
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -0,0 +1,150 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
+
+OUTPUT_PATH="$TEST_DATA_DIR/out"
+
+function get_num_output_files {
+    local num_files=$(find ${OUTPUT_PATH} -type f | wc -l)
+    echo ${num_files}
+}
+
+function wait_for_restart {
+    local base_num_restarts=$1
+
+    local current_num_restarts=${base_num_restarts}
+    local expected_num_restarts=$((current_num_restarts + 1))
+
+    echo "Waiting for restart to happen"
+    while ! [[ ${current_num_restarts} -eq ${expected_num_restarts} ]]; do
+        sleep 5
+        current_num_restarts=$(get_job_metric ${JOB_ID} "fullRestarts")
+        if [[ -z ${current_num_restarts} ]]; then
+            current_num_restarts=${base_num_restarts}
+        fi
+    done
+}
+
+###################################
+# Wait for a specific number of successful checkpoints
+# to complete
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the job id
+#   $2: the number of expected successful checkpoints
+#   $3: timeout in seconds
+# Returns:
+#   None
+###################################
+function wait_for_number_of_checkpoints {
+    local job_id=$1
+    local expected_num_checkpoints=$2
+    local timeout=$3
+    local count=0
+
+    echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
+    while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+
+        if [[ ${count} -gt ${timeout} ]]; then
+            echo "A timeout occurred waiting for successful checkpoints"
+            exit 1
+        else
+            ((count+=2))
+        fi
+
+        local current_num_checkpoints=$(get_completed_number_of_checkpoints ${job_id})
+        echo "${current_num_checkpoints}/${expected_num_checkpoints} completed checkpoints"
+        sleep 2
+    done
+}
+
+function get_completed_number_of_checkpoints {
+    local job_id=$1
+    local json_res=$(curl -s http://localhost:8081/jobs/${job_id}/checkpoints)
+
+    echo ${json_res}    | # {"counts":{"restored":0,"total":25,"in_progress":1,"completed":24,"failed":0} ...
+        cut -d ":" -f 6 | # 24,"failed"
+        sed 's/,.*//'     # 24
+}
+
+start_cluster
+
+"${FLINK_DIR}/bin/taskmanager.sh" start
+"${FLINK_DIR}/bin/taskmanager.sh" start
+"${FLINK_DIR}/bin/taskmanager.sh" start
+
+echo "Submitting job."
+CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${OUTPUT_PATH}")
+JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+if [[ -z $JOB_ID ]]; then
+    echo "Job could not be submitted."
+    echo "${CLIENT_OUTPUT}"
+    exit 1
+fi
+
+wait_job_running ${JOB_ID}
+
+wait_num_checkpoints "${JOB_ID}" 3
+
+echo "Killing TM"
+kill_random_taskmanager
+
+echo "Starting TM"
+"$FLINK_DIR/bin/taskmanager.sh" start
+
+wait_for_restart 0
+
+echo "Killing 2 TMs"
+kill_random_taskmanager
+kill_random_taskmanager
+
+echo "Starting 2 TMs"
+"$FLINK_DIR/bin/taskmanager.sh" start
+"$FLINK_DIR/bin/taskmanager.sh" start
+
+wait_for_restart 1
+
+echo "Waiting until no new files are being created"
+OLD_COUNT=0
+NEW_COUNT=$(get_num_output_files)
+while ! [[ ${OLD_COUNT} -eq ${NEW_COUNT} ]]; do
+    echo "More output files were created. previous=${OLD_COUNT} now=${NEW_COUNT}"
+    # so long as there is data to process, new files should be created for each checkpoint
+    CURRENT_NUM_CHECKPOINTS=$(get_completed_number_of_checkpoints ${JOB_ID})
+    EXPECTED_NUM_CHECKPOINTS=$((CURRENT_NUM_CHECKPOINTS + 1))
+    wait_for_number_of_checkpoints ${JOB_ID} ${EXPECTED_NUM_CHECKPOINTS} 60
+
+    OLD_COUNT=${NEW_COUNT}
+    NEW_COUNT=$(get_num_output_files)
+done
+
+cancel_job "${JOB_ID}"
+
+wait_job_terminal_state "${JOB_ID}" "CANCELED"
+
+# get all lines in part files and sort them numerically
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g > "${TEST_DATA_DIR}/complete_result"
+
+check_result_hash "File Streaming Sink" "$TEST_DATA_DIR/complete_result" "6727342fdd3aae2129e61fc8f433fb6f"
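
A note on verification: per the program's Javadoc, the sorted output is the complete sequence 0 (inclusive) to 60000 (exclusive), one value per line, so the reference content can be regenerated locally with seq. Hashing it the way check_result_hash does (an MD5 sum, possibly after the helper's own normalization — an assumption, not spelled out in this commit) should reproduce the expected value:

    # reference content: equivalent to concatenating and numerically sorting all part files
    seq 0 59999 > expected_result
    md5sum expected_result   # assumption: check_result_hash compares a plain MD5 of the file

Also, get_completed_number_of_checkpoints extracts the completed count by colon-separated field position, which silently breaks if the REST response ever reorders its keys. A position-independent sketch of the same extraction (not part of this commit):

    curl -s http://localhost:8081/jobs/${JOB_ID}/checkpoints \
        | grep -o '"completed":[0-9]*' \
        | head -n 1 \
        | cut -d ':' -f 2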