You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:30:13 UTC
[flink] 03/03: [FLINK-9061] [s3 presto] Add entropy injection to S3
file system
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a58fa985c4944dd0cf39fa622ec0aa4b35f21f44
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 23 15:43:41 2018 +0200
[FLINK-9061] [s3 presto] Add entropy injection to S3 file system
---
.../java/org/apache/flink/util/StringUtils.java | 32 +++
.../org/apache/flink/util/StringUtilsTest.java | 14 +
.../flink/fs/s3presto/S3FileSystemFactory.java | 100 ++++++-
.../flink/fs/s3presto/S3PrestoFileSystem.java | 301 +++++++++++++++++++++
.../fs/s3presto/PrestoS3FileSystemEntropyTest.java | 133 +++++++++
.../flink/fs/s3presto/PrestoS3FileSystemTest.java | 26 +-
6 files changed, 590 insertions(+), 16 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 208a301..c3b3808 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -247,6 +248,37 @@ public final class StringUtils {
}
/**
+ * Appends a random alphanumeric string of given length to the given string buffer.
+ *
+ * @param rnd The random number generator to use.
+ * @param buffer The buffer to append to.
+ * @param length The number of alphanumeric characters to append.
+ */
+ public static void appendRandomAlphanumericString(Random rnd, StringBuilder buffer, int length) {
+ checkNotNull(rnd);
+ checkArgument(length >= 0);
+
+ for (int i = 0; i < length; i++) {
+ buffer.append(nextAlphanumericChar(rnd));
+ }
+ }
+
+ private static char nextAlphanumericChar(Random rnd) {
+ int which = rnd.nextInt(62);
+ char c;
+ if (which < 10) {
+ c = (char) ('0' + which);
+ }
+ else if (which < 36) {
+ c = (char) ('A' - 10 + which);
+ }
+ else {
+ c = (char) ('a' - 36 + which);
+ }
+ return c;
+ }
+
+ /**
* Writes a String to the given output.
* The written string can be read with {@link #readString(DataInputView)}.
*
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 5f705b4..1c9abf2 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.util;
import org.junit.Test;
+import java.util.Random;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link StringUtils}.
@@ -56,4 +59,15 @@ public class StringUtilsTest extends TestLogger {
String hex = StringUtils.byteToHexString(byteArray);
assertEquals("019f314a", hex);
}
+
+ @Test
+ public void testAppendAlphanumeric() {
+ StringBuilder bld = new StringBuilder();
+ StringUtils.appendRandomAlphanumericString(new Random(), bld, 256);
+ String str = bld.toString();
+
+ if (!str.matches("[a-zA-Z0-9]{256}")) {
+ fail("Not 256 alphanumeric characters: " + str);
+ }
+ }
}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index a04f9c9..230d18b 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,12 +19,20 @@
package org.apache.flink.fs.s3presto;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
@@ -34,7 +42,31 @@ import java.util.Set;
/**
* Simple factory for the S3 file system.
*/
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory implements FileSystemFactory {
+
+ /**
+ * The substring to be replaced by random entropy in checkpoint paths.
+ */
+ public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
+ .key("s3.entropy.key")
+ .noDefaultValue()
+ .withDescription(
+ "This option can be used to improve performance due to sharding issues on Amazon S3. " +
+ "For file creations with entropy injection, this key will be replaced by random " +
+ "alphanumeric characters. For other file creations, the key will be filtered out.");
+
+ /**
+ * The number of entropy characters, in case entropy injection is configured.
+ */
+ public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
+ .key("s3.entropy.length")
+ .defaultValue(4)
+ .withDescription(
+ "When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
+ "random characters to replace the entropy key with.");
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+
private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
new HashSet<>(Collections.singletonList("com.amazonaws."));
@@ -50,8 +82,55 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
{ "presto.s3.secret.key", "presto.s3.secret-key" }
};
- public S3FileSystemFactory() {
- super("Presto S3 File System", createHadoopConfigLoader());
+ private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
+ private final HadoopConfigLoader hadoopConfigLoader = createHadoopConfigLoader();
+
+ private Configuration flinkConfig;
+
+ @Override
+ public void configure(Configuration config) {
+ flinkConfig = config;
+ hadoopConfigLoader.setFlinkConfig(config);
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ LOG.debug("Creating S3 FileSystem backed by Presto S3 FileSystem");
+ LOG.debug("Loading Hadoop configuration for Presto S3 File System");
+
+ try {
+ // instantiate the presto file system
+ org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+ org.apache.hadoop.fs.FileSystem fs = new PrestoS3FileSystem();
+ fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
+
+ // load the entropy injection settings
+ String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
+ int numEntropyChars = -1;
+
+ if (entropyInjectionKey != null) {
+ if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+ throw new IllegalConfigurationException("Invalid character in value for " +
+ ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+ }
+
+ numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+
+ if (numEntropyChars <= 0) {
+ throw new IllegalConfigurationException(
+ ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
+ }
+ }
+
+ return new S3PrestoFileSystem(fs, entropyInjectionKey, numEntropyChars);
+ }
+ catch (IOException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
}
@Override
@@ -65,13 +144,7 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
}
- @Override
- protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
- return new PrestoS3FileSystem();
- }
-
- @Override
- protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+ static URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
final String scheme = fsUri.getScheme();
final String authority = fsUri.getAuthority();
final URI initUri;
@@ -88,10 +161,11 @@ public class S3FileSystemFactory extends AbstractFileSystemFactory {
return initUri;
}
- private URI createURI(String str) {
+ static URI createURI(String str) {
try {
return new URI(str);
- } catch (URISyntaxException e) {
+ }
+ catch (URISyntaxException e) {
throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
}
}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
new file mode 100644
index 0000000..e6a6ae4
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Flink FileSystem against S3, wrapping the Presto Hadoop S3 File System implementation.
+ *
+ * <p>This class is based heavily on the {@link org.apache.flink.runtime.fs.hdfs.HadoopFileSystem} class.
+ * Code is copied here for the sake of minimal changes to the original class within a minor release.
+ */
+class S3PrestoFileSystem extends FileSystem {
+
+ /** The wrapped Hadoop File System. */
+ private final org.apache.hadoop.fs.FileSystem fs;
+
+ @Nullable
+ private final String entropyInjectionKey;
+
+ private final int entropyLength;
+
+ /**
+ * Wraps the given Hadoop File System object as a Flink File System object.
+ * The given Hadoop file system object is expected to be initialized already.
+ *
+ * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+ */
+ public S3PrestoFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+ this(hadoopFileSystem, null, -1);
+ }
+
+ /**
+ * Wraps the given Hadoop File System object as a Flink File System object.
+ * The given Hadoop file system object is expected to be initialized already.
+ *
+ * <p>This constructor additionally configures the entropy injection for the file system.
+ *
+ * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+ * @param entropyInjectionKey The substring that will be replaced by entropy or removed.
+ * @param entropyLength The number of random alphanumeric characters to inject as entropy; must be positive when a key is set.
+ */
+ public S3PrestoFileSystem(
+ org.apache.hadoop.fs.FileSystem hadoopFileSystem,
+ @Nullable String entropyInjectionKey,
+ int entropyLength) {
+
+ if (entropyInjectionKey != null && entropyLength <= 0) {
+ throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
+ }
+
+ this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+ this.entropyInjectionKey = entropyInjectionKey;
+ this.entropyLength = entropyLength;
+ }
+
+ // ------------------------------------------------------------------------
+ // properties
+ // ------------------------------------------------------------------------
+
+ public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
+ return fs;
+ }
+
+ @Nullable
+ public String getEntropyInjectionKey() {
+ return entropyInjectionKey;
+ }
+
+ public int getEntropyLength() {
+ return entropyLength;
+ }
+
+ // ------------------------------------------------------------------------
+ // file system methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public Path getWorkingDirectory() {
+ return new Path(fs.getWorkingDirectory().toUri());
+ }
+
+ public Path getHomeDirectory() {
+ return new Path(fs.getHomeDirectory().toUri());
+ }
+
+ @Override
+ public URI getUri() {
+ return fs.getUri();
+ }
+
+ @Override
+ public FileStatus getFileStatus(final Path f) throws IOException {
+ org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(toHadoopPath(f));
+ return new HadoopFileStatus(status);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(
+ final FileStatus file,
+ final long start,
+ final long len) throws IOException {
+
+ if (!(file instanceof HadoopFileStatus)) {
+ throw new IOException("file is not an instance of HadoopFileStatus");
+ }
+
+ final HadoopFileStatus f = (HadoopFileStatus) file;
+
+ final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
+ start, len);
+
+ // Wrap up HDFS specific block location objects
+ final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
+ for (int i = 0; i < distBlkLocations.length; i++) {
+ distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
+ }
+
+ return distBlkLocations;
+ }
+
+ @Override
+ public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
+ final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+ final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
+ return new HadoopDataInputStream(fdis);
+ }
+
+ @Override
+ public HadoopDataInputStream open(final Path f) throws IOException {
+ final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+ final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
+ return new HadoopDataInputStream(fdis);
+ }
+
+ @Override
+ public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
+ final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
+ fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
+ return new HadoopDataOutputStream(fsDataOutputStream);
+ }
+
+ @Override
+ public FSDataOutputStream create(final Path f, final WriteOptions options) throws IOException {
+ final org.apache.hadoop.fs.Path path = options.isInjectEntropy()
+ ? toHadoopPathInjectEntropy(f)
+ : toHadoopPath(f);
+
+ final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = fs.create(
+ path, options.getOverwrite() == WriteMode.OVERWRITE);
+
+ return new HadoopDataOutputStream(fsDataOutputStream);
+ }
+
+ @Override
+ public boolean delete(final Path f, final boolean recursive) throws IOException {
+ return fs.delete(toHadoopPath(f), recursive);
+ }
+
+ @Override
+ public boolean exists(Path f) throws IOException {
+ return fs.exists(toHadoopPath(f));
+ }
+
+ @Override
+ public FileStatus[] listStatus(final Path f) throws IOException {
+ final org.apache.hadoop.fs.FileStatus[] hadoopFiles = fs.listStatus(toHadoopPath(f));
+ final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+ // Convert types
+ for (int i = 0; i < files.length; i++) {
+ files[i] = new HadoopFileStatus(hadoopFiles[i]);
+ }
+
+ return files;
+ }
+
+ @Override
+ public boolean mkdirs(final Path f) throws IOException {
+ return fs.mkdirs(toHadoopPath(f));
+ }
+
+ @Override
+ public boolean rename(final Path src, final Path dst) throws IOException {
+ return fs.rename(toHadoopPath(src), toHadoopPath(dst));
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public long getDefaultBlockSize() {
+ return fs.getDefaultBlockSize();
+ }
+
+ @Override
+ public boolean isDistributedFS() {
+ return true;
+ }
+
+ @Override
+ public FileSystemKind getKind() {
+ return FileSystemKind.OBJECT_STORE;
+ }
+
+ // ------------------------------------------------------------------------
+ // entropy utilities
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ org.apache.hadoop.fs.Path toHadoopPath(Path path) throws IOException {
+ return rewritePathForEntropyKey(path, false);
+ }
+
+ @VisibleForTesting
+ org.apache.hadoop.fs.Path toHadoopPathInjectEntropy(Path path) throws IOException {
+ return rewritePathForEntropyKey(path, true);
+ }
+
+ private org.apache.hadoop.fs.Path rewritePathForEntropyKey(Path path, boolean addEntropy) throws IOException {
+ if (entropyInjectionKey == null) {
+ return convertToHadoopPath(path);
+ }
+ else {
+ final URI originalUri = path.toUri();
+ final String checkpointPath = originalUri.getPath();
+
+ final int indexOfKey = checkpointPath.indexOf(entropyInjectionKey);
+ if (indexOfKey == -1) {
+ return convertToHadoopPath(path);
+ }
+ else {
+ final StringBuilder buffer = new StringBuilder(checkpointPath.length());
+ buffer.append(checkpointPath, 0, indexOfKey);
+
+ if (addEntropy) {
+ StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buffer, entropyLength);
+ }
+
+ buffer.append(checkpointPath, indexOfKey + entropyInjectionKey.length(), checkpointPath.length());
+
+ final String rewrittenPath = buffer.toString();
+ try {
+ return convertToHadoopPath(new URI(
+ originalUri.getScheme(),
+ originalUri.getAuthority(),
+ rewrittenPath,
+ originalUri.getQuery(),
+ originalUri.getFragment()));
+ }
+ catch (URISyntaxException e) {
+ // this should actually never happen, because the URI was valid before
+ throw new IOException("URI format error while processing path for entropy injection", e);
+ }
+ }
+ }
+ }
+
+ private static org.apache.hadoop.fs.Path convertToHadoopPath(URI uri) {
+ return new org.apache.hadoop.fs.Path(uri);
+ }
+
+ private static org.apache.hadoop.fs.Path convertToHadoopPath(Path path) {
+ return convertToHadoopPath(path.toUri());
+ }
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
new file mode 100644
index 0000000..587b02e
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the entropy injection in the {@link S3PrestoFileSystem}.
+ */
+public class PrestoS3FileSystemEntropyTest {
+
+ @Test
+ public void testEmptyPath() throws Exception {
+ Path path = new Path("hdfs://localhost:12345");
+ S3PrestoFileSystem fs = createFs("test", 4);
+
+ assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+ assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+ }
+
+ @Test
+ public void testFullUriNonMatching() throws Exception {
+ Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
+ S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+ assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+ assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+ }
+
+ @Test
+ public void testFullUriMatching() throws Exception {
+ Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+ S3PrestoFileSystem fs = createFs("s0mek3y", 8);
+
+ org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+ org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+ validateMatches(withEntropy, "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{8}/the/file");
+ assertEquals(new org.apache.hadoop.fs.Path("s3://hugo@myawesomehost:55522/path/the/file"), withoutEntropy);
+ }
+
+ @Test
+ public void testPathOnlyNonMatching() throws Exception {
+ Path path = new Path("/path/file");
+ S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+ assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
+ assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+ }
+
+ @Test
+ public void testPathOnlyMatching() throws Exception {
+ Path path = new Path("/path/_entropy_key_/file");
+ S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+ org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+ org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+ validateMatches(withEntropy, "/path/[a-zA-Z0-9]{4}/file");
+ assertEquals(new org.apache.hadoop.fs.Path("/path/file"), withoutEntropy);
+ }
+
+ @Test
+ public void testEntropyNotFullSegment() throws Exception {
+ Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
+ S3PrestoFileSystem fs = createFs("_entropy_key_", 3);
+
+ org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
+ org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
+
+ validateMatches(withEntropy, "s3://myhost:122/entropy-[a-zA-Z0-9]{3}-suffix/file");
+ assertEquals(new org.apache.hadoop.fs.Path("s3://myhost:122/entropy--suffix/file"), withoutEntropy);
+ }
+
+ @Test
+ public void testWriteOptionWithEntropy() throws Exception {
+ FileSystem underlyingFs = mock(FileSystem.class);
+ when(underlyingFs.create(any(org.apache.hadoop.fs.Path.class), anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
+ ArgumentCaptor<org.apache.hadoop.fs.Path> pathCaptor = ArgumentCaptor.forClass(org.apache.hadoop.fs.Path.class);
+
+ Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+ S3PrestoFileSystem fs = new S3PrestoFileSystem(underlyingFs, "s0mek3y", 11);
+
+ fs.create(path, new WriteOptions().setInjectEntropy(true));
+ verify(underlyingFs).create(pathCaptor.capture(), anyBoolean());
+
+ validateMatches(pathCaptor.getValue(), "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{11}/the/file");
+ }
+
+ private static void validateMatches(org.apache.hadoop.fs.Path path, String pattern) {
+ if (!path.toString().matches(pattern)) {
+ fail("Path " + path + " does not match " + pattern);
+ }
+ }
+
+ private static S3PrestoFileSystem createFs(String entropyKey, int entropyLen) {
+ return new S3PrestoFileSystem(mock(FileSystem.class), entropyKey, entropyLen);
+ }
+
+ private org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+ return new org.apache.hadoop.fs.Path(path.toUri());
+ }
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 4eeb2d4..9afad57 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.junit.After;
import org.junit.Test;
import java.lang.reflect.Field;
@@ -41,6 +41,11 @@ import static org.junit.Assert.assertTrue;
*/
public class PrestoS3FileSystemTest {
+ @After
+ public void resetFileSystemConfig() throws Exception {
+ FileSystem.initialize(new Configuration());
+ }
+
@Test
public void testConfigPropagation() throws Exception{
final Configuration conf = new Configuration();
@@ -90,14 +95,29 @@ public class PrestoS3FileSystemTest {
hadoopConfig.get("presto.s3.credentials-provider"));
}
+ @Test
+ public void testEntropyInjectionConfig() throws Exception {
+ final Configuration conf = new Configuration();
+ conf.setString("s3.entropy.key", "__entropy__");
+ conf.setInteger("s3.entropy.length", 7);
+
+ FileSystem.initialize(conf);
+
+ FileSystem fs = FileSystem.get(new URI("s3://test"));
+ S3PrestoFileSystem s3fs = (S3PrestoFileSystem) fs;
+
+ assertEquals("__entropy__", s3fs.getEntropyInjectionKey());
+ assertEquals(7, s3fs.getEntropyLength());
+ }
+
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
private static void validateBasicCredentials(FileSystem fs) throws Exception {
- assertTrue(fs instanceof HadoopFileSystem);
+ assertTrue(fs instanceof S3PrestoFileSystem);
- org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) fs).getHadoopFileSystem();
+ org.apache.hadoop.fs.FileSystem hadoopFs = ((S3PrestoFileSystem) fs).getHadoopFileSystem();
assertTrue(hadoopFs instanceof PrestoS3FileSystem);
try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {