You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/06/21 23:23:54 UTC
[07/50] [abbrv] hadoop git commit: HADOOP-14394. Provide Builder
pattern for DistributedFileSystem.create. (lei)
HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5e7cfdca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5e7cfdca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5e7cfdca
Branch: refs/heads/HDFS-10467
Commit: 5e7cfdca7b73a88bf3c3f1e5eb794a24218cce52
Parents: ef8edab
Author: Lei Xu <le...@apache.org>
Authored: Wed Jun 14 23:17:53 2017 -0700
Committer: Lei Xu <le...@apache.org>
Committed: Thu Jun 15 10:59:24 2017 -0700
----------------------------------------------------------------------
.../hadoop/fs/FSDataOutputStreamBuilder.java | 171 +++++++++++++-----
.../java/org/apache/hadoop/fs/FileSystem.java | 31 +++-
.../org/apache/hadoop/fs/FilterFileSystem.java | 4 +-
.../org/apache/hadoop/fs/HarFileSystem.java | 4 +-
.../apache/hadoop/fs/TestLocalFileSystem.java | 10 +-
.../hadoop/hdfs/DistributedFileSystem.java | 173 +++++++++++++++----
.../hdfs/server/balancer/NameNodeConnector.java | 8 +-
.../hadoop/hdfs/TestDistributedFileSystem.java | 84 +++++++--
.../hadoop/hdfs/TestErasureCodingPolicies.java | 25 ++-
.../namenode/TestFavoredNodesEndToEnd.java | 2 +-
10 files changed, 390 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
index 55836cc..0527202 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -18,36 +18,70 @@
package org.apache.hadoop.fs;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
+import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.EnumSet;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-/** Base of specific file system FSDataOutputStreamBuilder. */
+/**
+ * Builder for {@link FSDataOutputStream} and its subclasses.
+ *
+ * It is used to create {@link FSDataOutputStream} when creating a new file or
+ * appending an existing file on {@link FileSystem}.
+ *
+ * By default, it does not create parent directory that do not exist.
+ * {@link FileSystem#createNonRecursive(Path, boolean, int, short, long,
+ * Progressable)}.
+ *
+ * To create missing parent directory, use {@link #recursive()}.
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class FSDataOutputStreamBuilder {
- private Path path = null;
+public abstract class FSDataOutputStreamBuilder
+ <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
+ private final FileSystem fs;
+ private final Path path;
private FsPermission permission = null;
- private Integer bufferSize;
- private Short replication;
- private Long blockSize;
+ private int bufferSize;
+ private short replication;
+ private long blockSize;
+ /** set to true to create missing directory. */
+ private boolean recursive = false;
+ private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
private Progressable progress = null;
- private EnumSet<CreateFlag> flags = null;
private ChecksumOpt checksumOpt = null;
- private final FileSystem fs;
-
- protected FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+ /**
+ * Return the concrete implementation of the builder instance.
+ */
+ protected abstract B getThisBuilder();
+
+ /**
+ * Constructor.
+ */
+ protected FSDataOutputStreamBuilder(@Nonnull FileSystem fileSystem,
+ @Nonnull Path p) {
+ Preconditions.checkNotNull(fileSystem);
+ Preconditions.checkNotNull(p);
fs = fileSystem;
path = p;
+ bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT);
+ replication = fs.getDefaultReplication(path);
+ blockSize = fs.getDefaultBlockSize(p);
+ }
+
+ protected FileSystem getFS() {
+ return fs;
}
protected Path getPath() {
@@ -56,91 +90,136 @@ public class FSDataOutputStreamBuilder {
protected FsPermission getPermission() {
if (permission == null) {
- return FsPermission.getFileDefault();
+ permission = FsPermission.getFileDefault();
}
return permission;
}
- public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
+ /**
+ * Set permission for the file.
+ */
+ public B permission(@Nonnull final FsPermission perm) {
Preconditions.checkNotNull(perm);
permission = perm;
- return this;
+ return getThisBuilder();
}
protected int getBufferSize() {
- if (bufferSize == null) {
- return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
- IO_FILE_BUFFER_SIZE_DEFAULT);
- }
return bufferSize;
}
- public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
+ /**
+ * Set the size of the buffer to be used.
+ */
+ public B bufferSize(int bufSize) {
bufferSize = bufSize;
- return this;
+ return getThisBuilder();
}
protected short getReplication() {
- if (replication == null) {
- return fs.getDefaultReplication(getPath());
- }
return replication;
}
- public FSDataOutputStreamBuilder setReplication(short replica) {
+ /**
+ * Set replication factor.
+ */
+ public B replication(short replica) {
replication = replica;
- return this;
+ return getThisBuilder();
}
protected long getBlockSize() {
- if (blockSize == null) {
- return fs.getDefaultBlockSize(getPath());
- }
return blockSize;
}
- public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
+ /**
+ * Set block size.
+ */
+ public B blockSize(long blkSize) {
blockSize = blkSize;
- return this;
+ return getThisBuilder();
+ }
+
+ /**
+ * Return true to create the parent directories if they do not exist.
+ */
+ protected boolean isRecursive() {
+ return recursive;
+ }
+
+ /**
+ * Create the parent directory if they do not exist.
+ */
+ public B recursive() {
+ recursive = true;
+ return getThisBuilder();
}
protected Progressable getProgress() {
return progress;
}
- public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
+ /**
+ * Set the facility of reporting progress.
+ */
+ public B progress(@Nonnull final Progressable prog) {
Preconditions.checkNotNull(prog);
progress = prog;
- return this;
+ return getThisBuilder();
}
protected EnumSet<CreateFlag> getFlags() {
- if (flags == null) {
- return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
- }
return flags;
}
- public FSDataOutputStreamBuilder setFlags(
- final EnumSet<CreateFlag> enumFlags) {
- Preconditions.checkNotNull(enumFlags);
- flags = enumFlags;
- return this;
+ /**
+ * Create an FSDataOutputStream at the specified path.
+ */
+ public B create() {
+ flags.add(CreateFlag.CREATE);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set to true to overwrite the existing file.
+ * Set it to false, an exception will be thrown when calling {@link #build()}
+ * if the file exists.
+ */
+ public B overwrite(boolean overwrite) {
+ if (overwrite) {
+ flags.add(CreateFlag.OVERWRITE);
+ } else {
+ flags.remove(CreateFlag.OVERWRITE);
+ }
+ return getThisBuilder();
+ }
+
+ /**
+ * Append to an existing file (optional operation).
+ */
+ public B append() {
+ flags.add(CreateFlag.APPEND);
+ return getThisBuilder();
}
protected ChecksumOpt getChecksumOpt() {
return checksumOpt;
}
- public FSDataOutputStreamBuilder setChecksumOpt(
- final ChecksumOpt chksumOpt) {
+ /**
+ * Set checksum opt.
+ */
+ public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
Preconditions.checkNotNull(chksumOpt);
checksumOpt = chksumOpt;
- return this;
+ return getThisBuilder();
}
- public FSDataOutputStream build() throws IOException {
- return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
- getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
- }
+ /**
+ * Create the FSDataOutputStream to write on the file system.
+ *
+ * @throws HadoopIllegalArgumentException if the parameters are not valid.
+ * @throws IOException on errors when file system creates or appends the file.
+ */
+ public abstract S build() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 1907475..cc92f31 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -4140,8 +4140,34 @@ public abstract class FileSystem extends Configured implements Closeable {
return GlobalStorageStatistics.INSTANCE;
}
+ private static final class FileSystemDataOutputStreamBuilder extends
+ FSDataOutputStreamBuilder<FSDataOutputStream,
+ FileSystemDataOutputStreamBuilder> {
+
+ /**
+ * Constructor.
+ */
+ protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+ super(fileSystem, p);
+ }
+
+ @Override
+ public FSDataOutputStream build() throws IOException {
+ return getFS().create(getPath(), getPermission(), getFlags(),
+ getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+ getChecksumOpt());
+ }
+
+ @Override
+ protected FileSystemDataOutputStreamBuilder getThisBuilder() {
+ return this;
+ }
+ }
+
/**
* Create a new FSDataOutputStreamBuilder for the file with path.
+ * Files are overwritten by default.
+ *
* @param path file path
* @return a FSDataOutputStreamBuilder object to build the file
*
@@ -4149,7 +4175,8 @@ public abstract class FileSystem extends Configured implements Closeable {
* builder interface becomes stable.
*/
@InterfaceAudience.Private
- protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return new FSDataOutputStreamBuilder(this, path);
+ protected FSDataOutputStreamBuilder createFile(Path path) {
+ return new FileSystemDataOutputStreamBuilder(this, path)
+ .create().overwrite(true);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 3466922..e940065 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -667,7 +667,7 @@ public class FilterFileSystem extends FileSystem {
}
@Override
- protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return fs.newFSDataOutputStreamBuilder(path);
+ public FSDataOutputStreamBuilder createFile(Path path) {
+ return fs.createFile(path);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index 7e50ab1..c410e34 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -1270,7 +1270,7 @@ public class HarFileSystem extends FileSystem {
}
@Override
- public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return fs.newFSDataOutputStreamBuilder(path);
+ public FSDataOutputStreamBuilder createFile(Path path) {
+ return fs.createFile(path);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index 777e5c0..527b9eb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -659,9 +659,9 @@ public class TestLocalFileSystem {
try {
FSDataOutputStreamBuilder builder =
- fileSys.newFSDataOutputStreamBuilder(path);
+ fileSys.createFile(path);
FSDataOutputStream out = builder.build();
- String content = "Create with a generic type of createBuilder!";
+ String content = "Create with a generic type of createFile!";
byte[] contentOrigin = content.getBytes("UTF8");
out.write(contentOrigin);
out.close();
@@ -680,7 +680,7 @@ public class TestLocalFileSystem {
// Test value not being set for replication, block size, buffer size
// and permission
FSDataOutputStreamBuilder builder =
- fileSys.newFSDataOutputStreamBuilder(path);
+ fileSys.createFile(path);
builder.build();
Assert.assertEquals("Should be default block size",
builder.getBlockSize(), fileSys.getDefaultBlockSize());
@@ -694,8 +694,8 @@ public class TestLocalFileSystem {
builder.getPermission(), FsPermission.getFileDefault());
// Test set 0 to replication, block size and buffer size
- builder = fileSys.newFSDataOutputStreamBuilder(path);
- builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
+ builder = fileSys.createFile(path);
+ builder.bufferSize(0).blockSize(0).replication((short) 0);
Assert.assertEquals("Block size should be 0",
builder.getBlockSize(), 0);
Assert.assertEquals("Replication factor should be 0",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 344f574..2f60e9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -101,7 +101,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
import javax.annotation.Nonnull;
@@ -526,6 +525,49 @@ public class DistributedFileSystem extends FileSystem {
}
/**
+ * Similar to {@link #create(Path, FsPermission, EnumSet, int, short, long,
+ * Progressable, ChecksumOpt, InetSocketAddress[], String)}, it provides a
+ * HDFS-specific version of {@link #createNonRecursive(Path, FsPermission,
+ * EnumSet, int, short, long, Progressable)} with a few additions.
+ *
+ * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable,
+ * ChecksumOpt, InetSocketAddress[], String) for the descriptions of
+ * additional parameters, i.e., favoredNodes and ecPolicyName.
+ */
+ private HdfsDataOutputStream createNonRecursive(final Path f,
+ final FsPermission permission, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final short replication, final long blockSize,
+ final Progressable progress, final ChecksumOpt checksumOpt,
+ final InetSocketAddress[] favoredNodes, final String ecPolicyName)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ storageStatistics.incrementOpCounter(OpType.CREATE);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+ @Override
+ public HdfsDataOutputStream doCall(final Path p) throws IOException {
+ final DFSOutputStream out = dfs.create(getPathName(f), permission,
+ flag, false, replication, blockSize, progress, bufferSize,
+ checksumOpt, favoredNodes, ecPolicyName);
+ return dfs.createWrappedOutputStream(out, statistics);
+ }
+ @Override
+ public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+ return myDfs.createNonRecursive(p, permission, flag, bufferSize,
+ replication, blockSize, progress, checksumOpt, favoredNodes,
+ ecPolicyName);
+ }
+ throw new UnsupportedOperationException("Cannot create with" +
+ " favoredNodes through a symlink to a non-DistributedFileSystem: "
+ + f + " -> " + p);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
* Same as create(), except fails if parent directory doesn't already exist.
*/
@Override
@@ -2686,33 +2728,88 @@ public class DistributedFileSystem extends FileSystem {
}
/**
- * Extends FSDataOutputStreamBuilder to support special requirements
- * of DistributedFileSystem.
+ * HdfsDataOutputStreamBuilder provides the HDFS-specific capabilities to
+ * write file on HDFS.
*/
- public static class HdfsDataOutputStreamBuilder
- extends FSDataOutputStreamBuilder {
+ public static final class HdfsDataOutputStreamBuilder
+ extends FSDataOutputStreamBuilder<
+ HdfsDataOutputStream, HdfsDataOutputStreamBuilder> {
private final DistributedFileSystem dfs;
private InetSocketAddress[] favoredNodes = null;
private String ecPolicyName = null;
- private boolean shouldReplicate = false;
- public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
+ /**
+ * Construct a HdfsDataOutputStream builder for a file.
+ * @param dfs the {@link DistributedFileSystem} instance.
+ * @param path the path of the file to create / append.
+ */
+ private HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
super(dfs, path);
this.dfs = dfs;
}
- protected InetSocketAddress[] getFavoredNodes() {
+ @Override
+ protected HdfsDataOutputStreamBuilder getThisBuilder() {
+ return this;
+ }
+
+ private InetSocketAddress[] getFavoredNodes() {
return favoredNodes;
}
- public HdfsDataOutputStreamBuilder setFavoredNodes(
+ /**
+ * Set favored DataNodes.
+ * @param nodes the addresses of the favored DataNodes.
+ */
+ public HdfsDataOutputStreamBuilder favoredNodes(
@Nonnull final InetSocketAddress[] nodes) {
Preconditions.checkNotNull(nodes);
favoredNodes = nodes.clone();
return this;
}
- protected String getEcPolicyName() {
+ /**
+ * Force closed blocks to disk.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder syncBlock() {
+ getFlags().add(CreateFlag.SYNC_BLOCK);
+ return this;
+ }
+
+ /**
+ * Create the block on transient storage if possible.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder lazyPersist() {
+ getFlags().add(CreateFlag.LAZY_PERSIST);
+ return this;
+ }
+
+ /**
+ * Append data to a new block instead of the end of the last partial block.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder newBlock() {
+ getFlags().add(CreateFlag.NEW_BLOCK);
+ return this;
+ }
+
+ /**
+ * Advise that a block replica NOT be written to the local DataNode.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder noLocalWrite() {
+ getFlags().add(CreateFlag.NO_LOCAL_WRITE);
+ return this;
+ }
+
+ @VisibleForTesting
+ String getEcPolicyName() {
return ecPolicyName;
}
@@ -2722,17 +2819,17 @@ public class DistributedFileSystem extends FileSystem {
* or erasure coding policy is. Don't call this function and
* enforceReplicate() in the same builder since they have conflict
* of interest.
- *
*/
- public HdfsDataOutputStreamBuilder setEcPolicyName(
+ public HdfsDataOutputStreamBuilder ecPolicyName(
@Nonnull final String policyName) {
Preconditions.checkNotNull(policyName);
ecPolicyName = policyName;
return this;
}
- public boolean shouldReplicate() {
- return shouldReplicate;
+ @VisibleForTesting
+ boolean shouldReplicate() {
+ return getFlags().contains(CreateFlag.SHOULD_REPLICATE);
}
/**
@@ -2742,30 +2839,46 @@ public class DistributedFileSystem extends FileSystem {
* conflict of interest.
*/
public HdfsDataOutputStreamBuilder replicate() {
- shouldReplicate = true;
+ getFlags().add(CreateFlag.SHOULD_REPLICATE);
return this;
}
+ @VisibleForTesting
+ @Override
+ protected EnumSet<CreateFlag> getFlags() {
+ return super.getFlags();
+ }
+
+ /**
+ * Build HdfsDataOutputStream to write.
+ *
+ * @return a fully-initialized OutputStream.
+ * @throws IOException on I/O errors.
+ */
@Override
public HdfsDataOutputStream build() throws IOException {
- Preconditions.checkState(
- !(shouldReplicate() && (!StringUtils.isEmpty(getEcPolicyName()))),
- "shouldReplicate and ecPolicyName are " +
- "exclusive parameters. Set both is not allowed!");
-
- EnumSet<CreateFlag> createFlags = getFlags();
- if (shouldReplicate()) {
- createFlags.add(CreateFlag.SHOULD_REPLICATE);
- }
- return dfs.create(getPath(), getPermission(), createFlags,
- getBufferSize(), getReplication(), getBlockSize(),
- getProgress(), getChecksumOpt(), getFavoredNodes(),
- getEcPolicyName());
+ if (isRecursive()) {
+ return dfs.create(getPath(), getPermission(), getFlags(),
+ getBufferSize(), getReplication(), getBlockSize(),
+ getProgress(), getChecksumOpt(), getFavoredNodes(),
+ getEcPolicyName());
+ } else {
+ return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
+ getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+ getChecksumOpt(), getFavoredNodes(), getEcPolicyName());
+ }
}
}
+ /**
+ * Create a HdfsDataOutputStreamBuilder to create a file on DFS.
+ * Similar to {@link #create(Path)}, file is overwritten by default.
+ *
+ * @param path the path of the file to create.
+ * @return A HdfsDataOutputStreamBuilder for creating a file.
+ */
@Override
- public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return new HdfsDataOutputStreamBuilder(this, path);
+ public HdfsDataOutputStreamBuilder createFile(Path path) {
+ return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index e1b1005..be59cce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -25,7 +25,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,7 +35,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
@@ -246,10 +244,8 @@ public class NameNodeConnector implements Closeable {
fs.delete(idPath, true);
}
- final FSDataOutputStream fsout = fs.newFSDataOutputStreamBuilder(idPath)
- .replicate()
- .setFlags(EnumSet.of(CreateFlag.CREATE))
- .build();
+ final FSDataOutputStream fsout = fs.createFile(idPath)
+ .replicate().recursive().build();
Preconditions.checkState(
fsout.hasCapability(StreamCapability.HFLUSH.getValue())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index e9af594..9857735 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FileChecksum;
@@ -71,6 +72,7 @@ import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@@ -1411,36 +1413,88 @@ public class TestDistributedFileSystem {
}
}
+ private void testBuilderSetters(DistributedFileSystem fs) {
+ Path testFilePath = new Path("/testBuilderSetters");
+ HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath);
+
+ builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite()
+ .ecPolicyName("ec-policy");
+ EnumSet<CreateFlag> flags = builder.getFlags();
+ assertTrue(flags.contains(CreateFlag.APPEND));
+ assertTrue(flags.contains(CreateFlag.CREATE));
+ assertTrue(flags.contains(CreateFlag.NEW_BLOCK));
+ assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE));
+ assertFalse(flags.contains(CreateFlag.OVERWRITE));
+ assertFalse(flags.contains(CreateFlag.SYNC_BLOCK));
+
+ assertEquals("ec-policy", builder.getEcPolicyName());
+ assertFalse(builder.shouldReplicate());
+ }
+
+ @Test
+ public void testHdfsDataOutputStreamBuilderSetParameters()
+ throws IOException {
+ Configuration conf = getTestConfiguration();
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build()) {
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ testBuilderSetters(fs);
+ }
+ }
+
@Test
public void testDFSDataOutputStreamBuilder() throws Exception {
Configuration conf = getTestConfiguration();
- MiniDFSCluster cluster = null;
String testFile = "/testDFSDataOutputStreamBuilder";
Path testFilePath = new Path(testFile);
- try {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build()) {
DistributedFileSystem fs = cluster.getFileSystem();
// Test create an empty file
- FSDataOutputStream out =
- fs.newFSDataOutputStreamBuilder(testFilePath).build();
- out.close();
+ try (FSDataOutputStream out =
+ fs.createFile(testFilePath).build()) {
+ LOG.info("Test create an empty file");
+ }
// Test create a file with content, and verify the content
String content = "This is a test!";
- out = fs.newFSDataOutputStreamBuilder(testFilePath)
- .setBufferSize(4096).setReplication((short) 1)
- .setBlockSize(4096).build();
- byte[] contentOrigin = content.getBytes("UTF8");
- out.write(contentOrigin);
- out.close();
+ try (FSDataOutputStream out1 = fs.createFile(testFilePath)
+ .bufferSize(4096)
+ .replication((short) 1)
+ .blockSize(4096)
+ .build()) {
+ byte[] contentOrigin = content.getBytes("UTF8");
+ out1.write(contentOrigin);
+ }
ContractTestUtils.verifyFileContents(fs, testFilePath,
content.getBytes());
- } finally {
- if (cluster != null) {
- cluster.shutdown();
+
+ try (FSDataOutputStream out = fs.createFile(testFilePath).overwrite(false)
+ .build()) {
+ fail("it should fail to overwrite an existing file");
+ } catch (FileAlreadyExistsException e) {
+ // As expected, ignore.
+ }
+
+ Path nonParentFile = new Path("/parent/test");
+ try (FSDataOutputStream out = fs.createFile(nonParentFile).build()) {
+ fail("parent directory not exist");
+ } catch (FileNotFoundException e) {
+ // As expected.
+ }
+ assertFalse("parent directory should not be created",
+ fs.exists(new Path("/parent")));
+
+ try (FSDataOutputStream out = fs.createFile(nonParentFile).recursive()
+ .build()) {
+ out.write(1);
}
+ assertTrue("parent directory has not been created",
+ fs.exists(new Path("/parent")));
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
index 77e6594..4a4bed5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
@@ -540,15 +540,14 @@ public class TestErasureCodingPolicies {
fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
// null EC policy name value means inheriting parent directory's policy
- fs.newFSDataOutputStreamBuilder(filePath0).build().close();
+ fs.createFile(filePath0).build().close();
ErasureCodingPolicy ecPolicyOnFile = fs.getErasureCodingPolicy(filePath0);
assertEquals(EC_POLICY, ecPolicyOnFile);
// Test illegal EC policy name
final String illegalPolicyName = "RS-DEFAULT-1-2-64k";
try {
- fs.newFSDataOutputStreamBuilder(filePath1)
- .setEcPolicyName(illegalPolicyName).build().close();
+ fs.createFile(filePath1).ecPolicyName(illegalPolicyName).build().close();
Assert.fail("illegal erasure coding policy should not be found");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains("Policy '" + illegalPolicyName
@@ -563,8 +562,8 @@ public class TestErasureCodingPolicies {
SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
ecPolicyOnFile = EC_POLICY;
fs.setErasureCodingPolicy(dirPath, ecPolicyOnDir.getName());
- fs.newFSDataOutputStreamBuilder(filePath0)
- .setEcPolicyName(ecPolicyOnFile.getName()).build().close();
+ fs.createFile(filePath0).ecPolicyName(ecPolicyOnFile.getName())
+ .build().close();
assertEquals(ecPolicyOnFile, fs.getErasureCodingPolicy(filePath0));
assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath));
fs.delete(dirPath, true);
@@ -582,27 +581,27 @@ public class TestErasureCodingPolicies {
fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
final String ecPolicyName = "RS-10-4-64k";
- fs.newFSDataOutputStreamBuilder(filePath).build().close();
+ fs.createFile(filePath).build().close();
assertEquals(EC_POLICY, fs.getErasureCodingPolicy(filePath));
fs.delete(filePath, true);
- fs.newFSDataOutputStreamBuilder(filePath)
- .setEcPolicyName(ecPolicyName)
+ fs.createFile(filePath)
+ .ecPolicyName(ecPolicyName)
.build()
.close();
assertEquals(ecPolicyName, fs.getErasureCodingPolicy(filePath).getName());
fs.delete(filePath, true);
try {
- fs.newFSDataOutputStreamBuilder(filePath)
- .setEcPolicyName(ecPolicyName)
+ fs.createFile(filePath)
+ .ecPolicyName(ecPolicyName)
.replicate()
.build().close();
Assert.fail("shouldReplicate and ecPolicyName are exclusive " +
"parameters. Set both is not allowed.");
}catch (Exception e){
- GenericTestUtils.assertExceptionContains("shouldReplicate and " +
- "ecPolicyName are exclusive parameters. Set both is not allowed!", e);
+ GenericTestUtils.assertExceptionContains("SHOULD_REPLICATE flag and " +
+ "ecPolicyName are exclusive parameters.", e);
}
try {
@@ -618,7 +617,7 @@ public class TestErasureCodingPolicies {
"ecPolicyName are exclusive parameters. Set both is not allowed!", e);
}
- fs.newFSDataOutputStreamBuilder(filePath)
+ fs.createFile(filePath)
.replicate()
.build()
.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
index 50e56cc..3352fd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
@@ -199,7 +199,7 @@ public class TestFavoredNodesEndToEnd {
InetSocketAddress[] dns = getDatanodes(rand);
Path p = new Path("/filename"+i);
FSDataOutputStream out =
- dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
+ dfs.createFile(p).favoredNodes(dns).build();
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org