You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:30:11 UTC

[flink] 01/03: [FLINK-9061] [core] Introduce WriteOptions to Flink FileSystem abstraction

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b7ea9efe51f2049d628cf1c7ecd922d11ee1420
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Aug 23 14:32:14 2018 +0200

    [FLINK-9061] [core] Introduce WriteOptions to Flink FileSystem abstraction
---
 .../java/org/apache/flink/core/fs/FileSystem.java  |  44 +++----
 .../org/apache/flink/core/fs/WriteOptions.java     | 144 +++++++++++++++++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |  25 ++++
 3 files changed, 190 insertions(+), 23 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d451109..ba2113a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -576,29 +576,8 @@ public abstract class FileSystem {
 	/**
 	 * Opens an FSDataOutputStream at the indicated Path.
 	 *
-	 * <p>This method is deprecated, because most of its parameters are ignored by most file systems.
-	 * To control for example the replication factor and block size in the Hadoop Distributed File system,
-	 * make sure that the respective Hadoop configuration file is either linked from the Flink configuration,
-	 * or in the classpath of either Flink or the user code.
-	 *
-	 * @param f
-	 *        the file name to open
-	 * @param overwrite
-	 *        if a file with this name already exists, then if true,
-	 *        the file will be overwritten, and if false an error will be thrown.
-	 * @param bufferSize
-	 *        the size of the buffer to be used.
-	 * @param replication
-	 *        required block replication for the file.
-	 * @param blockSize
-	 *        the size of the file blocks
-	 *
-	 * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because
-	 *                     a file already exists at that path and the write mode indicates to not
-	 *                     overwrite the file.
-	 *
-	 * @deprecated Deprecated because not well supported across types of file systems.
-	 *             Control the behavior of specific file systems via configurations instead.
+	 * @deprecated Deprecated in favor of {@link #create(Path, WriteOptions)} which offers better extensibility
+	 *             to options that are supported only by some filesystem implementations.
 	 */
 	@Deprecated
 	public FSDataOutputStream create(
@@ -649,6 +628,25 @@ public abstract class FileSystem {
 	public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException;
 
 	/**
+	 * Creates a new file at the given path and opens an FSDataOutputStream to that new file.
+	 *
+	 * <p>This method takes various options, some of which are not supported by all file systems
+	 * (such as controlling block size).
+	 *
+	 * <p>Implementation note: This method should be abstract, but currently is not, in order to
+	 * preserve backwards compatibility of this class with earlier Flink versions.
+	 *
+	 * @param f The path for the new file.
+	 * @param options The options to parametrize the file and stream creation.
+	 * @return The stream to the new file at the target path.
+	 *
+	 * @throws IOException Thrown if an error occurs while creating the file or opening the stream.
+	 */
+	public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
+		return create(f, options.getOverwrite());
+	}
+
+	/**
 	 * Renames the file/directory src to dst.
 	 *
 	 * @param src
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
new file mode 100644
index 0000000..70f4973
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Write options that can be passed to the methods that write files.
+ */
+@Public
+public class WriteOptions {
+
+	private WriteMode overwrite = WriteMode.NO_OVERWRITE;
+
+	@Nullable
+	private BlockOptions blockSettings;
+
+	private boolean injectEntropy;
+
+	// ------------------------------------------------------------------------
+	//  getters & setters
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the overwrite option.
+	 */
+	public WriteMode getOverwrite() {
+		return overwrite;
+	}
+
+	/**
+	 * Sets the overwrite option.
+	 *
+	 * <p>Method returns this object for fluent function call chaining.
+	 */
+	public WriteOptions setOverwrite(WriteMode overwrite) {
+		this.overwrite = checkNotNull(overwrite);
+		return this;
+	}
+
+	/**
+	 * Gets the block writing settings, like size and replication factor.
+	 * Returns null if no settings are defined.
+	 */
+	@Nullable
+	public BlockOptions getBlockSettings() {
+		return blockSettings;
+	}
+
+	/**
+	 * Sets the block settings, for file systems working with block replication and
+	 * exposing those settings.
+	 *
+	 * <p>Method returns this object for fluent function call chaining.
+	 */
+	public WriteOptions setBlockSettings(@Nullable BlockOptions blockSettings) {
+		this.blockSettings = blockSettings;
+		return this;
+	}
+
+	/**
+	 * Gets whether to inject entropy into the path.
+	 */
+	public boolean isInjectEntropy() {
+		return injectEntropy;
+	}
+
+	/**
+	 * Sets whether to inject entropy into the path.
+	 *
+	 * <p>Entropy injection is only supported by select filesystems like S3 to overcome
+	 * scalability issues in the sharding. For this option to have any effect, the
+	 * file system must be configured to replace an entropy key with entropy, and the
+	 * path that is written to must contain the entropy key.
+	 *
+	 * <p>Method returns this object for fluent function call chaining.
+	 */
+	public WriteOptions setInjectEntropy(boolean injectEntropy) {
+		this.injectEntropy = injectEntropy;
+		return this;
+	}
+
+	// ------------------------------------------------------------------------
+	//  nested options classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Settings for block replication. Interpreted only by filesystems that
+	 * expose block replication settings.
+	 */
+	@Public
+	public static class BlockOptions {
+
+		/** The size of the blocks, in bytes. */
+		private long blockSize;
+
+		/** The number of times the block should be replicated. */
+		private int replicationFactor;
+
+		public BlockOptions(long blockSize, int replicationFactor) {
+			checkArgument(blockSize > 0, "blockSize must be >0");
+			checkArgument(replicationFactor > 0, "replicationFactor must be >=1");
+
+			this.blockSize = blockSize;
+			this.replicationFactor = replicationFactor;
+		}
+
+		/**
+		 * Gets the block size, in bytes.
+		 */
+		public long getBlockSize() {
+			return blockSize;
+		}
+
+		/**
+		 * Gets the number of times the block should be replicated.
+		 */
+		public int getReplicationFactor() {
+			return replicationFactor;
+		}
+	}
+}
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 065ba5a..bceed5e 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.core.fs.WriteOptions.BlockOptions;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Locale;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -36,6 +40,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class HadoopFileSystem extends FileSystem {
 
+	/** The write buffer size used by default. */
+	public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+
 	/** The wrapped Hadoop File System. */
 	private final org.apache.hadoop.fs.FileSystem fs;
 
@@ -143,6 +150,24 @@ public class HadoopFileSystem extends FileSystem {
 	}
 
 	@Override
+	public FSDataOutputStream create(Path f, WriteOptions options) throws IOException {
+		BlockOptions blockSettings = options.getBlockSettings();
+		if (blockSettings == null) {
+			return create(f, options.getOverwrite());
+		}
+		else {
+			checkArgument(blockSettings.getReplicationFactor() <= Short.MAX_VALUE,
+					"block replication factor out of bounds");
+
+			return create(f,
+					options.getOverwrite() == WriteMode.OVERWRITE,
+					DEFAULT_WRITE_BUFFER_SIZE,
+					(short) blockSettings.getReplicationFactor(),
+					blockSettings.getBlockSize());
+		}
+	}
+
+	@Override
 	public boolean delete(final Path f, final boolean recursive) throws IOException {
 		return this.fs.delete(toHadoopPath(f), recursive);
 	}