You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/01/16 14:10:37 UTC
[flink] branch master updated: [FLINK-11302][fs-connector]
Correctly parse tmp dirs in FlinkS3FileSystem.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 291373a [FLINK-11302][fs-connector] Correctly parse tmp dirs in FlinkS3FileSystem.
291373a is described below
commit 291373a0b61f4651ddbbaf0dc3f9dd9fd68db611
Author: Artsem Semianenka <ar...@gmail.com>
AuthorDate: Thu Jan 10 17:06:00 2019 +0100
[FLINK-11302][fs-connector] Correctly parse tmp dirs in FlinkS3FileSystem.
This closes #7458.
---
.../fs/s3/common/AbstractS3FileSystemFactory.java | 7 +++++--
.../flink/fs/s3/common/S3EntropyFsFactoryTest.java | 18 ++++++++++++++++++
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 6ccdeae..ff575be 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -21,11 +21,12 @@ package org.apache.flink.fs.s3.common;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,7 +139,9 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
}
}
- final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
+ final String[] localTmpDirectories = ConfigurationUtils.parseTempDirectories(flinkConfig);
+ Preconditions.checkArgument(localTmpDirectories.length > 0);
+ final String localTmpDirectory = localTmpDirectories[0];
final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index 5b15652..943de1d 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -52,6 +52,24 @@ public class S3EntropyFsFactoryTest extends TestLogger {
assertEquals(7, fs.generateEntropy().length());
}
+ /**
+ * Verifies that when "io.tmp.dirs" lists several comma-separated paths, the file system
+ * created by the factory uses only the first path as its local temp directory.
+ */
+ @Test
+ public void testMultipleTempDirsConfig() throws Exception {
+ final Configuration conf = new Configuration();
+ String dir1 = "/tmp/dir1";
+ String dir2 = "/tmp/dir2";
+ conf.setString("io.tmp.dirs", dir1 + "," + dir2);
+
+ TestFsFactory factory = new TestFsFactory();
+ factory.configure(conf);
+
+ FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+ assertEquals(dir1, fs.getLocalTmpDir());
+ }
+
// ------------------------------------------------------------------------
private static final class TestFsFactory extends AbstractS3FileSystemFactory {