You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/03 12:54:18 UTC

[GitHub] zentol closed pull request #6478: [FLINK-9861][tests] Add StreamingFileSink E2E test

zentol closed pull request #6478: [FLINK-9861][tests] Add StreamingFileSink E2E test 
URL: https://github.com/apache/flink/pull/6478
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub no longer shows it once the pull request is merged:

diff --git a/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
new file mode 100644
index 00000000000..0b033f2339f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-streaming-file-sink-test</artifactId>
+	<version>1.7-SNAPSHOT</version>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>StreamingFileSinkTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>StreamingFileSinkProgram</finalName> <!-- test_streaming_file_sink.sh expects target/StreamingFileSinkProgram.jar -->
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>StreamingFileSinkProgram</mainClass> <!-- the class is in the default package -->
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
new file mode 100644
index 00000000000..161ed691e24
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test program for the {@link StreamingFileSink}.
+ *
+ * <p>Uses a {@link Generator} source that emits 60000 records in batches of 10 (one record per
+ * key) every 10 ms, i.e. over roughly 60 seconds, and then idles until the job is canceled.
+ * Every record has a unique, consecutive index, and only that index is written to the file.
+ *
+ * <p>The sink rolls on each checkpoint, with each part file containing a sequence of integers.
+ * Adding all committed part files together, and numerically sorting the contents, should
+ * result in a complete sequence from 0 (inclusive) to 60000 (exclusive).
+ */
+public enum StreamingFileSinkProgram {
+	;
+
+	/** Builds and runs the test job; the {@code --outputPath} argument is required. */
+	public static void main(final String[] args) throws Exception {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final String outputPath = params.getRequired("outputPath");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(4);
+		env.enableCheckpointing(5000L);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
+
+		final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+			.forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+				PrintStream out = new PrintStream(stream); // never closed: close() would also close the given stream
+				out.println(element.f1);
+			})
+			.withBucketAssigner(new KeyBucketAssigner())
+			.withRollingPolicy(OnCheckpointRollingPolicy.build())
+			.build();
+
+		// generate data, shuffle, sink
+		env.addSource(new Generator(10, 10, 60))
+			.keyBy(0)
+			.addSink(sink);
+
+		env.execute("StreamingFileSinkProgram");
+	}
+
+	/**
+	 * Uses the first tuple field (the key) as the bucket id, so every key gets its own bucket.
+	 */
+	public static final class KeyBucketAssigner implements BucketAssigner<Tuple2<Integer, Integer>, String> {
+
+		private static final long serialVersionUID = 987325769970523326L;
+
+		@Override
+		public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) {
+			return String.valueOf(element.f0);
+		}
+
+		@Override
+		public SimpleVersionedSerializer<String> getSerializer() {
+			return SimpleVersionedStringSerializer.INSTANCE;
+		}
+	}
+
+	/**
+	 * Source emitting (key, index) tuples in batches of {@code numKeys}; the index is checkpointed.
+	 */
+	public static final class Generator implements SourceFunction<Tuple2<Integer, Integer>>, ListCheckpointed<Integer> {
+
+		private static final long serialVersionUID = -2819385275681175792L;
+
+		private final int numKeys;
+		private final int idlenessMs;
+		private final int recordsToEmit;
+
+		private volatile int numRecordsEmitted = 0;
+		private volatile boolean canceled = false;
+
+		Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
+			this.numKeys = numKeys;
+			this.idlenessMs = idlenessMs;
+
+			this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys; // batches * records per batch
+		}
+
+		@Override
+		public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+			// honor cancel() while records are still being emitted, not only in the idle phase
+			while (!canceled && numRecordsEmitted < recordsToEmit) {
+				synchronized (ctx.getCheckpointLock()) { // emit and count together under the checkpoint lock
+					for (int i = 0; i < numKeys; i++) {
+						ctx.collect(Tuple2.of(i, numRecordsEmitted));
+						numRecordsEmitted++;
+					}
+				}
+				Thread.sleep(idlenessMs);
+			}
+			// everything is emitted: idle instead of returning, until the job is canceled
+			while (!canceled) {
+				Thread.sleep(50);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			canceled = true;
+		}
+
+		@Override
+		public List<Integer> snapshotState(final long checkpointId, final long timestamp) {
+			return Collections.singletonList(numRecordsEmitted);
+		}
+
+		@Override
+		public void restoreState(final List<Integer> states) {
+			for (final Integer state : states) {
+				numRecordsEmitted += state; // summed, in case more than one snapshot entry is handed back
+			}
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index fea91d5a552..2e060da3a4d 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -53,6 +53,7 @@ under the License.
 		<module>flink-confluent-schema-registry</module>
 		<module>flink-stream-state-ttl-test</module>
 		<module>flink-sql-client-test</module>
+		<module>flink-streaming-file-sink-test</module>
 	</modules>
 
 	<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 8439badf2c0..46931495d27 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -89,6 +89,7 @@ run_test "Resuming Externalized Checkpoint after terminal failure (rocks, increm
 run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
 run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh"
 run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh"
+run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh"
 run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
 run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 5430d912409..46dda7052aa 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -540,6 +540,16 @@ function rollback_flink_slf4j_metric_reporter() {
   rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
 }
 
+function get_job_metric {
+  local job_id=$1
+  local metric_name=$2
+  # Ask the REST endpoint on localhost:8081 for the metric; the URL is quoted so '?' is never globbed.
+  local json=$(curl -s "http://localhost:8081/jobs/${job_id}/metrics?get=${metric_name}")
+  local metric_value=$(echo "${json}" | sed -n 's/.*"value":"\(.*\)".*/\1/p')
+  # Prints the extracted value, or an empty line if the response holds no "value" field.
+  echo "${metric_value}"
+}
+
 function get_metric_processed_records {
   OPERATOR=$1
   JOB_NAME="${2:-General purpose test job}"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
new file mode 100755
index 00000000000..8652d72bfe5
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -0,0 +1,150 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+# Jar produced by the flink-streaming-file-sink-test module (shade finalName StreamingFileSinkProgram).
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
+# Directory passed to the job as --outputPath; the part files are collected from here at the end.
+OUTPUT_PATH="$TEST_DATA_DIR/out"
+
+function get_num_output_files {
+    local num_files=$(find "${OUTPUT_PATH}" -type f | wc -l)  # counts regular files; path quoted in case it contains spaces
+    echo ${num_files}  # left unquoted so any padding around the wc output is dropped
+}
+
+function wait_for_restart {
+    local base_num_restarts=$1
+    # Polls the "fullRestarts" metric of the global JOB_ID every 5 seconds; there is no timeout.
+    local current_num_restarts=${base_num_restarts}
+    local expected_num_restarts=$((current_num_restarts + 1))
+
+    echo "Waiting for restart to happen"
+    # -lt rather than "not equal": if the counter jumps past the target between polls, still stop.
+    while [[ ${current_num_restarts} -lt ${expected_num_restarts} ]]; do
+        sleep 5
+        current_num_restarts=$(get_job_metric "${JOB_ID}" "fullRestarts")
+        # An empty answer means the metric could not be read: keep the baseline and poll again.
+        current_num_restarts=${current_num_restarts:-${base_num_restarts}}
+    done
+}
+
+###################################
+# Wait a specific number of successful checkpoints
+# to have happened
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the job id
+#   $2: the number of expected successful checkpoints
+#   $3: timeout in seconds
+# Returns:
+#   None
+###################################
+function wait_for_number_of_checkpoints {
+    local job_id=$1
+    local expected_num_checkpoints=$2
+    local timeout=$3
+    local count=0
+    local current_num_checkpoints
+
+    echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
+    while true; do
+        # Query once per iteration; an empty answer counts as 0 instead of silently ending the wait.
+        current_num_checkpoints=$(get_completed_number_of_checkpoints "${job_id}")
+        (( ${current_num_checkpoints:-0} >= ${expected_num_checkpoints} )) && break
+        if [[ ${count} -gt ${timeout} ]]; then
+            echo "A timeout occurred waiting for successful checkpoints"
+            exit 1
+        fi
+        ((count+=2))  # matches the 2 second sleep below, so count approximates elapsed seconds
+        echo "${current_num_checkpoints:-0}/${expected_num_checkpoints} completed checkpoints"
+        sleep 2
+    done
+}
+
+function get_completed_number_of_checkpoints {
+    local job_id=$1
+    local json_res=$(curl -s http://localhost:8081/jobs/${job_id}/checkpoints)
+    # Prints the 6th ':'-separated field of the response, cut at its first ',' (worked example below).
+    echo ${json_res}    | # {"counts":{"restored":0,"total":25,"in_progress":1,"completed":24,"failed":0} ...
+        cut -d ":" -f 6 | # 24,"failed"
+        sed 's/,.*//'     # 24
+}
+
+start_cluster
+# Add three TaskManagers; the job runs with parallelism 4 (presumably start_cluster provides the fourth; confirm in common.sh).
+"${FLINK_DIR}/bin/taskmanager.sh" start
+"${FLINK_DIR}/bin/taskmanager.sh" start
+"${FLINK_DIR}/bin/taskmanager.sh" start
+
+echo "Submitting job."
+CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${OUTPUT_PATH}")
+JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
+# Without a job id none of the REST queries below can work, so give up early.
+if [[ -z $JOB_ID ]]; then
+    echo "Job could not be submitted."
+    echo "${CLIENT_OUTPUT}"
+    exit 1
+fi
+
+wait_job_running ${JOB_ID}
+# NOTE(review): wait_num_checkpoints is not defined in this script (presumably in common.sh); confirm.
+wait_num_checkpoints "${JOB_ID}" 3
+
+echo "Killing TM"
+kill_random_taskmanager
+
+echo "Starting TM"
+"$FLINK_DIR/bin/taskmanager.sh" start
+# 0 = restarts observed so far; blocks until the first one shows up in the fullRestarts metric.
+wait_for_restart 0
+
+echo "Killing 2 TMs"
+kill_random_taskmanager
+kill_random_taskmanager
+
+echo "Starting 2 TMs"
+"$FLINK_DIR/bin/taskmanager.sh" start
+"$FLINK_DIR/bin/taskmanager.sh" start
+# 1 = restarts observed so far; blocks until the next one shows up.
+wait_for_restart 1
+
+echo "Waiting until no new files are being created"
+OLD_COUNT=0
+NEW_COUNT=$(get_num_output_files)
+while ! [[ ${OLD_COUNT} -eq ${NEW_COUNT} ]]; do
+    echo "More output files were created. previous=${OLD_COUNT} now=${NEW_COUNT}"
+    # so long as there is data to process new files should be created for each checkpoint
+    CURRENT_NUM_CHECKPOINTS=$(get_completed_number_of_checkpoints ${JOB_ID})
+    EXPECTED_NUM_CHECKPOINTS=$((CURRENT_NUM_CHECKPOINTS + 1))
+    wait_for_number_of_checkpoints ${JOB_ID} ${EXPECTED_NUM_CHECKPOINTS} 60
+
+    OLD_COUNT=${NEW_COUNT}
+    NEW_COUNT=$(get_num_output_files)
+done
+
+cancel_job "${JOB_ID}"
+
+wait_job_terminal_state "${JOB_ID}" "CANCELED"
+
+# get all lines in part files and sort them numerically
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g > "${TEST_DATA_DIR}/complete_result"
+# NOTE(review): the hash presumably belongs to the sequence 0..59999, one number per line; confirm via check_result_hash in common.sh.
+check_result_hash "File Streaming Sink" "$TEST_DATA_DIR/complete_result" "6727342fdd3aae2129e61fc8f433fb6f"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services