Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/19 08:31:23 UTC

[GitHub] StephanEwen closed pull request #7098: [FLINK-10869] [build] Update S3 tests to reference new access key environment variables.

URL: https://github.com/apache/flink/pull/7098

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/.travis.yml b/.travis.yml
index e2291f64a77..a7b1eebb492 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -41,10 +41,6 @@ env:
   global:
     # Global variable to avoid hanging travis builds when downloading cache archives.
     - MALLOC_ARENA_MAX=2
-    # Build artifacts like logs (variables for apache/flink repo)
-    - secure: "gL3QRn6/XyVK+Em9RmVqpM6nbTwlhjK4/JiRYZGGCkBgTq4ZnG+Eq2qKAO22TAsqRSi7g7WAoAhUulPt0SJqH7hjMe0LetbO0izbVXDefwf2PJlsNgBbuFG6604++VUaUEyfPYYw9ADjV59LWG7+B/fjbRsevqRBZ30b1gv/tQQ="
-    - secure: "eM9r8IglvnUKctxz/ga6hwGnCpdOvGyYdGj0H/UiNDEx3Lq1A6yp3gChEIXGJqRUXDI5TaIuidunUGY7KHml8urm8eG2Yk2ttxXehZqLpEaOU2jdNJCdLX8tlVfh14T9bxG5AYHQEV3qJUqDFtfXD3whvzuinrm1oEIA3qUxiA8="
-    - secure: "EQYDWgJM5ANJ/sAFwmSEwSTOe9CDN/ENyQAr5/ntM67XanhTZj2Amgt9LthCRUU4EEPl/OFUTwNHMpv/+wa3q7dwVFldSIg5wyCndzJSATPyPBVjYgsXIQZVIjsq4TwTyrTteT55V6Oz2+l27Fvung2FPuN83ovswsJePFzMBxI="
     - DOCKER_COMPOSE_VERSION=1.22.0
 
 before_script:
diff --git a/flink-end-to-end-tests/test-scripts/common_s3.sh b/flink-end-to-end-tests/test-scripts/common_s3.sh
index 5c16bb75bea..1f3b2ee2531 100644
--- a/flink-end-to-end-tests/test-scripts/common_s3.sh
+++ b/flink-end-to-end-tests/test-scripts/common_s3.sh
@@ -17,30 +17,30 @@
 # limitations under the License.
 ################################################################################
 
-if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
+if [[ -z "$IT_CASE_S3_BUCKET" ]]; then
     echo "Did not find AWS environment variables, NOT running the e2e test."
     exit 0
 else
-    echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running the e2e test."
+    echo "Found AWS bucket $IT_CASE_S3_BUCKET, running the e2e test."
 fi
 
-if [[ -z "$ARTIFACTS_AWS_ACCESS_KEY" ]]; then
+if [[ -z "$IT_CASE_S3_ACCESS_KEY" ]]; then
     echo "Did not find AWS environment variables, NOT running the e2e test."
     exit 0
 else
-    echo "Found AWS access key $ARTIFACTS_AWS_ACCESS_KEY, running the e2e test."
+    echo "Found AWS access key, running the e2e test."
 fi
 
-if [[ -z "$ARTIFACTS_AWS_SECRET_KEY" ]]; then
+if [[ -z "$IT_CASE_S3_SECRET_KEY" ]]; then
     echo "Did not find AWS environment variables, NOT running the e2e test."
     exit 0
 else
-    echo "Found AWS secret key $ARTIFACTS_AWS_SECRET_KEY, running the e2e test."
+    echo "Found AWS secret key, running the e2e test."
 fi
 
-AWS_REGION="${AWS_REGION:-eu-west-1}"
-AWS_ACCESS_KEY=$ARTIFACTS_AWS_ACCESS_KEY
-AWS_SECRET_KEY=$ARTIFACTS_AWS_SECRET_KEY
+AWS_REGION="${AWS_REGION:-us-east-1}"
+AWS_ACCESS_KEY=$IT_CASE_S3_ACCESS_KEY
+AWS_SECRET_KEY=$IT_CASE_S3_SECRET_KEY
 
 s3util="java -jar ${END_TO_END_DIR}/flink-e2e-test-utils/target/S3UtilProgram.jar"
 
@@ -49,8 +49,8 @@ s3util="java -jar ${END_TO_END_DIR}/flink-e2e-test-utils/target/S3UtilProgram.ja
 #
 # Globals:
 #   FLINK_DIR
-#   ARTIFACTS_AWS_ACCESS_KEY
-#   ARTIFACTS_AWS_SECRET_KEY
+#   IT_CASE_S3_ACCESS_KEY
+#   IT_CASE_S3_SECRET_KEY
 # Arguments:
 #   None
 # Returns:
@@ -68,8 +68,8 @@ function s3_setup {
   trap s3_cleanup EXIT
 
   cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
-  echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
-  echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+  echo "s3.access-key: $IT_CASE_S3_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+  echo "s3.secret-key: $IT_CASE_S3_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
 }
 
 s3_setup
@@ -78,7 +78,7 @@ s3_setup
 # List s3 objects by full path prefix.
 #
 # Globals:
-#   ARTIFACTS_AWS_BUCKET
+#   IT_CASE_S3_BUCKET
 # Arguments:
 #   $1 - s3 full path key prefix
 # Returns:
@@ -86,14 +86,14 @@ s3_setup
 ###################################
 function s3_list {
   AWS_REGION=$AWS_REGION \
-  ${s3util} --action listByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
+  ${s3util} --action listByFullPathPrefix --s3prefix "$1" --bucket $IT_CASE_S3_BUCKET
 }
 
 ###################################
 # Download s3 object.
 #
 # Globals:
-#   ARTIFACTS_AWS_BUCKET
+#   IT_CASE_S3_BUCKET
 # Arguments:
 #   $1 - local path to save file
 #   $2 - s3 object key
@@ -102,14 +102,14 @@ function s3_list {
 ###################################
 function s3_get {
   AWS_REGION=$AWS_REGION \
-  ${s3util} --action downloadFile --localFile "$1" --s3file "$2" --bucket $ARTIFACTS_AWS_BUCKET
+  ${s3util} --action downloadFile --localFile "$1" --s3file "$2" --bucket $IT_CASE_S3_BUCKET
 }
 
 ###################################
 # Download s3 objects to folder by full path prefix.
 #
 # Globals:
-#   ARTIFACTS_AWS_BUCKET
+#   IT_CASE_S3_BUCKET
 # Arguments:
 #   $1 - local path to save folder with files
 #   $2 - s3 key full path prefix
@@ -121,14 +121,14 @@ function s3_get_by_full_path_and_filename_prefix {
   local file_prefix="${3-}"
   AWS_REGION=$AWS_REGION \
   ${s3util} --action downloadByFullPathAndFileNamePrefix \
-    --localFolder "$1" --s3prefix "$2" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
+    --localFolder "$1" --s3prefix "$2" --s3filePrefix "${file_prefix}" --bucket $IT_CASE_S3_BUCKET
 }
 
 ###################################
 # Upload file to s3 object.
 #
 # Globals:
-#   ARTIFACTS_AWS_BUCKET
+#   IT_CASE_S3_BUCKET
 # Arguments:
 #   $1 - local file to upload
 #   $2 - s3 bucket
@@ -144,8 +144,8 @@ function s3_put {
   contentType="application/octet-stream"
   dateValue=`date -R`
   stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
-  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
-  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
+  s3Key=$IT_CASE_S3_ACCESS_KEY
+  s3Secret=$IT_CASE_S3_SECRET_KEY
   signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
   curl -X PUT -T "${local_file}" \
     -H "Host: ${bucket}.s3.amazonaws.com" \
@@ -174,8 +174,8 @@ function s3_delete {
   contentType="application/octet-stream"
   dateValue=`date -R`
   stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
-  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
-  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
+  s3Key=$IT_CASE_S3_ACCESS_KEY
+  s3Secret=$IT_CASE_S3_SECRET_KEY
   signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
   curl -X DELETE \
     -H "Host: ${bucket}.s3.amazonaws.com" \
@@ -189,7 +189,7 @@ function s3_delete {
 # Delete s3 objects by full path prefix.
 #
 # Globals:
-#   ARTIFACTS_AWS_BUCKET
+#   IT_CASE_S3_BUCKET
 # Arguments:
 #   $1 - s3 key full path prefix
 # Returns:
@@ -197,7 +197,7 @@ function s3_delete {
 ###################################
 function s3_delete_by_full_path_prefix {
   AWS_REGION=$AWS_REGION \
-  ${s3util} --action deleteByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
+  ${s3util} --action deleteByFullPathPrefix --s3prefix "$1" --bucket $IT_CASE_S3_BUCKET
 }
 
 ###################################
@@ -206,7 +206,7 @@ function s3_delete_by_full_path_prefix {
 # because SQL is used to query the s3 object.
 #
 # Globals:
-#   ARTIFACTS_AWS_BUCKET
+#   IT_CASE_S3_BUCKET
 # Arguments:
 #   $1 - s3 file object key
 #   $2 - s3 bucket
@@ -215,7 +215,7 @@ function s3_delete_by_full_path_prefix {
 ###################################
 function s3_get_number_of_lines_in_file {
   AWS_REGION=$AWS_REGION \
-  ${s3util} --action numberOfLinesInFile --s3file "$1" --bucket $ARTIFACTS_AWS_BUCKET
+  ${s3util} --action numberOfLinesInFile --s3file "$1" --bucket $IT_CASE_S3_BUCKET
 }
 
 ###################################
@@ -224,7 +224,7 @@ function s3_get_number_of_lines_in_file {
 # because SQL is used to query the s3 objects.
 #
 # Globals:
-#   ARTIFACTS_AWS_BUCKET
+#   IT_CASE_S3_BUCKET
 # Arguments:
 #   $1 - s3 key prefix
 #   $2 - s3 bucket
@@ -236,5 +236,5 @@ function s3_get_number_of_lines_by_prefix {
   local file_prefix="${3-}"
   AWS_REGION=$AWS_REGION \
   ${s3util} --action numberOfLinesInFilesWithFullAndNamePrefix \
-    --s3prefix "$1" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
+    --s3prefix "$1" --s3filePrefix "${file_prefix}" --bucket $IT_CASE_S3_BUCKET
 }
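
With this change, the three IT_CASE_S3_* variables gate whether the S3
end-to-end scripts run at all: each check in common_s3.sh above skips
(exit 0) when its variable is unset. A minimal sketch of a local run
follows, assuming a checked-out Flink source tree; the credential values
are placeholders, and the real CI harness exports further variables such
as FLINK_DIR and END_TO_END_DIR before these scripts execute:

    #!/usr/bin/env bash
    # Placeholder credentials -- substitute real values for an actual run.
    export IT_CASE_S3_BUCKET="my-flink-test-bucket"
    export IT_CASE_S3_ACCESS_KEY="EXAMPLE-ACCESS-KEY"
    export IT_CASE_S3_SECRET_KEY="EXAMPLE-SECRET-KEY"
    # Optional: common_s3.sh now defaults AWS_REGION to us-east-1.
    export AWS_REGION="us-east-1"

    # If any IT_CASE_S3_* variable were missing, the sourced common_s3.sh
    # would print a notice and exit 0, skipping the test instead of failing.
    ./flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
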
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
index 3d838675852..e6ae9929aef 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
@@ -22,10 +22,10 @@
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/common_s3.sh
 
-s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
+s3_put $TEST_INFRA_DIR/test-data/words $IT_CASE_S3_BUCKET temp/flink-end-to-end-test-shaded-s3a
 # make sure we delete the file at the end
 function shaded_s3a_cleanup {
-  s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
+  s3_delete $IT_CASE_S3_BUCKET temp/flink-end-to-end-test-shaded-s3a
 }
 trap shaded_s3a_cleanup EXIT
 
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
index bd33b410dfd..0421c840a3d 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
@@ -22,10 +22,10 @@
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/common_s3.sh
 
-s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
+s3_put $TEST_INFRA_DIR/test-data/words $IT_CASE_S3_BUCKET temp/flink-end-to-end-test-shaded-presto-s3
 # make sure we delete the file at the end
 function shaded_presto_s3_cleanup {
-  s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
+  s3_delete $IT_CASE_S3_BUCKET temp/flink-end-to-end-test-shaded-presto-s3
 }
 trap shaded_presto_s3_cleanup EXIT
 
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
index 6c8d0b85435..7a469aa7951 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -26,7 +26,7 @@ set_conf_ssl "mutual"
 
 OUT=out
 OUTPUT_PATH="$TEST_DATA_DIR/$OUT"
-S3_OUTPUT_PATH="s3://$ARTIFACTS_AWS_BUCKET/$OUT"
+S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/temp/$OUT"
 
 mkdir -p $OUTPUT_PATH
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java
index c8aaaeef74d..df73cdae34c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemBehaviorITCase.java
@@ -23,9 +23,9 @@
 import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
@@ -36,24 +36,17 @@
  */
 public class HadoopS3FileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
 
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-
 	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-
 	@BeforeClass
 	public static void checkCredentialsAndSetup() throws IOException {
 		// check whether credentials exist
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
 
 		// initialize configuration with valid credentials
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 		FileSystem.initialize(conf);
 	}
 
@@ -69,7 +62,7 @@ public FileSystem getFileSystem() throws Exception {
 
 	@Override
 	public Path getBasePath() throws Exception {
-		return new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+		return new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 	}
 
 	@Override
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index 6dbdac511f4..2195bd0e763 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -25,10 +25,10 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -69,10 +69,6 @@
 
 	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-
 	/**
 	 * Will be updated by {@link #checkCredentialsAndSetup()} if the test is not skipped.
 	 */
@@ -81,18 +77,16 @@
 	@BeforeClass
 	public static void checkCredentialsAndSetup() throws IOException {
 		// check whether credentials exist
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
 
 		// initialize configuration with valid credentials
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 		FileSystem.initialize(conf);
 
 		// check for uniqueness of the test directory
-		final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+		final Path directory = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 		final FileSystem fs = directory.getFileSystem();
 
 		// directory must not yet exist
@@ -108,11 +102,11 @@ public static void cleanUp() throws IOException, InterruptedException {
 				final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 				// initialize configuration with valid credentials
 				final Configuration conf = new Configuration();
-				conf.setString("s3.access.key", ACCESS_KEY);
-				conf.setString("s3.secret.key", SECRET_KEY);
+				conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+				conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 				FileSystem.initialize(conf);
 
-				final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+				final Path directory = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 				final FileSystem fs = directory.getFileSystem();
 
 				// clean up
@@ -128,7 +122,7 @@ public static void cleanUp() throws IOException, InterruptedException {
 	}
 
 	private String getBasePath() {
-		return scheme + "://" + BUCKET + '/' + TEST_DATA_DIR + "/" + scheme;
+		return S3TestCredentials.getTestBucketUriWithScheme(scheme) + TEST_DATA_DIR + '/' + scheme;
 	}
 
 	@Test
@@ -138,8 +132,8 @@ public void testConfigKeysForwarding() throws Exception {
 		// standard Hadoop-style credential keys
 		{
 			Configuration conf = new Configuration();
-			conf.setString("fs.s3a.access.key", ACCESS_KEY);
-			conf.setString("fs.s3a.secret.key", SECRET_KEY);
+			conf.setString("fs.s3a.access.key", S3TestCredentials.getS3AccessKey());
+			conf.setString("fs.s3a.secret.key", S3TestCredentials.getS3SecretKey());
 
 			FileSystem.initialize(conf);
 			path.getFileSystem();
@@ -148,8 +142,8 @@ public void testConfigKeysForwarding() throws Exception {
 		// shortened Hadoop-style credential keys
 		{
 			Configuration conf = new Configuration();
-			conf.setString("s3.access.key", ACCESS_KEY);
-			conf.setString("s3.secret.key", SECRET_KEY);
+			conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+			conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 			FileSystem.initialize(conf);
 			path.getFileSystem();
@@ -158,8 +152,8 @@ public void testConfigKeysForwarding() throws Exception {
 		// shortened Presto-style credential keys
 		{
 			Configuration conf = new Configuration();
-			conf.setString("s3.access-key", ACCESS_KEY);
-			conf.setString("s3.secret-key", SECRET_KEY);
+			conf.setString("s3.access-key", S3TestCredentials.getS3AccessKey());
+			conf.setString("s3.secret-key", S3TestCredentials.getS3SecretKey());
 
 			FileSystem.initialize(conf);
 			path.getFileSystem();
@@ -170,8 +164,8 @@ public void testConfigKeysForwarding() throws Exception {
 	public void testSimpleFileWriteAndRead() throws Exception {
 		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 		final String testLine = "Hello Upload!";
 
@@ -208,8 +202,8 @@ public void testSimpleFileWriteAndRead() throws Exception {
 	public void testDirectoryListing() throws Exception {
 		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 		FileSystem.initialize(conf);
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterExceptionTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterExceptionTest.java
index 634fa00344d..3fb67c1414f 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterExceptionTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterExceptionTest.java
@@ -25,12 +25,12 @@
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -55,10 +55,6 @@
 
 	// ----------------------- S3 general configuration -----------------------
 
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-
 	private static final long PART_UPLOAD_MIN_SIZE_VALUE = 7L << 20;
 	private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
 
@@ -66,9 +62,7 @@
 
 	private static final Random RND = new Random();
 
-	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
-
-	private static final Path basePath = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+	private static Path basePath;
 
 	private static FlinkS3FileSystem fileSystem;
 
@@ -89,14 +83,14 @@
 	@BeforeClass
 	public static void checkCredentialsAndSetup() throws IOException {
 		// check whether credentials exist
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
+
+		basePath = new Path(S3TestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
 
 		// initialize configuration with valid credentials
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 		conf.setLong(PART_UPLOAD_MIN_SIZE, PART_UPLOAD_MIN_SIZE_VALUE);
 		conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
index 4a1368a815e..a9027a0b62c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
@@ -34,7 +35,6 @@
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -63,10 +63,6 @@
 
 	// ----------------------- S3 general configuration -----------------------
 
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-
 	private static final long PART_UPLOAD_MIN_SIZE_VALUE = 7L << 20;
 	private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
 
@@ -74,9 +70,7 @@
 
 	private static final Random RND = new Random();
 
-	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
-
-	private static final Path basePath = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+	private static Path basePath;
 
 	private static FlinkS3FileSystem fileSystem;
 
@@ -101,14 +95,14 @@
 	@BeforeClass
 	public static void checkCredentialsAndSetup() throws IOException {
 		// check whether credentials exist
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
+
+		basePath = new Path(S3TestCredentials.getTestBucketUri() + "tests-" + UUID.randomUUID());
 
 		// initialize configuration with valid credentials
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 		conf.setLong(PART_UPLOAD_MIN_SIZE, PART_UPLOAD_MIN_SIZE_VALUE);
 		conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java
index 812404ce639..1603927749e 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemBehaviorITCase.java
@@ -23,9 +23,9 @@
 import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
@@ -36,24 +36,17 @@
  */
 public class PrestoS3FileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
 
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-
 	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-
 	@BeforeClass
 	public static void checkCredentialsAndSetup() throws IOException {
 		// check whether credentials exist
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
 
 		// initialize configuration with valid credentials
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 		FileSystem.initialize(conf);
 	}
 
@@ -69,7 +62,7 @@ public FileSystem getFileSystem() throws Exception {
 
 	@Override
 	public Path getBasePath() throws Exception {
-		return new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+		return new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 	}
 
 	@Override
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
index cc5c9935202..9a008a176d6 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemITCase.java
@@ -25,9 +25,9 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,23 +68,16 @@
 		return Arrays.asList("s3", "s3p");
 	}
 
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-
 	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-
 	@BeforeClass
 	public static void checkIfCredentialsArePresent() {
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
 	}
 
 	@Test
 	public void testConfigKeysForwarding() throws Exception {
-		final Path path = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR);
+		final Path path = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR);
 
 		// access without credentials should fail
 		{
@@ -103,8 +96,8 @@ public void testConfigKeysForwarding() throws Exception {
 		{
 			Configuration conf = new Configuration();
 			conf.setString(S3_USE_INSTANCE_CREDENTIALS, "false");
-			conf.setString("presto.s3.access-key", ACCESS_KEY);
-			conf.setString("presto.s3.secret-key", SECRET_KEY);
+			conf.setString("presto.s3.access-key", S3TestCredentials.getS3AccessKey());
+			conf.setString("presto.s3.secret-key", S3TestCredentials.getS3SecretKey());
 
 			FileSystem.initialize(conf);
 			path.getFileSystem().exists(path);
@@ -114,8 +107,8 @@ public void testConfigKeysForwarding() throws Exception {
 		{
 			Configuration conf = new Configuration();
 			conf.setString(S3_USE_INSTANCE_CREDENTIALS, "false");
-			conf.setString("s3.access-key", ACCESS_KEY);
-			conf.setString("s3.secret-key", SECRET_KEY);
+			conf.setString("s3.access-key", S3TestCredentials.getS3AccessKey());
+			conf.setString("s3.secret-key", S3TestCredentials.getS3SecretKey());
 
 			FileSystem.initialize(conf);
 			path.getFileSystem().exists(path);
@@ -125,8 +118,8 @@ public void testConfigKeysForwarding() throws Exception {
 		{
 			Configuration conf = new Configuration();
 			conf.setString(S3_USE_INSTANCE_CREDENTIALS, "false");
-			conf.setString("s3.access.key", ACCESS_KEY);
-			conf.setString("s3.secret.key", SECRET_KEY);
+			conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+			conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 			FileSystem.initialize(conf);
 			path.getFileSystem().exists(path);
@@ -136,8 +129,8 @@ public void testConfigKeysForwarding() throws Exception {
 		{
 			Configuration conf = new Configuration();
 			conf.setString(S3_USE_INSTANCE_CREDENTIALS, "false");
-			conf.setString("presto.s3.access.key", ACCESS_KEY);
-			conf.setString("presto.s3.secret.key", SECRET_KEY);
+			conf.setString("presto.s3.access.key", S3TestCredentials.getS3AccessKey());
+			conf.setString("presto.s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 			FileSystem.initialize(conf);
 			path.getFileSystem().exists(path);
@@ -151,14 +144,14 @@ public void testConfigKeysForwarding() throws Exception {
 	public void testSimpleFileWriteAndRead() throws Exception {
 		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access-key", ACCESS_KEY);
-		conf.setString("s3.secret-key", SECRET_KEY);
+		conf.setString("s3.access-key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret-key", S3TestCredentials.getS3SecretKey());
 
 		final String testLine = "Hello Upload!";
 
 		FileSystem.initialize(conf);
 
-		final Path path = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR + "/test.txt");
+		final Path path = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR + "/test.txt");
 		final FileSystem fs = path.getFileSystem();
 
 		try {
@@ -186,12 +179,12 @@ public void testSimpleFileWriteAndRead() throws Exception {
 	public void testDirectoryListing() throws Exception {
 		final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access-key", ACCESS_KEY);
-		conf.setString("s3.secret-key", SECRET_KEY);
+		conf.setString("s3.access-key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret-key", S3TestCredentials.getS3SecretKey());
 
 		FileSystem.initialize(conf);
 
-		final Path directory = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR + "/testdir/");
+		final Path directory = new Path(S3TestCredentials.getTestBucketUri() + TEST_DATA_DIR + "/testdir/");
 		final FileSystem fs = directory.getFileSystem();
 
 		// directory must not yet exist
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
index 580d957db23..59d2f165e2e 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3RecoverableWriterTest.java
@@ -21,15 +21,15 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.UUID;
 
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.MAX_CONCURRENT_UPLOADS;
@@ -42,10 +42,6 @@
 
 	// ----------------------- S3 general configuration -----------------------
 
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-
 	private static final long PART_UPLOAD_MIN_SIZE_VALUE = 7L << 20;
 	private static final int MAX_CONCURRENT_UPLOADS_VALUE = 2;
 
@@ -53,21 +49,17 @@
 
 	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-	private static final Path basePath = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
-
 	// ----------------------- Test Lifecycle -----------------------
 
 	@BeforeClass
 	public static void checkCredentialsAndSetup() throws IOException {
 		// check whether credentials exist
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
 
 		// initialize configuration with valid credentials
 		final Configuration conf = new Configuration();
-		conf.setString("s3.access.key", ACCESS_KEY);
-		conf.setString("s3.secret.key", SECRET_KEY);
+		conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
+		conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
 
 		conf.setLong(PART_UPLOAD_MIN_SIZE, PART_UPLOAD_MIN_SIZE_VALUE);
 		conf.setInteger(MAX_CONCURRENT_UPLOADS, MAX_CONCURRENT_UPLOADS_VALUE);
@@ -87,7 +79,8 @@ public static void cleanUp() throws IOException {
 
 	@Test(expected = UnsupportedOperationException.class)
 	public void requestingRecoverableWriterShouldThroughException() throws Exception {
-		FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) FileSystem.get(basePath.toUri());
+		URI s3Uri = URI.create(S3TestCredentials.getTestBucketUri());
+		FlinkS3FileSystem fileSystem = (FlinkS3FileSystem) FileSystem.get(s3Uri);
 		fileSystem.createRecoverableWriter();
 	}
 }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/s3/S3TestCredentials.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/s3/S3TestCredentials.java
new file mode 100644
index 00000000000..b1a0aecf079
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/s3/S3TestCredentials.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.s3;
+
+import org.junit.Assume;
+import org.junit.AssumptionViolatedException;
+
+import javax.annotation.Nullable;
+
+/**
+ * Access to credentials to access S3 buckets during integration tests.
+ */
+public class S3TestCredentials {
+
+	@Nullable
+	private static final String S3_TEST_BUCKET = System.getenv("IT_CASE_S3_BUCKET");
+
+	@Nullable
+	private static final String S3_TEST_ACCESS_KEY = System.getenv("IT_CASE_S3_ACCESS_KEY");
+
+	@Nullable
+	private static final String S3_TEST_SECRET_KEY = System.getenv("IT_CASE_S3_SECRET_KEY");
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether S3 test credentials are available in the environment variables
+	 * of this JVM.
+	 */
+	public static boolean credentialsAvailable() {
+		return S3_TEST_BUCKET != null && S3_TEST_ACCESS_KEY != null && S3_TEST_SECRET_KEY != null;
+	}
+
+	/**
+	 * Checks whether credentials are available in the environment variables of this JVM.
+	 * If not, throws an {@link AssumptionViolatedException} which causes JUnit tests to be
+	 * skipped.
+	 */
+	public static void assumeCredentialsAvailable() {
+		Assume.assumeTrue("No S3 credentials available in this test's environment", credentialsAvailable());
+	}
+
+	/**
+	 * Gets the S3 Access Key.
+	 *
+	 * <p>This method throws an exception if the key is not available. Tests should
+	 * use {@link #assumeCredentialsAvailable()} to skip tests when credentials are not
+	 * available.
+	 */
+	public static String getS3AccessKey() {
+		if (S3_TEST_ACCESS_KEY != null) {
+			return S3_TEST_ACCESS_KEY;
+		}
+		else {
+			throw new IllegalStateException("S3 test access key not available");
+		}
+	}
+
+	/**
+	 * Gets the S3 Secret Key.
+	 *
+	 * <p>This method throws an exception if the key is not available. Tests should
+	 * use {@link #assumeCredentialsAvailable()} to skip tests when credentials are not
+	 * available.
+	 */
+	public static String getS3SecretKey() {
+		if (S3_TEST_SECRET_KEY != null) {
+			return S3_TEST_SECRET_KEY;
+		}
+		else {
+			throw new IllegalStateException("S3 test secret key not available");
+		}
+	}
+
+	/**
+	 * Gets the URI for the path under which all tests should put their data.
+	 *
+	 * <p>This method throws an exception if the bucket was not configured. Tests should
+	 * use {@link #assumeCredentialsAvailable()} to skip tests when credentials are not
+	 * available.
+	 */
+	public static String getTestBucketUri() {
+		return getTestBucketUriWithScheme("s3");
+	}
+
+	/**
+	 * Gets the URI for the path under which all tests should put their data.
+	 *
+	 * <p>This method throws an exception if the bucket was not configured. Tests should
+	 * use {@link #assumeCredentialsAvailable()} to skip tests when credentials are not
+	 * available.
+	 */
+	public static String getTestBucketUriWithScheme(String scheme) {
+		if (S3_TEST_BUCKET != null) {
+			return scheme + "://" + S3_TEST_BUCKET + "/temp/";
+		}
+		else {
+			throw new IllegalStateException("S3 test bucket not available");
+		}
+	}
+}
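
The new utility centralizes the skip-or-configure logic that each test
previously duplicated. As a condensed illustration of the pattern the
Java changes in this PR converge on, here is a hypothetical JUnit 4 test
built only from the methods above (the class name and test body are
illustrative, not part of the PR):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.testutils.s3.S3TestCredentials;

    import org.junit.BeforeClass;
    import org.junit.Test;

    /** Hypothetical test showing the S3TestCredentials usage pattern. */
    public class ExampleS3ITCase {

        @BeforeClass
        public static void setup() throws Exception {
            // Skips the whole class via AssumptionViolatedException when the
            // IT_CASE_S3_* environment variables are not set.
            S3TestCredentials.assumeCredentialsAvailable();

            final Configuration conf = new Configuration();
            conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
            conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
            FileSystem.initialize(conf);
        }

        @Test
        public void testBucketReachable() throws Exception {
            // getTestBucketUri() already ends with a slash: "s3://<bucket>/temp/".
            final Path path = new Path(S3TestCredentials.getTestBucketUri() + "example");
            path.getFileSystem().exists(path);
        }
    }
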
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
index e1e95b1c379..0fa9893c054 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
@@ -23,10 +23,10 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -53,13 +53,8 @@
  */
 public class YarnFileStageTestS3ITCase extends TestLogger {
 
-	private static final String BUCKET = System.getenv("ARTIFACTS_AWS_BUCKET");
-
 	private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
-	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
-
 	@ClassRule
 	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
@@ -79,9 +74,7 @@
 	@BeforeClass
 	public static void checkCredentialsAndSetup() throws IOException {
 		// check whether credentials exist
-		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
-		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
-		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+		S3TestCredentials.assumeCredentialsAvailable();
 
 		skipTest = false;
 
@@ -115,14 +108,14 @@ private static void setupCustomHadoopConfig() throws IOException {
 		Map<String /* key */, String /* value */> parameters = new HashMap<>();
 
 		// set all different S3 fs implementation variants' configuration keys
-		parameters.put("fs.s3a.access.key", ACCESS_KEY);
-		parameters.put("fs.s3a.secret.key", SECRET_KEY);
+		parameters.put("fs.s3a.access.key", S3TestCredentials.getS3AccessKey());
+		parameters.put("fs.s3a.secret.key", S3TestCredentials.getS3SecretKey());
 
-		parameters.put("fs.s3.awsAccessKeyId", ACCESS_KEY);
-		parameters.put("fs.s3.awsSecretAccessKey", SECRET_KEY);
+		parameters.put("fs.s3.awsAccessKeyId", S3TestCredentials.getS3AccessKey());
+		parameters.put("fs.s3.awsSecretAccessKey", S3TestCredentials.getS3SecretKey());
 
-		parameters.put("fs.s3n.awsAccessKeyId", ACCESS_KEY);
-		parameters.put("fs.s3n.awsSecretAccessKey", SECRET_KEY);
+		parameters.put("fs.s3n.awsAccessKeyId", S3TestCredentials.getS3AccessKey());
+		parameters.put("fs.s3n.awsSecretAccessKey", S3TestCredentials.getS3SecretKey());
 
 		try (PrintStream out = new PrintStream(new FileOutputStream(hadoopConfig))) {
 			out.println("<?xml version=\"1.0\"?>");
@@ -155,7 +148,7 @@ private static void setupCustomHadoopConfig() throws IOException {
 	private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws Exception {
 		++numRecursiveUploadTests;
 
-		final Path basePath = new Path(scheme + "://" + BUCKET + '/' + TEST_DATA_DIR);
+		final Path basePath = new Path(S3TestCredentials.getTestBucketUriWithScheme(scheme) + TEST_DATA_DIR);
 		final HadoopFileSystem fs = (HadoopFileSystem) basePath.getFileSystem();
 
 		assumeFalse(fs.exists(basePath));
@@ -171,23 +164,6 @@ private void testRecursiveUploadForYarn(String scheme, String pathSuffix) throws
 		}
 	}
 
-	/**
-	 * Verifies that nested directories are properly copied with a <tt>s3a://</tt> file
-	 * systems during resource uploads for YARN.
-	 */
-	@Test
-	public void testRecursiveUploadForYarnS3() throws Exception {
-		try {
-			Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
-		} catch (ClassNotFoundException e) {
-			// not in the classpath, cannot run this test
-			String msg = "Skipping test because S3FileSystem is not in the class path";
-			log.info(msg);
-			assumeNoException(msg, e);
-		}
-		testRecursiveUploadForYarn("s3", "testYarn-s3");
-	}
-
 	@Test
 	public void testRecursiveUploadForYarnS3n() throws Exception {
 		try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services