You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:30:11 UTC
[flink] 01/03: [FLINK-9061] [core] Introduce WriteOptions to Flink
FileSystem abstraction
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2b7ea9efe51f2049d628cf1c7ecd922d11ee1420
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 23 14:32:14 2018 +0200
[FLINK-9061] [core] Introduce WriteOptions to Flink FileSystem abstraction
---
.../java/org/apache/flink/core/fs/FileSystem.java | 44 +++----
.../org/apache/flink/core/fs/WriteOptions.java | 144 +++++++++++++++++++++
.../flink/runtime/fs/hdfs/HadoopFileSystem.java | 25 ++++
3 files changed, 190 insertions(+), 23 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d451109..ba2113a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -576,29 +576,8 @@ public abstract class FileSystem {
/**
* Opens an FSDataOutputStream at the indicated Path.
*
- * <p>This method is deprecated, because most of its parameters are ignored by most file systems.
- * To control for example the replication factor and block size in the Hadoop Distributed File system,
- * make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
- * or in the classpath of either Flink or the user code.
- *
- * @param f
- * the file name to open
- * @param overwrite
- * if a file with this name already exists, then if true,
- * the file will be overwritten, and if false an error will be thrown.
- * @param bufferSize
- * the size of the buffer to be used.
- * @param replication
- * required block replication for the file.
- * @param blockSize
- * the size of the file blocks
- *
- * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
- * a file already exists at that path and the write mode indicates to not
- * overwrite the file.
- *
- * @deprecated Deprecated because not well supported across types of file systems.
- * Control the behavior of specific file systems via configurations instead.
+ * @deprecated Deprecated in favor of {@link #create(Path, WriteOptions)} which offers better extensibility
+ * to options that are supported only by some file system implementations.
*/
@Deprecated
public FSDataOutputStream create(
@@ -649,6 +628,25 @@ public abstract class FileSystem {
public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
/**
+ * Creates a new file at the given path and opens an FSDataOutputStream to that new file.
+ *
+ * <p>This method takes various options, some of which are not supported by all file systems
+ * (such as controlling block size).
+ *
+ * <p>Implementation note: This method should be abstract, but currently is not, in order to preserve
+ * backwards compatibility of this class with earlier Flink versions.
+ *
+ * @param f The path for the new file.
+ * @param options The options to parametrize the file and stream creation.
+ * @return The stream to the new file at the target path.
+ *
+ * @throws IOException Thrown if an error occurs while creating the file or opening the stream.
+ */
+ public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
+ return create(f, options.getOverwrite()); // base implementation honors only the overwrite mode; other options are ignored
+ }
+
+ /**
* Renames the file/directory src to dst.
*
* @param src
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
new file mode 100644
index 0000000..70f4973
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Write options that can be passed to the methods that write files.
+ */
+@Public
+public class WriteOptions {
+
+ private WriteMode overwrite = WriteMode.NO_OVERWRITE;
+
+ @Nullable
+ private BlockOptions blockSettings;
+
+ private boolean injectEntropy;
+
+ // ------------------------------------------------------------------------
+ // getters & setters
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the overwrite option. Defaults to {@link WriteMode#NO_OVERWRITE}.
+ */
+ public WriteMode getOverwrite() {
+ return overwrite;
+ }
+
+ /**
+ * Sets the overwrite option. The mode must not be null.
+ *
+ * <p>Method returns this object for fluent function call chaining.
+ */
+ public WriteOptions setOverwrite(WriteMode overwrite) {
+ this.overwrite = checkNotNull(overwrite);
+ return this;
+ }
+
+ /**
+ * Gets the block writing settings, like size and replication factor.
+ * Returns null if no settings are defined.
+ */
+ @Nullable
+ public BlockOptions getBlockSettings() {
+ return blockSettings;
+ }
+
+ /**
+ * Sets the block settings, for file systems working with block replication and
+ * exposing those settings. Passing null clears any previously set block settings.
+ *
+ * <p>Method returns this object for fluent function call chaining.
+ */
+ public WriteOptions setBlockSettings(@Nullable BlockOptions blockSettings) {
+ this.blockSettings = blockSettings;
+ return this;
+ }
+
+ /**
+ * Gets whether to inject entropy into the path. Defaults to false.
+ */
+ public boolean isInjectEntropy() {
+ return injectEntropy;
+ }
+
+ /**
+ * Sets whether to inject entropy into the path.
+ *
+ * <p>Entropy injection is only supported by select file systems, like S3, to overcome
+ * scalability issues in the sharding. For this option to have any effect, the
+ * file system must be configured to replace an entropy key with entropy, and the
+ * path that is written to must contain the entropy key.
+ *
+ * <p>Method returns this object for fluent function call chaining.
+ */
+ public WriteOptions setInjectEntropy(boolean injectEntropy) {
+ this.injectEntropy = injectEntropy;
+ return this;
+ }
+
+ // ------------------------------------------------------------------------
+ // nested options classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * Immutable settings for block replication. Interpreted only by file systems that
+ * expose block replication settings; other file systems ignore them.
+ */
+ @Public
+ public static class BlockOptions {
+
+ /** The size of the blocks, in bytes. */
+ private final long blockSize;
+
+ /** The number of times the block should be replicated. */
+ private final int replicationFactor;
+
+ public BlockOptions(long blockSize, int replicationFactor) {
+ checkArgument(blockSize > 0, "blockSize must be >0");
+ checkArgument(replicationFactor >= 1, "replicationFactor must be >=1");
+
+ this.blockSize = blockSize;
+ this.replicationFactor = replicationFactor;
+ }
+
+ /**
+ * Gets the block size, in bytes. Always positive.
+ */
+ public long getBlockSize() {
+ return blockSize;
+ }
+
+ /**
+ * Gets the number of times the block should be replicated. Always at least 1.
+ */
+ public int getReplicationFactor() {
+ return replicationFactor;
+ }
+ }
+}
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 065ba5a..bceed5e 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -19,16 +19,20 @@
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.core.fs.WriteOptions.BlockOptions;
import java.io.IOException;
import java.net.URI;
import java.util.Locale;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -36,6 +40,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class HadoopFileSystem extends FileSystem {
+ /** The write buffer size used by default. */
+ public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+
/** The wrapped Hadoop File System. */
private final org.apache.hadoop.fs.FileSystem fs;
@@ -143,6 +150,24 @@ public class HadoopFileSystem extends FileSystem {
}
@Override
+ public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
+ BlockOptions blockSettings = options.getBlockSettings();
+ if (blockSettings == null) {
+ return create(f, options.getOverwrite());
+ }
+ else {
+ checkArgument(blockSettings.getReplicationFactor() <= Short.MAX_VALUE,
+ "block replication factor out of bounds");
+
+ return create(f,
+ options.getOverwrite() == WriteMode.OVERWRITE,
+ DEFAULT_WRITE_BUFFER_SIZE,
+ (short) blockSettings.getReplicationFactor(),
+ blockSettings.getBlockSize());
+ }
+ }
+
+ @Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
return this.fs.delete(toHadoopPath(f), recursive);
}