You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/25 06:51:59 UTC
[flink] 01/06: Revert "[FLINK-9061] Add entropy injection to S3 file system"
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aff9994862331a81ab93dd206073a0503d784c84
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 20 21:57:22 2018 +0200
Revert "[FLINK-9061] Add entropy injection to S3 file system"
This was incorrectly applied to the 1.6 branch
---
.../java/org/apache/flink/core/fs/FileSystem.java | 44 +--
.../org/apache/flink/core/fs/WriteOptions.java | 144 ----------
.../java/org/apache/flink/util/StringUtils.java | 32 ---
.../org/apache/flink/util/StringUtilsTest.java | 14 -
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 25 --
.../flink/fs/s3presto/S3FileSystemFactory.java | 100 +------
.../flink/fs/s3presto/S3PrestoFileSystem.java | 301 ---------------------
.../fs/s3presto/PrestoS3FileSystemEntropyTest.java | 133 ---------
.../flink/fs/s3presto/PrestoS3FileSystemTest.java | 26 +-
.../state/filesystem/FsCheckpointStorage.java | 17 +-
.../filesystem/FsCheckpointStreamFactory.java | 7 +-
.../FsCheckpointStateOutputStreamTest.java | 12 +-
.../filesystem/FsStateBackendEntropyTest.java | 99 -------
13 files changed, 48 insertions(+), 906 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index ba2113a..d451109 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -576,8 +576,29 @@ public abstract class FileSystem {
/**
* Opens an FSDataOutputStream at the indicated Path.
*
- * @deprecated Deprecated in favor of {@link #create(Path, WriteOptions)} which offers better extensibility
- * to options that are supported only by some filesystems implementations.
+ * <p>This method is deprecated, because most of its parameters are ignored by most file systems.
+ * To control for example the replication factor and block size in the Hadoop Distributed File system,
+ * make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
+ * or in the classpath of either Flink or the user code.
+ *
+ * @param f
+ * the file name to open
+ * @param overwrite
+ * if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize
+ * the size of the buffer to be used.
+ * @param replication
+ * required block replication for the file.
+ * @param blockSize
+ * the size of the file blocks
+ *
+ * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
+ * a file already exists at that path and the write mode indicates to not
+ * overwrite the file.
+ *
+ * @deprecated Deprecated because not well supported across types of file systems.
+ * Control the behavior of specific file systems via configurations instead.
*/
@Deprecated
public FSDataOutputStream create(
@@ -628,25 +649,6 @@ public abstract class FileSystem {
public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
/**
- * Creates a new file at the given path and opens an FSDataOutputStream to that new file.
- *
- * <p>This method takes various options, some of which are not supported by all file systems
- * (such as controlling block size).
- *
- * <p>Implementation note: This method should be abstract, but is currently not in order to not break
- * backwards compatibility of this class with earlier Flink versions.
- *
- * @param f The path for the new file.
- * @param options The options to parametrize the file and stream creation.
- * @return The stream to the new file at the target path.
- *
- * @throws IOException Thrown if an error occurs while creating the file or opening the stream.
- */
- public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
- return create(f, options.getOverwrite());
- }
-
- /**
* Renames the file/directory src to dst.
*
* @param src
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
deleted file mode 100644
index 70f4973..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.fs;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-
-import javax.annotation.Nullable;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Write options that can be passed to the methods that write files.
- */
-@Public
-public class WriteOptions {
-
- private WriteMode overwrite = WriteMode.NO_OVERWRITE;
-
- @Nullable
- private BlockOptions blockSettings;
-
- private boolean injectEntropy;
-
- // ------------------------------------------------------------------------
- // getters & setters
- // ------------------------------------------------------------------------
-
- /**
- * Gets the overwrite option.
- */
- public WriteMode getOverwrite() {
- return overwrite;
- }
-
- /**
- * Sets the overwrite option.
- *
- * <p>Method returns this object for fluent function call chaining.
- */
- public WriteOptions setOverwrite(WriteMode overwrite) {
- this.overwrite = checkNotNull(overwrite);
- return this;
- }
-
- /**
- * Gets the block writing settings, like size and replication factor.
- * Returns null if no settings are defined.
- */
- @Nullable
- public BlockOptions getBlockSettings() {
- return blockSettings;
- }
-
- /**
- * Sets the block settings, for file systems working with block replication and
- * exposing those settings
- *
- * <p>Method returns this object for fluent function call chaining.
- */
- public WriteOptions setBlockSettings(@Nullable BlockOptions blockSettings) {
- this.blockSettings = blockSettings;
- return this;
- }
-
- /**
- * Gets whether to inject entropy into the path.
- */
- public boolean isInjectEntropy() {
- return injectEntropy;
- }
-
- /**
- * Sets whether to inject entropy into the path.
- *
- * <p>Entropy injection is only supported select filesystems like S3 to overcome
- * scalability issues in the sharding. For this option to have any effect, the
- * file system must be configured to replace an entropy key with entropy, and the
- * path that is written to must contain the entropy key.
- *
- * <p>Method returns this object for fluent function call chaining.
- */
- public WriteOptions setInjectEntropy(boolean injectEntropy) {
- this.injectEntropy = injectEntropy;
- return this;
- }
-
- // ------------------------------------------------------------------------
- // nested options classes
- // ------------------------------------------------------------------------
-
- /**
- * Settings for block replication. Interpreted only by filesystems that are based
- * expose block replication settings.
- */
- @Public
- public static class BlockOptions {
-
- /** The size of the blocks, in bytes. */
- private long blockSize;
-
- /** The number of times the block should be replicated. */
- private int replicationFactor;
-
- public BlockOptions(long blockSize, int replicationFactor) {
- checkArgument(blockSize > 0, "blockSize must be >0");
- checkArgument(replicationFactor > 0, "replicationFactor must be >=1");
-
- this.blockSize = blockSize;
- this.replicationFactor = replicationFactor;
- }
-
- /**
- * Gets the block size, in bytes.
- */
- public long getBlockSize() {
- return blockSize;
- }
-
- /**
- * Gets the number of times the block should be replicated.
- */
- public int getReplicationFactor() {
- return replicationFactor;
- }
- }
-}
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index c3b3808..208a301 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
-import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -248,37 +247,6 @@ public final class StringUtils {
}
/**
- * Appends a random alphanumeric string of given length to the given string buffer.
- *
- * @param rnd The random number generator to use.
- * @param buffer The buffer to append to.
- * @param length The number of alphanumeric characters to append.
- */
- public static void appendRandomAlphanumericString(Random rnd, StringBuilder buffer, int length) {
- checkNotNull(rnd);
- checkArgument(length >= 0);
-
- for (int i = 0; i < length; i++) {
- buffer.append(nextAlphanumericChar(rnd));
- }
- }
-
- private static char nextAlphanumericChar(Random rnd) {
- int which = rnd.nextInt(62);
- char c;
- if (which < 10) {
- c = (char) ('0' + which);
- }
- else if (which < 36) {
- c = (char) ('A' - 10 + which);
- }
- else {
- c = (char) ('a' - 36 + which);
- }
- return c;
- }
-
- /**
* Writes a String to the given output.
* The written string can be read with {@link #readString(DataInputView)}.
*
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 1c9abf2..5f705b4 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,11 +20,8 @@ package org.apache.flink.util;
import org.junit.Test;
-import java.util.Random;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
/**
* Tests for the {@link StringUtils}.
@@ -59,15 +56,4 @@ public class StringUtilsTest extends TestLogger {
String hex = StringUtils.byteToHexString(byteArray);
assertEquals("019f314a", hex);
}
-
- @Test
- public void testAppendAlphanumeric() {
- StringBuilder bld = new StringBuilder();
- StringUtils.appendRandomAlphanumericString(new Random(), bld, 256);
- String str = bld.toString();
-
- if (!str.matches("[a-zA-Z0-9]+")) {
- fail("Not alphanumeric: " + str);
- }
- }
}
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index bceed5e..065ba5a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -19,20 +19,16 @@
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.WriteOptions;
-import org.apache.flink.core.fs.WriteOptions.BlockOptions;
import java.io.IOException;
import java.net.URI;
import java.util.Locale;
-import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -40,9 +36,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class HadoopFileSystem extends FileSystem {
- /** The write buffer size used by default. */
- public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
-
/** The wrapped Hadoop File System. */
private final org.apache.hadoop.fs.FileSystem fs;
@@ -150,24 +143,6 @@ public class HadoopFileSystem extends FileSystem {
}
@Override
- public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
- BlockOptions blockSettings = options.getBlockSettings();
- if (blockSettings == null) {
- return create(f, options.getOverwrite());
- }
- else {
- checkArgument(blockSettings.getReplicationFactor() <= Short.MAX_VALUE,
- "block replication factor out of bounds");
-
- return create(f,
- options.getOverwrite() == WriteMode.OVERWRITE,
- DEFAULT_WRITE_BUFFER_SIZE,
- (short) blockSettings.getReplicationFactor(),
- blockSettings.getBlockSize());
- }
- }
-
- @Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
return this.fs.delete(toHadoopPath(f), recursive);
}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index 230d18b..a04f9c9 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,20 +19,12 @@
package org.apache.flink.fs.s3presto;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.PrestoS3FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
@@ -42,31 +34,7 @@ import java.util.Set;
/**
* Simple factory for the S3 file system.
*/
-public class S3FileSystemFactory implements FileSystemFactory {
-
- /**
- * The substring to be replaced by random entropy in checkpoint paths.
- */
- public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = ConfigOptions
- .key("s3.entropy.key")
- .noDefaultValue()
- .withDescription(
- "This option can be used to improve performance due to sharding issues on Amazon S3. " +
- "For file creations with entropy injection, this key will be replaced by random " +
- "alphanumeric characters. For other file creations, the key will be filtered out.");
-
- /**
- * The number of entropy characters, in case entropy injection is configured.
- */
- public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = ConfigOptions
- .key("s3.entropy.length")
- .defaultValue(4)
- .withDescription(
- "When '" + ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of " +
- "random characters to replace the entropy key with.");
-
- private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
-
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
new HashSet<>(Collections.singletonList("com.amazonaws."));
@@ -82,55 +50,8 @@ public class S3FileSystemFactory implements FileSystemFactory {
{ "presto.s3.secret.key", "presto.s3.secret-key" }
};
- private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
-
- private final HadoopConfigLoader hadoopConfigLoader = createHadoopConfigLoader();
-
- private Configuration flinkConfig;
-
- @Override
- public void configure(Configuration config) {
- flinkConfig = config;
- hadoopConfigLoader.setFlinkConfig(config);
- }
-
- @Override
- public FileSystem create(URI fsUri) throws IOException {
- LOG.debug("Creating S3 FileSystem backed by Presto S3 FileSystem");
- LOG.debug("Loading Hadoop configuration for Presto S3 File System");
-
- try {
- // instantiate the presto file system
- org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
- org.apache.hadoop.fs.FileSystem fs = new PrestoS3FileSystem();
- fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
-
- // load the entropy injection settings
- String entropyInjectionKey = flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
- int numEntropyChars = -1;
-
- if (entropyInjectionKey != null) {
- if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
- throw new IllegalConfigurationException("Invalid character in value for " +
- ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
- }
-
- numEntropyChars = flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
-
- if (numEntropyChars <= 0) {
- throw new IllegalConfigurationException(
- ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
- }
- }
-
- return new S3PrestoFileSystem(fs, entropyInjectionKey, numEntropyChars);
- }
- catch (IOException e) {
- throw e;
- }
- catch (Exception e) {
- throw new IOException(e.getMessage(), e);
- }
+ public S3FileSystemFactory() {
+ super("Presto S3 File System", createHadoopConfigLoader());
}
@Override
@@ -144,7 +65,13 @@ public class S3FileSystemFactory implements FileSystemFactory {
"presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
}
- static URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+ @Override
+ protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+ return new PrestoS3FileSystem();
+ }
+
+ @Override
+ protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
final String scheme = fsUri.getScheme();
final String authority = fsUri.getAuthority();
final URI initUri;
@@ -161,11 +88,10 @@ public class S3FileSystemFactory implements FileSystemFactory {
return initUri;
}
- static URI createURI(String str) {
+ private URI createURI(String str) {
try {
return new URI(str);
- }
- catch (URISyntaxException e) {
+ } catch (URISyntaxException e) {
throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
}
}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
deleted file mode 100644
index e6a6ae4..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.fs.s3presto;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.BlockLocation;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemKind;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
-import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
-import org.apache.flink.util.StringUtils;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A Flink FileSystem against S3, wrapping the Presto Hadoop S3 File System implementation.
- *
- * <p>This class bases heavily on the {@link org.apache.flink.runtime.fs.hdfs.HadoopFileSystem} class.
- * Code is copied here for the sake of minimal changes to the original class within a minor release.
- */
-class S3PrestoFileSystem extends FileSystem {
-
- /** The wrapped Hadoop File System. */
- private final org.apache.hadoop.fs.FileSystem fs;
-
- @Nullable
- private final String entropyInjectionKey;
-
- private final int entropyLength;
-
- /**
- * Wraps the given Hadoop File System object as a Flink File System object.
- * The given Hadoop file system object is expected to be initialized already.
- *
- * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
- */
- public S3PrestoFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
- this(hadoopFileSystem, null, -1);
- }
-
- /**
- * Wraps the given Hadoop File System object as a Flink File System object.
- * The given Hadoop file system object is expected to be initialized already.
- *
- * <p>This constructor additionally configures the entropy injection for the file system.
- *
- * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
- * @param entropyInjectionKey The substring that will be replaced by entropy or removed.
- * @param entropyLength The number of random alphanumeric characters to inject as entropy.
- */
- public S3PrestoFileSystem(
- org.apache.hadoop.fs.FileSystem hadoopFileSystem,
- @Nullable String entropyInjectionKey,
- int entropyLength) {
-
- if (entropyInjectionKey != null && entropyLength <= 0) {
- throw new IllegalArgumentException("Entropy length must be >= 0 when entropy injection key is set");
- }
-
- this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
- this.entropyInjectionKey = entropyInjectionKey;
- this.entropyLength = entropyLength;
- }
-
- // ------------------------------------------------------------------------
- // properties
- // ------------------------------------------------------------------------
-
- public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
- return fs;
- }
-
- @Nullable
- public String getEntropyInjectionKey() {
- return entropyInjectionKey;
- }
-
- public int getEntropyLength() {
- return entropyLength;
- }
-
- // ------------------------------------------------------------------------
- // file system methods
- // ------------------------------------------------------------------------
-
- @Override
- public Path getWorkingDirectory() {
- return new Path(fs.getWorkingDirectory().toUri());
- }
-
- public Path getHomeDirectory() {
- return new Path(fs.getHomeDirectory().toUri());
- }
-
- @Override
- public URI getUri() {
- return fs.getUri();
- }
-
- @Override
- public FileStatus getFileStatus(final Path f) throws IOException {
- org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(toHadoopPath(f));
- return new HadoopFileStatus(status);
- }
-
- @Override
- public BlockLocation[] getFileBlockLocations(
- final FileStatus file,
- final long start,
- final long len) throws IOException {
-
- if (!(file instanceof HadoopFileStatus)) {
- throw new IOException("file is not an instance of HadoopFileStatus");
- }
-
- final HadoopFileStatus f = (HadoopFileStatus) file;
-
- final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
- start, len);
-
- // Wrap up HDFS specific block location objects
- final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
- for (int i = 0; i < distBlkLocations.length; i++) {
- distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
- }
-
- return distBlkLocations;
- }
-
- @Override
- public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
- final org.apache.hadoop.fs.Path path = toHadoopPath(f);
- final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
- return new HadoopDataInputStream(fdis);
- }
-
- @Override
- public HadoopDataInputStream open(final Path f) throws IOException {
- final org.apache.hadoop.fs.Path path = toHadoopPath(f);
- final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path);
- return new HadoopDataInputStream(fdis);
- }
-
- @Override
- public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
- final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream =
- fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE);
- return new HadoopDataOutputStream(fsDataOutputStream);
- }
-
- @Override
- public FSDataOutputStream create(final Path f, final WriteOptions options) throws IOException {
- final org.apache.hadoop.fs.Path path = options.isInjectEntropy()
- ? toHadoopPathInjectEntropy(f)
- : toHadoopPath(f);
-
- final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = fs.create(
- path, options.getOverwrite() == WriteMode.OVERWRITE);
-
- return new HadoopDataOutputStream(fsDataOutputStream);
- }
-
- @Override
- public boolean delete(final Path f, final boolean recursive) throws IOException {
- return fs.delete(toHadoopPath(f), recursive);
- }
-
- @Override
- public boolean exists(Path f) throws IOException {
- return fs.exists(toHadoopPath(f));
- }
-
- @Override
- public FileStatus[] listStatus(final Path f) throws IOException {
- final org.apache.hadoop.fs.FileStatus[] hadoopFiles = fs.listStatus(toHadoopPath(f));
- final FileStatus[] files = new FileStatus[hadoopFiles.length];
-
- // Convert types
- for (int i = 0; i < files.length; i++) {
- files[i] = new HadoopFileStatus(hadoopFiles[i]);
- }
-
- return files;
- }
-
- @Override
- public boolean mkdirs(final Path f) throws IOException {
- return fs.mkdirs(toHadoopPath(f));
- }
-
- @Override
- public boolean rename(final Path src, final Path dst) throws IOException {
- return fs.rename(toHadoopPath(src), toHadoopPath(dst));
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public long getDefaultBlockSize() {
- return fs.getDefaultBlockSize();
- }
-
- @Override
- public boolean isDistributedFS() {
- return true;
- }
-
- @Override
- public FileSystemKind getKind() {
- return FileSystemKind.OBJECT_STORE;
- }
-
- // ------------------------------------------------------------------------
- // entropy utilities
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- org.apache.hadoop.fs.Path toHadoopPath(Path path) throws IOException {
- return rewritePathForEntropyKey(path, false);
- }
-
- @VisibleForTesting
- org.apache.hadoop.fs.Path toHadoopPathInjectEntropy(Path path) throws IOException {
- return rewritePathForEntropyKey(path, true);
- }
-
- private org.apache.hadoop.fs.Path rewritePathForEntropyKey(Path path, boolean addEntropy) throws IOException {
- if (entropyInjectionKey == null) {
- return convertToHadoopPath(path);
- }
- else {
- final URI originalUri = path.toUri();
- final String checkpointPath = originalUri.getPath();
-
- final int indexOfKey = checkpointPath.indexOf(entropyInjectionKey);
- if (indexOfKey == -1) {
- return convertToHadoopPath(path);
- }
- else {
- final StringBuilder buffer = new StringBuilder(checkpointPath.length());
- buffer.append(checkpointPath, 0, indexOfKey);
-
- if (addEntropy) {
- StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buffer, entropyLength);
- }
-
- buffer.append(checkpointPath, indexOfKey + entropyInjectionKey.length(), checkpointPath.length());
-
- final String rewrittenPath = buffer.toString();
- try {
- return convertToHadoopPath(new URI(
- originalUri.getScheme(),
- originalUri.getAuthority(),
- rewrittenPath,
- originalUri.getQuery(),
- originalUri.getFragment()));
- }
- catch (URISyntaxException e) {
- // this should actually never happen, because the URI was valid before
- throw new IOException("URI format error while processing path for entropy injection", e);
- }
- }
- }
- }
-
- private static org.apache.hadoop.fs.Path convertToHadoopPath(URI uri) {
- return new org.apache.hadoop.fs.Path(uri);
- }
-
- private static org.apache.hadoop.fs.Path convertToHadoopPath(Path path) {
- return convertToHadoopPath(path.toUri());
- }
-}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
deleted file mode 100644
index 587b02e..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.fs.s3presto;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the entropy injection in the {@link S3PrestoFileSystem}.
- */
-public class PrestoS3FileSystemEntropyTest {
-
- @Test
- public void testEmptyPath() throws Exception {
- Path path = new Path("hdfs://localhost:12345");
- S3PrestoFileSystem fs = createFs("test", 4);
-
- assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
- assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
- }
-
- @Test
- public void testFullUriNonMatching() throws Exception {
- Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
- S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
-
- assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
- assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
- }
-
- @Test
- public void testFullUriMatching() throws Exception {
- Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
- S3PrestoFileSystem fs = createFs("s0mek3y", 8);
-
- org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
- org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
-
- validateMatches(withEntropy, "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{8}/the/file");
- assertEquals(new org.apache.hadoop.fs.Path("s3://hugo@myawesomehost:55522/path/the/file"), withoutEntropy);
- }
-
- @Test
- public void testPathOnlyNonMatching() throws Exception {
- Path path = new Path("/path/file");
- S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
-
- assertEquals(toHadoopPath(path), fs.toHadoopPathInjectEntropy(path));
- assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
- }
-
- @Test
- public void testPathOnlyMatching() throws Exception {
- Path path = new Path("/path/_entropy_key_/file");
- S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
-
- org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
- org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
-
- validateMatches(withEntropy, "/path/[a-zA-Z0-9]{4}/file");
- assertEquals(new org.apache.hadoop.fs.Path("/path/file"), withoutEntropy);
- }
-
- @Test
- public void testEntropyNotFullSegment() throws Exception {
- Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
- S3PrestoFileSystem fs = createFs("_entropy_key_", 3);
-
- org.apache.hadoop.fs.Path withEntropy = fs.toHadoopPathInjectEntropy(path);
- org.apache.hadoop.fs.Path withoutEntropy = fs.toHadoopPath(path);
-
- validateMatches(withEntropy, "s3://myhost:122/entropy-[a-zA-Z0-9]{3}-suffix/file");
- assertEquals(new org.apache.hadoop.fs.Path("s3://myhost:122/entropy--suffix/file"), withoutEntropy);
- }
-
- @Test
- public void testWriteOptionWithEntropy() throws Exception {
- FileSystem underlyingFs = mock(FileSystem.class);
- when(underlyingFs.create(any(org.apache.hadoop.fs.Path.class), anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
- ArgumentCaptor<org.apache.hadoop.fs.Path> pathCaptor = ArgumentCaptor.forClass(org.apache.hadoop.fs.Path.class);
-
- Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
- S3PrestoFileSystem fs = new S3PrestoFileSystem(underlyingFs, "s0mek3y", 11);
-
- fs.create(path, new WriteOptions().setInjectEntropy(true));
- verify(underlyingFs).create(pathCaptor.capture(), anyBoolean());
-
- validateMatches(pathCaptor.getValue(), "s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{11}/the/file");
- }
-
- private static void validateMatches(org.apache.hadoop.fs.Path path, String pattern) {
- if (!path.toString().matches(pattern)) {
- fail("Path " + path + " does not match " + pattern);
- }
- }
-
- private static S3PrestoFileSystem createFs(String entropyKey, int entropyLen) {
- return new S3PrestoFileSystem(mock(FileSystem.class), entropyKey, entropyLen);
- }
-
- private org.apache.hadoop.fs.Path toHadoopPath(Path path) {
- return new org.apache.hadoop.fs.Path(path.toUri());
- }
-}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 9afad57..4eeb2d4 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.facebook.presto.hive.PrestoS3FileSystem;
-import org.junit.After;
import org.junit.Test;
import java.lang.reflect.Field;
@@ -41,11 +41,6 @@ import static org.junit.Assert.assertTrue;
*/
public class PrestoS3FileSystemTest {
- @After
- public void resetFileSystemConfig() throws Exception {
- FileSystem.initialize(new Configuration());
- }
-
@Test
public void testConfigPropagation() throws Exception{
final Configuration conf = new Configuration();
@@ -95,29 +90,14 @@ public class PrestoS3FileSystemTest {
hadoopConfig.get("presto.s3.credentials-provider"));
}
- @Test
- public void testEntropyInjectionConfig() throws Exception {
- final Configuration conf = new Configuration();
- conf.setString("s3.entropy.key", "__entropy__");
- conf.setInteger("s3.entropy.length", 7);
-
- FileSystem.initialize(conf);
-
- FileSystem fs = FileSystem.get(new URI("s3://test"));
- S3PrestoFileSystem s3fs = (S3PrestoFileSystem) fs;
-
- assertEquals("__entropy__", s3fs.getEntropyInjectionKey());
- assertEquals(7, s3fs.getEntropyLength());
- }
-
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
private static void validateBasicCredentials(FileSystem fs) throws Exception {
- assertTrue(fs instanceof S3PrestoFileSystem);
+ assertTrue(fs instanceof HadoopFileSystem);
- org.apache.hadoop.fs.FileSystem hadoopFs = ((S3PrestoFileSystem) fs).getHadoopFileSystem();
+ org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) fs).getHadoopFileSystem();
assertTrue(hadoopFs instanceof PrestoS3FileSystem);
try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 1549e01..af80af7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -32,7 +32,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of durable checkpoint storage to file systems.
@@ -55,25 +54,11 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
JobID jobId,
int fileSizeThreshold) throws IOException {
- this(checkpointBaseDirectory.getFileSystem(),
- checkpointBaseDirectory,
- defaultSavepointDirectory,
- jobId,
- fileSizeThreshold);
- }
-
- public FsCheckpointStorage(
- FileSystem fs,
- Path checkpointBaseDirectory,
- @Nullable Path defaultSavepointDirectory,
- JobID jobId,
- int fileSizeThreshold) throws IOException {
-
super(jobId, defaultSavepointDirectory);
checkArgument(fileSizeThreshold >= 0);
- this.fileSystem = checkNotNull(fs);
+ this.fileSystem = checkpointBaseDirectory.getFileSystem();
this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 228c5b4..609ef69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -346,11 +345,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
for (int attempt = 0; attempt < 10; attempt++) {
try {
Path statePath = createStatePath();
- FSDataOutputStream outStream = fs.create(
- statePath,
- new WriteOptions()
- .setOverwrite(FileSystem.WriteMode.NO_OVERWRITE)
- .setInjectEntropy(true));
+ FSDataOutputStream outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
// success, managed to open the stream
this.statePath = statePath;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index c962146..8bafdf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -139,13 +138,13 @@ public class FsCheckpointStateOutputStreamTest {
*/
@Test
public void testCleanupWhenClosingStream() throws IOException {
+
final FileSystem fs = mock(FileSystem.class);
final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
- when(fs.create(pathCaptor.capture(), any(WriteOptions.class))).thenReturn(outputStream);
CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
Path.fromLocalFile(tempDir.newFolder()),
@@ -155,6 +154,9 @@ public class FsCheckpointStateOutputStreamTest {
// this should create the underlying file stream
stream.write(new byte[] {1, 2, 3, 4, 5});
+
+ verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));
+
stream.close();
verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
@@ -168,11 +170,9 @@ public class FsCheckpointStateOutputStreamTest {
final FileSystem fs = mock(FileSystem.class);
final FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
- final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ final ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
- when(fs.create(pathCaptor.capture(), any(WriteOptions.class))).thenReturn(outputStream);
-
doThrow(new IOException("Test IOException.")).when(outputStream).close();
CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
@@ -184,6 +184,8 @@ public class FsCheckpointStateOutputStreamTest {
// this should create the underlying file stream
stream.write(new byte[] {1, 2, 3, 4, 5});
+ verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));
+
try {
stream.closeAndGetHandle();
fail("Expected IOException");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
deleted file mode 100644
index d1aa118..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.WriteOptions;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
-import org.apache.flink.runtime.state.CheckpointStorageLocation;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests verifying that the FsStateBackend passes the entropy injection option
- * to the FileSystem for state payload files, but not for metadata files.
- */
-public class FsStateBackendEntropyTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- @After
- public void resetFileSystems() throws Exception {
- FileSystem.initialize(new Configuration());
- }
-
- @Test
- public void testInjection() throws Exception {
- FileSystem fs = spy(LocalFileSystem.getSharedInstance());
- ArgumentCaptor<WriteOptions> optionsCaptor = ArgumentCaptor.forClass(WriteOptions.class);
-
- Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
-
- FsCheckpointStorage storage = new FsCheckpointStorage(
- fs, checkpointDir, null, new JobID(), 1024);
-
- CheckpointStorageLocation location = storage.initializeLocationForCheckpoint(96562);
-
- // check entropy in task-owned state
- try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
- stream.flush();
- }
-
- verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
- assertTrue(optionsCaptor.getValue().isInjectEntropy());
- reset(fs);
-
- // check entropy in the exclusive/shared state
- try (CheckpointStateOutputStream stream =
- location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
-
- stream.flush();
- }
-
- verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
- assertTrue(optionsCaptor.getValue().isInjectEntropy());
- reset(fs);
-
- // check that there is no entropy in the metadata
- // check entropy in the exclusive/shared state
- try (CheckpointMetadataOutputStream stream = location.createMetadataOutputStream()) {
- stream.flush();
- }
-
- verify(fs, times(0)).create(any(Path.class), any(WriteOptions.class));
- }
-}