You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/01/16 14:10:37 UTC

[flink] branch master updated: [FLINK-11302][fs-connector] Correctly parse tmp dirs in FlinkS3FileSystem.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 291373a  [FLINK-11302][fs-connector] Correctly parse tmp dirs in FlinkS3FileSystem.
291373a is described below

commit 291373a0b61f4651ddbbaf0dc3f9dd9fd68db611
Author: Artsem Semianenka <ar...@gmail.com>
AuthorDate: Thu Jan 10 17:06:00 2019 +0100

    [FLINK-11302][fs-connector] Correctly parse tmp dirs in FlinkS3FileSystem.
    
    This closes #7458.
---
 .../fs/s3/common/AbstractS3FileSystemFactory.java      |  7 +++++--
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java     | 18 ++++++++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 6ccdeae..ff575be 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -21,11 +21,12 @@ package org.apache.flink.fs.s3.common;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,7 +139,9 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 				}
 			}
 
-			final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
+			final String[] localTmpDirectories = ConfigurationUtils.parseTempDirectories(flinkConfig);
+			Preconditions.checkArgument(localTmpDirectories.length > 0);
+			final String localTmpDirectory = localTmpDirectories[0];
 			final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
 			final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
 			final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index 5b15652..943de1d 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -52,6 +52,24 @@ public class S3EntropyFsFactoryTest extends TestLogger {
 		assertEquals(7, fs.generateEntropy().length());
 	}
 
+	/**
+	 * Test validates that the produced by AbstractS3FileSystemFactory object will contains
+	 * only first path from multiple paths in config.
+	 */
+	@Test
+	public void testMultipleTempDirsConfig() throws Exception {
+		final Configuration conf = new Configuration();
+		String dir1 =  "/tmp/dir1";
+		String dir2 =  "/tmp/dir2";
+		conf.setString("io.tmp.dirs", dir1 + "," + dir2);
+
+		TestFsFactory factory = new TestFsFactory();
+		factory.configure(conf);
+
+		FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+		assertEquals(fs.getLocalTmpDir(), dir1);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TestFsFactory extends AbstractS3FileSystemFactory {