Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/05 13:04:04 UTC

[GitHub] asfgit closed pull request #6957: [FLINK-10627][E2E tests] Test s3 output for streaming file sink

asfgit closed pull request #6957: [FLINK-10627][E2E tests] Test s3 output for streaming file sink
URL: https://github.com/apache/flink/pull/6957
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
new file mode 100644
index 00000000000..219d6626ef3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.7-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-e2e-test-utils</artifactId>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>aws-java-sdk-s3</artifactId>
+			<version>1.11.437</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>S3UtilProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>S3UtilProgram</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.util.s3.S3UtilProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
new file mode 100644
index 00000000000..781a8edd20a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.util.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CSVInput;
+import com.amazonaws.services.s3.model.CSVOutput;
+import com.amazonaws.services.s3.model.CompressionType;
+import com.amazonaws.services.s3.model.ExpressionType;
+import com.amazonaws.services.s3.model.InputSerialization;
+import com.amazonaws.services.s3.model.OutputSerialization;
+import com.amazonaws.services.s3.model.SelectObjectContentEvent;
+import com.amazonaws.services.s3.model.SelectObjectContentEventStream;
+import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor;
+import com.amazonaws.services.s3.model.SelectObjectContentRequest;
+import com.amazonaws.services.s3.model.SelectObjectContentResult;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.amazonaws.util.IOUtils.copy;
+
+class S3QueryUtil {
+	/** Run an SQL query (via S3 Select) over a non-compressed CSV file stored in an s3 object. */
+	static String queryFile(
+			AmazonS3 s3client, String bucket, String s3file, @SuppressWarnings("SameParameterValue") String query) {
+		SelectObjectContentRequest request = generateBaseCSVRequest(bucket, s3file, query);
+		final AtomicBoolean isResultComplete = new AtomicBoolean(false);
+		String res;
+		try (SelectObjectContentResult result = s3client.selectObjectContent(request);
+			SelectObjectContentEventStream payload = result.getPayload();
+			ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			InputStream resultInputStream = payload.getRecordsInputStream(
+				new SelectObjectContentEventVisitor() {
+					@Override
+					public void visit(SelectObjectContentEvent.EndEvent event) {
+						isResultComplete.set(true);
+					}
+				}
+			);
+			copy(resultInputStream, out);
+			res = out.toString().trim();
+		} catch (Throwable e) {
+			System.out.println("SQL query failure");
+			throw new RuntimeException("SQL query failure", e);
+		}
+		/*
+		 * The End Event indicates all matching records have been transmitted.
+		 * If the End Event is not received, the results may be incomplete.
+		 */
+		if (!isResultComplete.get()) {
+			throw new RuntimeException("S3 Select request was incomplete as End Event was not received.");
+		}
+		return res;
+	}
+
+	private static SelectObjectContentRequest generateBaseCSVRequest(String bucket, String key, String query) {
+		SelectObjectContentRequest request = new SelectObjectContentRequest();
+		request.setBucketName(bucket);
+		request.setKey(key);
+		request.setExpression(query);
+		request.setExpressionType(ExpressionType.SQL);
+
+		InputSerialization inputSerialization = new InputSerialization();
+		inputSerialization.setCsv(new CSVInput());
+		inputSerialization.setCompressionType(CompressionType.NONE);
+		request.setInputSerialization(inputSerialization);
+
+		OutputSerialization outputSerialization = new OutputSerialization();
+		outputSerialization.setCsv(new CSVOutput());
+		request.setOutputSerialization(outputSerialization);
+
+		return request;
+	}
+}
diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3UtilProgram.java b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3UtilProgram.java
new file mode 100644
index 00000000000..70cb55854c5
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3UtilProgram.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.util.s3;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.KeyFilter;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * S3 utilities.
+ *
+ * <p>Usage: java -jar S3UtilProgram.jar args.
+ *
+ * <p>Note: {@code S3UtilProgram.Action.numberOfLines*} actions are applicable only
+ * to valid, non-compressed, comma-separated CSV files.
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>action (string, required): Action to perform, see {@link S3UtilProgram.Action}.</li>
+ *     <li>bucket (string, required): Bucket where s3 objects reside.</li>
+ *     <li>s3file (string, required for single object actions): s3 object key.</li>
+ *     <li>s3prefix (string, required for actions over objects grouped by key prefix): s3 key prefix.</li>
+ *     <li>s3filePrefix (string, optional for downloadByFullPathAndFileNamePrefix or numberOfLinesInFilesWithFullAndNamePrefix):
+ *     s3 file name prefix w/o directory to filter files by name.</li>
+ *     <li>localFile (string, required for single file actions): local file path.</li>
+ *     <li>localFolder (string, required for actions over folders): local folder path.</li>
+ *     <li>parallelism (int, default 10): parallelism for parallelizable actions
+ *     (e.g. {@code numberOfLinesInFilesWithFullAndNamePrefix}).</li>
+ * </ul>
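+ *
+ * <p>Illustrative invocation (the bucket name and object key below are placeholders,
+ * not tied to any real test setup):
+ * {@code java -jar S3UtilProgram.jar --action numberOfLinesInFile --bucket my-test-bucket --s3file out/part-0-0}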
+ */
+class S3UtilProgram {
+	enum Action {
+		listByFullPathPrefix,
+		downloadFile,
+		downloadByFullPathAndFileNamePrefix,
+		deleteFile,
+		deleteByFullPathPrefix,
+		numberOfLinesInFile,
+		numberOfLinesInFilesWithFullAndNamePrefix
+	}
+
+	private static final Map<Action, Consumer<ParameterTool>> handlers;
+	static {
+		Map<Action, Consumer<ParameterTool>> handlersMutable = new HashMap<>();
+		handlersMutable.put(Action.listByFullPathPrefix, S3UtilProgram::listByFullPathPrefix);
+		handlersMutable.put(Action.downloadFile, S3UtilProgram::downloadFile);
+		handlersMutable.put(Action.downloadByFullPathAndFileNamePrefix,
+			S3UtilProgram::downloadByFullPathAndFileNamePrefix);
+		handlersMutable.put(Action.deleteFile, S3UtilProgram::deleteFile);
+		handlersMutable.put(Action.deleteByFullPathPrefix, S3UtilProgram::deleteByFullPathPrefix);
+		handlersMutable.put(Action.numberOfLinesInFile, S3UtilProgram::numberOfLinesInFile);
+		handlersMutable.put(Action.numberOfLinesInFilesWithFullAndNamePrefix,
+			S3UtilProgram::numberOfLinesInFilesWithFullAndNamePrefix);
+		handlers = Collections.unmodifiableMap(handlersMutable);
+	}
+
+	private static final String countQuery = "select count(*) from s3object";
+
+	public static void main(String[] args) {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final Action action = Action.valueOf(params.getRequired("action"));
+		handlers.get(action).accept(params);
+	}
+
+	private static void listByFullPathPrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		listByFullPathPrefix(bucket, s3prefix).forEach(System.out::println);
+	}
+
+	private static List<String> listByFullPathPrefix(final String bucket, final String s3prefix) {
+		return AmazonS3ClientBuilder.defaultClient().listObjects(bucket, s3prefix).getObjectSummaries()
+			.stream().map(S3ObjectSummary::getKey).collect(Collectors.toList());
+	}
+
+	private static void downloadFile(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3file = params.getRequired("s3file");
+		final String localFile = params.getRequired("localFile");
+		TransferManager tx = TransferManagerBuilder.defaultTransferManager();
+		try {
+			tx.download(bucket, s3file, new File(localFile)).waitForCompletion();
+		} catch (InterruptedException e) {
+			System.out.println("Transfer interrupted");
+		} finally {
+			tx.shutdownNow();
+		}
+	}
+
+	private static void downloadByFullPathAndFileNamePrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		final String localFolder = params.getRequired("localFolder");
+		final String s3filePrefix = params.get("s3filePrefix", "");
+		TransferManager tx = TransferManagerBuilder.defaultTransferManager();
+		Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
+		KeyFilter keyFilter = s3filePrefix.isEmpty() ? KeyFilter.INCLUDE_ALL :
+			objectSummary -> keyPredicate.test(objectSummary.getKey());
+		try {
+			tx.downloadDirectory(bucket, s3prefix, new File(localFolder), keyFilter).waitForCompletion();
+		} catch (InterruptedException e) {
+			System.out.println("Transfer interrupted");
+		} finally {
+			tx.shutdownNow();
+		}
+	}
+
+	private static Predicate<String> getKeyFilterByFileNamePrefix(String s3filePrefix) {
+		if (s3filePrefix.isEmpty()) {
+			return key -> true;
+		} else {
+			return key -> {
+				String[] parts = key.split("/");
+				String fileName = parts[parts.length - 1];
+				return fileName.startsWith(s3filePrefix);
+			};
+		}
+	}
+
+	private static void deleteFile(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3file = params.getRequired("s3file");
+		AmazonS3ClientBuilder.defaultClient().deleteObject(bucket, s3file);
+	}
+
+	private static void deleteByFullPathPrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		String[] keys = listByFullPathPrefix(bucket, s3prefix).toArray(new String[] {});
+		if (keys.length > 0) {
+			DeleteObjectsRequest request = new DeleteObjectsRequest(bucket).withKeys(keys);
+			AmazonS3ClientBuilder.defaultClient().deleteObjects(request);
+		}
+	}
+
+	private static void numberOfLinesInFile(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3file = params.getRequired("s3file");
+		AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
+		System.out.print(S3QueryUtil.queryFile(s3client, bucket, s3file, countQuery));
+		s3client.shutdown();
+	}
+
+	private static void numberOfLinesInFilesWithFullAndNamePrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		final String s3filePrefix = params.get("s3filePrefix", "");
+		int parallelism = params.getInt("parallelism", 10);
+
+		List<String> files = listByFullPathPrefix(bucket, s3prefix);
+
+		ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+		AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
+		List<CompletableFuture<Integer>> requests =
+			submitLineCountingRequestsForFilesAsync(executor, s3client, bucket, files, s3filePrefix);
+		int count = waitAndComputeTotalLineCountResult(requests);
+
+		executor.shutdownNow();
+		s3client.shutdown();
+		System.out.print(count);
+	}
+
+	private static List<CompletableFuture<Integer>> submitLineCountingRequestsForFilesAsync(
+			ExecutorService executor, AmazonS3 s3client, String bucket, List<String> files, String s3filePrefix) {
+		List<CompletableFuture<Integer>> requests = new ArrayList<>();
+		Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
+		files.forEach(file -> {
+			if (keyPredicate.test(file)) {
+				CompletableFuture<Integer> result = new CompletableFuture<>();
+				executor.execute(() ->
+					result.complete(Integer.parseInt(S3QueryUtil.queryFile(s3client, bucket, file, countQuery))));
+				requests.add(result);
+			}
+		});
+		return requests;
+	}
+
+	private static int waitAndComputeTotalLineCountResult(List<CompletableFuture<Integer>> requests) {
+		int count = 0;
+		for (CompletableFuture<Integer> result : requests) {
+			try {
+				count += result.get();
+			} catch (Throwable e) {
+				System.out.println("Failed to count lines");
+				e.printStackTrace();
+			}
+		}
+		return count;
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 6b31881f684..e07cf22fa3d 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-sql-client-test</module>
 		<module>flink-streaming-file-sink-test</module>
 		<module>flink-state-evolution-test</module>
+		<module>flink-e2e-test-utils</module>
 	</modules>
 
 	<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index bbb0e9e5885..65a953ef884 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -96,6 +96,7 @@ run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_b
 run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
 run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
 run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
+run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
 run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
 run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false" "skip_check_exceptions"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index e815a85afdd..9b146dc0b6d 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -457,43 +457,6 @@ function check_result_hash {
   fi
 }
 
-function s3_put {
-  local_file=$1
-  bucket=$2
-  s3_file=$3
-  resource="/${bucket}/${s3_file}"
-  contentType="application/octet-stream"
-  dateValue=`date -R`
-  stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
-  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
-  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
-  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
-  curl -X PUT -T "${local_file}" \
-    -H "Host: ${bucket}.s3.amazonaws.com" \
-    -H "Date: ${dateValue}" \
-    -H "Content-Type: ${contentType}" \
-    -H "Authorization: AWS ${s3Key}:${signature}" \
-    https://${bucket}.s3.amazonaws.com/${s3_file}
-}
-
-function s3_delete {
-  bucket=$1
-  s3_file=$2
-  resource="/${bucket}/${s3_file}"
-  contentType="application/octet-stream"
-  dateValue=`date -R`
-  stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
-  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
-  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
-  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
-  curl -X DELETE \
-    -H "Host: ${bucket}.s3.amazonaws.com" \
-    -H "Date: ${dateValue}" \
-    -H "Content-Type: ${contentType}" \
-    -H "Authorization: AWS ${s3Key}:${signature}" \
-    https://${bucket}.s3.amazonaws.com/${s3_file}
-}
-
 # This function starts the given number of task managers and monitors their processes.
 # If a task manager process goes away a replacement is started.
 function tm_watchdog {
@@ -660,3 +623,20 @@ function expect_in_taskmanager_logs {
         fi
     done
 }
+
+function wait_for_restart_to_complete {
+    local base_num_restarts=$1
+    local jobid=$2
+
+    local current_num_restarts=${base_num_restarts}
+    local expected_num_restarts=$((current_num_restarts + 1))
+
+    echo "Waiting for restart to happen"
+    while ! [[ ${current_num_restarts} -eq ${expected_num_restarts} ]]; do
+        sleep 5
+        current_num_restarts=$(get_job_metric ${jobid} "fullRestarts")
+        if [[ -z ${current_num_restarts} ]]; then
+            current_num_restarts=${base_num_restarts}
+        fi
+    done
+}
diff --git a/flink-end-to-end-tests/test-scripts/common_s3.sh b/flink-end-to-end-tests/test-scripts/common_s3.sh
new file mode 100644
index 00000000000..5c16bb75bea
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/common_s3.sh
@@ -0,0 +1,240 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
+    echo "Did not find AWS environment variables, NOT running the e2e test."
+    exit 0
+else
+    echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running the e2e test."
+fi
+
+if [[ -z "$ARTIFACTS_AWS_ACCESS_KEY" ]]; then
+    echo "Did not find AWS environment variables, NOT running the e2e test."
+    exit 0
+else
+    echo "Found AWS access key $ARTIFACTS_AWS_ACCESS_KEY, running the e2e test."
+fi
+
+if [[ -z "$ARTIFACTS_AWS_SECRET_KEY" ]]; then
+    echo "Did not find AWS environment variables, NOT running the e2e test."
+    exit 0
+else
+    echo "Found AWS secret key $ARTIFACTS_AWS_SECRET_KEY, running the e2e test."
+fi
+
+AWS_REGION="${AWS_REGION:-eu-west-1}"
+AWS_ACCESS_KEY=$ARTIFACTS_AWS_ACCESS_KEY
+AWS_SECRET_KEY=$ARTIFACTS_AWS_SECRET_KEY
+
+s3util="java -jar ${END_TO_END_DIR}/flink-e2e-test-utils/target/S3UtilProgram.jar"
+
+###################################
+# Setup Flink s3 access.
+#
+# Globals:
+#   FLINK_DIR
+#   ARTIFACTS_AWS_ACCESS_KEY
+#   ARTIFACTS_AWS_SECRET_KEY
+# Arguments:
+#   None
+# Returns:
+#   None
+###################################
+function s3_setup {
+  # make sure we remove the s3 filesystem jar and the added config keys at the end
+  function s3_cleanup {
+    rm $FLINK_DIR/lib/flink-s3-fs*.jar
+
+    # remove any leftover settings
+    sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+    sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+  }
+  trap s3_cleanup EXIT
+
+  cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
+  echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+  echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+}
+
+s3_setup
+
+###################################
+# List s3 objects by full path prefix.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 full path key prefix
+# Returns:
+#   List of s3 object keys, separated by newline
+###################################
+function s3_list {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action listByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
+}
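+
+# Illustrative usage (the prefix below is a placeholder): prints every object key
+# under that prefix in $ARTIFACTS_AWS_BUCKET, one per line:
+#   s3_list "temp/test-streaming-file-sink-out/"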
+
+###################################
+# Download s3 object.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - local path to save file
+#   $2 - s3 object key
+# Returns:
+#   None
+###################################
+function s3_get {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action downloadFile --localFile "$1" --s3file "$2" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Download s3 objects to folder by full path prefix.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - local path to save folder with files
+#   $2 - s3 key full path prefix
+#   $3 - s3 file name prefix w/o directory to filter files by name (optional)
+# Returns:
+#   None
+###################################
+function s3_get_by_full_path_and_filename_prefix {
+  local file_prefix="${3-}"
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action downloadByFullPathAndFileNamePrefix \
+    --localFolder "$1" --s3prefix "$2" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
+}
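+
+# Illustrative usage (the local path, key prefix and file-name prefix below are placeholders):
+# downloads every object under the "out/" prefix whose file name starts with "part-"
+# into /tmp/test-out:
+#   s3_get_by_full_path_and_filename_prefix /tmp/test-out "out/" "part-"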
+
+###################################
+# Upload file to s3 object.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - local file to upload
+#   $2 - s3 bucket
+#   $3 - s3 object key
+# Returns:
+#   None
+###################################
+function s3_put {
+  local_file=$1
+  bucket=$2
+  s3_file=$3
+  resource="/${bucket}/${s3_file}"
+  contentType="application/octet-stream"
+  dateValue=`date -R`
+  stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
+  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
+  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
+  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
+  curl -X PUT -T "${local_file}" \
+    -H "Host: ${bucket}.s3.amazonaws.com" \
+    -H "Date: ${dateValue}" \
+    -H "Content-Type: ${contentType}" \
+    -H "Authorization: AWS ${s3Key}:${signature}" \
+    https://${bucket}.s3.amazonaws.com/${s3_file}
+}
+
+###################################
+# Delete s3 object.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1 - s3 bucket
+#   $2 - s3 object key
+# Returns:
+#   None
+###################################
+function s3_delete {
+  bucket=$1
+  s3_file=$2
+  resource="/${bucket}/${s3_file}"
+  contentType="application/octet-stream"
+  dateValue=`date -R`
+  stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
+  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
+  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
+  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
+  curl -X DELETE \
+    -H "Host: ${bucket}.s3.amazonaws.com" \
+    -H "Date: ${dateValue}" \
+    -H "Content-Type: ${contentType}" \
+    -H "Authorization: AWS ${s3Key}:${signature}" \
+    https://${bucket}.s3.amazonaws.com/${s3_file}
+}
+
+###################################
+# Delete s3 objects by full path prefix.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 key full path prefix
+# Returns:
+#   None
+###################################
+function s3_delete_by_full_path_prefix {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action deleteByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Count the number of lines in a file stored as an s3 object.
+# The lines have to be simple enough to comply with the CSV format
+# because SQL (S3 Select) is used to query the s3 object.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 file object key
+# Returns:
+#   Number of lines in the file
+###################################
+function s3_get_number_of_lines_in_file {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action numberOfLinesInFile --s3file "$1" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Count the total number of lines in the files of s3 objects filtered by key prefix.
+# The lines have to be simple enough to comply with the CSV format
+# because SQL (S3 Select) is used to query the s3 objects.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 key prefix
+#   $2 - s3 file name prefix w/o directory to filter files by name (optional)
+# Returns:
+#   Total number of lines in the matching files
+###################################
+function s3_get_number_of_lines_by_prefix {
+  local file_prefix="${2-}"
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action numberOfLinesInFilesWithFullAndNamePrefix \
+    --s3prefix "$1" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
index e5ac5bcecdc..3d838675852 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
@@ -19,33 +19,18 @@
 
 # Tests for our shaded/bundled Hadoop S3A file system.
 
-if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
-    echo "Did not find AWS environment variables, NOT running Shaded Hadoop S3A e2e tests."
-    exit 0
-else
-    echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running Shaded Hadoop S3A e2e tests."
-fi
-
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_s3.sh
 
 s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
 # make sure we delete the file at the end
-function s3_cleanup {
+function shaded_s3a_cleanup {
   s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
-  rm $FLINK_DIR/lib/flink-s3-fs*.jar
-
-  # remove any leftover settings
-  sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
-  sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
 }
-trap s3_cleanup EXIT
-
-cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
-echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
-echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+trap shaded_s3a_cleanup EXIT
 
 start_cluster
 
 $FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input s3:/$resource --output $TEST_DATA_DIR/out/wc_out
 
-check_result_hash "WordCountWithShadedS3A" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
\ No newline at end of file
+check_result_hash "WordCountWithShadedS3A" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
index 4092805dc27..bd33b410dfd 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
@@ -19,33 +19,18 @@
 
 # Tests for our shaded/bundled Hadoop S3A file system.
 
-if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
-    echo "Did not find AWS environment variables, NOT running Shaded Presto S3 e2e tests."
-    exit 0
-else
-    echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running Shaded Presto S3 e2e tests."
-fi
-
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_s3.sh
 
 s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
 # make sure we delete the file at the end
-function s3_cleanup {
+function shaded_presto_s3_cleanup {
   s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
-  rm $FLINK_DIR/lib/flink-s3-fs*.jar
 }
-trap s3_cleanup EXIT
-
-cp $FLINK_DIR/opt/flink-s3-fs-presto-*.jar $FLINK_DIR/lib/
-echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
-echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+trap shaded_presto_s3_cleanup EXIT
 
 start_cluster
 
 $FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input s3:/$resource --output $TEST_DATA_DIR/out/wc_out
 
 check_result_hash "WordCountWithShadedPrestoS3" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
-
-# remove any leftover settings
-sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
-sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
index 17389ad6f4e..50f5afc1312 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -17,27 +17,37 @@
 # limitations under the License.
 ################################################################################
 
-source "$(dirname "$0")"/common.sh
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
+OUT_TYPE="${1:-local}" # other type: s3
 
-OUTPUT_PATH="$TEST_DATA_DIR/out"
-
-function wait_for_restart {
-    local base_num_restarts=$1
-
-    local current_num_restarts=${base_num_restarts}
-    local expected_num_restarts=$((current_num_restarts + 1))
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_s3.sh
+
+OUT=out
+OUTPUT_PATH="$TEST_DATA_DIR/$OUT"
+S3_OUTPUT_PATH="s3://$ARTIFACTS_AWS_BUCKET/$OUT"
+
+mkdir -p $OUTPUT_PATH
+
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "Use local output"
+  JOB_OUTPUT_PATH=${OUTPUT_PATH}
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  echo "Use s3 output"
+  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
+else
+  echo "Unknown output type: ${OUT_TYPE}"
+  exit 1
+fi
 
-    echo "Waiting for restart to happen"
-    while ! [[ ${current_num_restarts} -eq ${expected_num_restarts} ]]; do
-        sleep 5
-        current_num_restarts=$(get_job_metric ${JOB_ID} "fullRestarts")
-        if [[ -z ${current_num_restarts} ]]; then
-            current_num_restarts=${base_num_restarts}
-        fi
-    done
+# make sure we clean up the s3 output at the end
+function out_cleanup {
+  s3_delete_by_full_path_prefix $OUT
 }
+if [ "${OUT_TYPE}" == "s3" ]; then
+  trap out_cleanup EXIT
+fi
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
 
 ###################################
 # Get all lines in part files and sort them numerically.
@@ -47,10 +57,32 @@ function wait_for_restart {
 # Arguments:
 #   None
 # Returns:
-#   None
+#   Sorted content of part files
 ###################################
 function get_complete_result {
-    find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  if [ "${OUT_TYPE}" == "s3" ]; then
+    rm -rf $OUTPUT_PATH; mkdir -p $OUTPUT_PATH
+    s3_get_by_full_path_and_filename_prefix ${TEST_DATA_DIR} "${OUT}" "part-"
+  fi
+  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
+
+###################################
+# Get total number of lines in part files.
+#
+# Globals:
+#   OUT
+# Arguments:
+#   None
+# Returns:
+#   Total number of lines in part files
+###################################
+function get_total_number_of_valid_lines {
+  if [ "${OUT_TYPE}" == "local" ]; then
+    get_complete_result | wc -l | tr -d '[:space:]'
+  elif [ "${OUT_TYPE}" == "s3" ]; then
+    s3_get_number_of_lines_by_prefix "${OUT}" "part-"
+  fi
 }
 
 ###################################
@@ -83,7 +115,7 @@ function wait_for_complete_result {
         sleep ${polling_interval}
         ((seconds_elapsed += ${polling_interval}))
 
-        number_of_values=$(get_complete_result | wc -l | tr -d '[:space:]')
+        number_of_values=$(get_total_number_of_valid_lines)
         if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
             echo "Number of produced values ${number_of_values}/${expected_number_of_values}"
             previous_number_of_values=${number_of_values}
@@ -98,7 +130,7 @@ start_cluster
 "${FLINK_DIR}/bin/taskmanager.sh" start
 
 echo "Submitting job."
-CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${OUTPUT_PATH}")
+CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}")
 JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 if [[ -z $JOB_ID ]]; then
@@ -117,7 +149,7 @@ kill_random_taskmanager
 echo "Starting TM"
 "$FLINK_DIR/bin/taskmanager.sh" start
 
-wait_for_restart 0
+wait_for_restart_to_complete 0 ${JOB_ID}
 
 echo "Killing 2 TMs"
 kill_random_taskmanager
@@ -127,7 +159,7 @@ echo "Starting 2 TMs"
 "$FLINK_DIR/bin/taskmanager.sh" start
 "$FLINK_DIR/bin/taskmanager.sh" start
 
-wait_for_restart 1
+wait_for_restart_to_complete 1 ${JOB_ID}
 
 echo "Waiting until all values have been produced"
 wait_for_complete_result 60000 300


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services