Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/06/21 23:23:54 UTC

[07/50] [abbrv] hadoop git commit: HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)

HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5e7cfdca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5e7cfdca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5e7cfdca

Branch: refs/heads/HDFS-10467
Commit: 5e7cfdca7b73a88bf3c3f1e5eb794a24218cce52
Parents: ef8edab
Author: Lei Xu <le...@apache.org>
Authored: Wed Jun 14 23:17:53 2017 -0700
Committer: Lei Xu <le...@apache.org>
Committed: Thu Jun 15 10:59:24 2017 -0700

----------------------------------------------------------------------
 .../hadoop/fs/FSDataOutputStreamBuilder.java    | 171 +++++++++++++-----
 .../java/org/apache/hadoop/fs/FileSystem.java   |  31 +++-
 .../org/apache/hadoop/fs/FilterFileSystem.java  |   4 +-
 .../org/apache/hadoop/fs/HarFileSystem.java     |   4 +-
 .../apache/hadoop/fs/TestLocalFileSystem.java   |  10 +-
 .../hadoop/hdfs/DistributedFileSystem.java      | 173 +++++++++++++++----
 .../hdfs/server/balancer/NameNodeConnector.java |   8 +-
 .../hadoop/hdfs/TestDistributedFileSystem.java  |  84 +++++++--
 .../hadoop/hdfs/TestErasureCodingPolicies.java  |  25 ++-
 .../namenode/TestFavoredNodesEndToEnd.java      |   2 +-
 10 files changed, 390 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
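
For orientation, here is a minimal sketch of the API this commit introduces, pieced together from the diffs below (the configuration, path, and option values are illustrative only, not part of the commit; createFile() is public on DistributedFileSystem, while the FileSystem variant is still @InterfaceAudience.Private):

    // Fluent replacement for newFSDataOutputStreamBuilder()/setXxx():
    Configuration conf = new Configuration();
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(conf);
    try (FSDataOutputStream out = dfs.createFile(new Path("/tmp/example"))
        .bufferSize(4096)
        .replication((short) 1)
        .blockSize(4096)
        .recursive()               // also create missing parent directories
        .build()) {
      out.write("hello".getBytes("UTF-8"));
    }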


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
index 55836cc..0527202 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -18,36 +18,70 @@
 package org.apache.hadoop.fs;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.util.EnumSet;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 
-/** Base of specific file system FSDataOutputStreamBuilder. */
+/**
+ * Builder for {@link FSDataOutputStream} and its subclasses.
+ *
+ * It is used to create an {@link FSDataOutputStream} when creating a new file
+ * or appending to an existing file on {@link FileSystem}.
+ *
+ * By default, it does not create parent directories that do not exist; see
+ * {@link FileSystem#createNonRecursive(Path, boolean, int, short, long,
+ * Progressable)}.
+ *
+ * To create missing parent directories, use {@link #recursive()}.
+ */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class FSDataOutputStreamBuilder {
-  private Path path = null;
+public abstract class FSDataOutputStreamBuilder
+    <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
+  private final FileSystem fs;
+  private final Path path;
   private FsPermission permission = null;
-  private Integer bufferSize;
-  private Short replication;
-  private Long blockSize;
+  private int bufferSize;
+  private short replication;
+  private long blockSize;
+  /** Set to true to create missing parent directories. */
+  private boolean recursive = false;
+  private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
   private Progressable progress = null;
-  private EnumSet<CreateFlag> flags = null;
   private ChecksumOpt checksumOpt = null;
 
-  private final FileSystem fs;
-
-  protected FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+  /**
+   * Return the concrete implementation of the builder instance.
+   */
+  protected abstract B getThisBuilder();
+
+  /**
+   * Constructor.
+   */
+  protected FSDataOutputStreamBuilder(@Nonnull FileSystem fileSystem,
+      @Nonnull Path p) {
+    Preconditions.checkNotNull(fileSystem);
+    Preconditions.checkNotNull(p);
     fs = fileSystem;
     path = p;
+    bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+        IO_FILE_BUFFER_SIZE_DEFAULT);
+    replication = fs.getDefaultReplication(path);
+    blockSize = fs.getDefaultBlockSize(p);
+  }
+
+  protected FileSystem getFS() {
+    return fs;
   }
 
   protected Path getPath() {
@@ -56,91 +90,136 @@ public class FSDataOutputStreamBuilder {
 
   protected FsPermission getPermission() {
     if (permission == null) {
-      return FsPermission.getFileDefault();
+      permission = FsPermission.getFileDefault();
     }
     return permission;
   }
 
-  public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
+  /**
+   * Set permission for the file.
+   */
+  public B permission(@Nonnull final FsPermission perm) {
     Preconditions.checkNotNull(perm);
     permission = perm;
-    return this;
+    return getThisBuilder();
   }
 
   protected int getBufferSize() {
-    if (bufferSize == null) {
-      return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
-          IO_FILE_BUFFER_SIZE_DEFAULT);
-    }
     return bufferSize;
   }
 
-  public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
+  /**
+   * Set the size of the buffer to be used.
+   */
+  public B bufferSize(int bufSize) {
     bufferSize = bufSize;
-    return this;
+    return getThisBuilder();
   }
 
   protected short getReplication() {
-    if (replication == null) {
-      return fs.getDefaultReplication(getPath());
-    }
     return replication;
   }
 
-  public FSDataOutputStreamBuilder setReplication(short replica) {
+  /**
+   * Set replication factor.
+   */
+  public B replication(short replica) {
     replication = replica;
-    return this;
+    return getThisBuilder();
   }
 
   protected long getBlockSize() {
-    if (blockSize == null) {
-      return fs.getDefaultBlockSize(getPath());
-    }
     return blockSize;
   }
 
-  public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
+  /**
+   * Set block size.
+   */
+  public B blockSize(long blkSize) {
     blockSize = blkSize;
-    return this;
+    return getThisBuilder();
+  }
+
+  /**
+   * Return true if missing parent directories are to be created.
+   */
+  protected boolean isRecursive() {
+    return recursive;
+  }
+
+  /**
+   * Create the parent directories if they do not exist.
+   */
+  public B recursive() {
+    recursive = true;
+    return getThisBuilder();
   }
 
   protected Progressable getProgress() {
     return progress;
   }
 
-  public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
+  /**
+   * Set the facility for reporting progress.
+   */
+  public B progress(@Nonnull final Progressable prog) {
     Preconditions.checkNotNull(prog);
     progress = prog;
-    return this;
+    return getThisBuilder();
   }
 
   protected EnumSet<CreateFlag> getFlags() {
-    if (flags == null) {
-      return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
-    }
     return flags;
   }
 
-  public FSDataOutputStreamBuilder setFlags(
-      final EnumSet<CreateFlag> enumFlags) {
-    Preconditions.checkNotNull(enumFlags);
-    flags = enumFlags;
-    return this;
+  /**
+   * Create an FSDataOutputStream at the specified path.
+   */
+  public B create() {
+    flags.add(CreateFlag.CREATE);
+    return getThisBuilder();
+  }
+
+  /**
+   * Set to true to overwrite an existing file.
+   * If set to false, an exception is thrown by {@link #build()}
+   * when the file already exists.
+   */
+  public B overwrite(boolean overwrite) {
+    if (overwrite) {
+      flags.add(CreateFlag.OVERWRITE);
+    } else {
+      flags.remove(CreateFlag.OVERWRITE);
+    }
+    return getThisBuilder();
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   */
+  public B append() {
+    flags.add(CreateFlag.APPEND);
+    return getThisBuilder();
   }
 
   protected ChecksumOpt getChecksumOpt() {
     return checksumOpt;
   }
 
-  public FSDataOutputStreamBuilder setChecksumOpt(
-      final ChecksumOpt chksumOpt) {
+  /**
+   * Set the checksum option.
+   */
+  public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
     Preconditions.checkNotNull(chksumOpt);
     checksumOpt = chksumOpt;
-    return this;
+    return getThisBuilder();
   }
 
-  public FSDataOutputStream build() throws IOException {
-    return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
-        getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
-  }
+  /**
+   * Create the FSDataOutputStream to write to the file system.
+   *
+   * @throws HadoopIllegalArgumentException if the parameters are not valid.
+   * @throws IOException on errors when the file system creates or appends to
+   * the file.
+   */
+  public abstract S build() throws IOException;
 }
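
The two type parameters implement a self-typed (recursive generic) builder: each concrete subclass passes itself as B, and the fluent setters return getThisBuilder() so chained calls keep the subclass's type. A minimal sketch of a conforming subclass, modeled on the FileSystemDataOutputStreamBuilder added to FileSystem.java below (the class name here is hypothetical):

    final class MyStreamBuilder extends
        FSDataOutputStreamBuilder<FSDataOutputStream, MyStreamBuilder> {

      MyStreamBuilder(FileSystem fileSystem, Path p) {
        super(fileSystem, p);
      }

      @Override
      protected MyStreamBuilder getThisBuilder() {
        return this;  // bufferSize(), recursive(), etc. chain as MyStreamBuilder
      }

      @Override
      public FSDataOutputStream build() throws IOException {
        // Delegate to the classic FileSystem#create() call, as the
        // built-in default builder does.
        return getFS().create(getPath(), getPermission(), getFlags(),
            getBufferSize(), getReplication(), getBlockSize(),
            getProgress(), getChecksumOpt());
      }
    }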

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 1907475..cc92f31 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -4140,8 +4140,34 @@ public abstract class FileSystem extends Configured implements Closeable {
     return GlobalStorageStatistics.INSTANCE;
   }
 
+  private static final class FileSystemDataOutputStreamBuilder extends
+      FSDataOutputStreamBuilder<FSDataOutputStream,
+        FileSystemDataOutputStreamBuilder> {
+
+    /**
+     * Constructor.
+     */
+    protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+      super(fileSystem, p);
+    }
+
+    @Override
+    public FSDataOutputStream build() throws IOException {
+      return getFS().create(getPath(), getPermission(), getFlags(),
+          getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+          getChecksumOpt());
+    }
+
+    @Override
+    protected FileSystemDataOutputStreamBuilder getThisBuilder() {
+      return this;
+    }
+  }
+
   /**
    * Create a new FSDataOutputStreamBuilder for the file with path.
+   * Files are overwritten by default.
+   *
    * @param path file path
    * @return a FSDataOutputStreamBuilder object to build the file
    *
@@ -4149,7 +4175,8 @@ public abstract class FileSystem extends Configured implements Closeable {
    * builder interface becomes stable.
    */
   @InterfaceAudience.Private
-  protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return new FSDataOutputStreamBuilder(this, path);
+  protected FSDataOutputStreamBuilder createFile(Path path) {
+    return new FileSystemDataOutputStreamBuilder(this, path)
+        .create().overwrite(true);
   }
 }
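
Note the defaults established here: createFile() seeds the flag set with both CREATE and OVERWRITE, so an existing file is replaced unless the caller opts out. A sketch of the two behaviors from code with access to the builder, mirroring the assertions in TestDistributedFileSystem below (the path is illustrative):

    // Default: quietly overwrites an existing file.
    fs.createFile(path).build().close();

    // Opt out: build() throws FileAlreadyExistsException if the
    // file is already present.
    try (FSDataOutputStream out =
             fs.createFile(path).overwrite(false).build()) {
      // reached only when the file did not previously exist
    }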

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 3466922..e940065 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -667,7 +667,7 @@ public class FilterFileSystem extends FileSystem {
   }
 
   @Override
-  protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return fs.newFSDataOutputStreamBuilder(path);
+  public FSDataOutputStreamBuilder createFile(Path path) {
+    return fs.createFile(path);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index 7e50ab1..c410e34 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -1270,7 +1270,7 @@ public class HarFileSystem extends FileSystem {
   }
 
   @Override
-  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return fs.newFSDataOutputStreamBuilder(path);
+  public FSDataOutputStreamBuilder createFile(Path path) {
+    return fs.createFile(path);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index 777e5c0..527b9eb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -659,9 +659,9 @@ public class TestLocalFileSystem {
 
     try {
       FSDataOutputStreamBuilder builder =
-          fileSys.newFSDataOutputStreamBuilder(path);
+          fileSys.createFile(path);
       FSDataOutputStream out = builder.build();
-      String content = "Create with a generic type of createBuilder!";
+      String content = "Create with a generic type of createFile!";
       byte[] contentOrigin = content.getBytes("UTF8");
       out.write(contentOrigin);
       out.close();
@@ -680,7 +680,7 @@ public class TestLocalFileSystem {
     // Test value not being set for replication, block size, buffer size
     // and permission
     FSDataOutputStreamBuilder builder =
-        fileSys.newFSDataOutputStreamBuilder(path);
+        fileSys.createFile(path);
     builder.build();
     Assert.assertEquals("Should be default block size",
         builder.getBlockSize(), fileSys.getDefaultBlockSize());
@@ -694,8 +694,8 @@ public class TestLocalFileSystem {
         builder.getPermission(), FsPermission.getFileDefault());
 
     // Test set 0 to replication, block size and buffer size
-    builder = fileSys.newFSDataOutputStreamBuilder(path);
-    builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
+    builder = fileSys.createFile(path);
+    builder.bufferSize(0).blockSize(0).replication((short) 0);
     Assert.assertEquals("Block size should be 0",
         builder.getBlockSize(), 0);
     Assert.assertEquals("Replication factor should be 0",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 344f574..2f60e9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -101,7 +101,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
 
 import javax.annotation.Nonnull;
 
@@ -526,6 +525,49 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /**
+   * Similar to {@link #create(Path, FsPermission, EnumSet, int, short, long,
+   * Progressable, ChecksumOpt, InetSocketAddress[], String)}, it provides an
+   * HDFS-specific version of {@link #createNonRecursive(Path, FsPermission,
+   * EnumSet, int, short, long, Progressable)} with a few additions.
+   *
+   * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable,
+   * ChecksumOpt, InetSocketAddress[], String) for the descriptions of
+   * additional parameters, i.e., favoredNodes and ecPolicyName.
+   */
+  private HdfsDataOutputStream createNonRecursive(final Path f,
+      final FsPermission permission, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress, final ChecksumOpt checksumOpt,
+      final InetSocketAddress[] favoredNodes, final String ecPolicyName)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    storageStatistics.incrementOpCounter(OpType.CREATE);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+      @Override
+      public HdfsDataOutputStream doCall(final Path p) throws IOException {
+        final DFSOutputStream out = dfs.create(getPathName(p), permission,
+            flag, false, replication, blockSize, progress, bufferSize,
+            checksumOpt, favoredNodes, ecPolicyName);
+        return dfs.createWrappedOutputStream(out, statistics);
+      }
+      @Override
+      public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          return myDfs.createNonRecursive(p, permission, flag, bufferSize,
+              replication, blockSize, progress, checksumOpt, favoredNodes,
+              ecPolicyName);
+        }
+        throw new UnsupportedOperationException("Cannot create with" +
+            " favoredNodes through a symlink to a non-DistributedFileSystem: "
+            + f + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
+
+  /**
    * Same as create(), except fails if parent directory doesn't already exist.
    */
   @Override
@@ -2686,33 +2728,88 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /**
-   * Extends FSDataOutputStreamBuilder to support special requirements
-   * of DistributedFileSystem.
+   * HdfsDataOutputStreamBuilder provides the HDFS-specific capabilities to
+   * write a file on HDFS.
    */
-  public static class HdfsDataOutputStreamBuilder
-      extends FSDataOutputStreamBuilder {
+  public static final class HdfsDataOutputStreamBuilder
+      extends FSDataOutputStreamBuilder<
+      HdfsDataOutputStream, HdfsDataOutputStreamBuilder> {
     private final DistributedFileSystem dfs;
     private InetSocketAddress[] favoredNodes = null;
     private String ecPolicyName = null;
-    private boolean shouldReplicate  = false;
 
-    public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
+    /**
+     * Construct an HdfsDataOutputStream builder for a file.
+     * @param dfs the {@link DistributedFileSystem} instance.
+     * @param path the path of the file to create / append.
+     */
+    private HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
       super(dfs, path);
       this.dfs = dfs;
     }
 
-    protected InetSocketAddress[] getFavoredNodes() {
+    @Override
+    protected HdfsDataOutputStreamBuilder getThisBuilder() {
+      return this;
+    }
+
+    private InetSocketAddress[] getFavoredNodes() {
       return favoredNodes;
     }
 
-    public HdfsDataOutputStreamBuilder setFavoredNodes(
+    /**
+     * Set favored DataNodes.
+     * @param nodes the addresses of the favored DataNodes.
+     */
+    public HdfsDataOutputStreamBuilder favoredNodes(
         @Nonnull final InetSocketAddress[] nodes) {
       Preconditions.checkNotNull(nodes);
       favoredNodes = nodes.clone();
       return this;
     }
 
-    protected String getEcPolicyName() {
+    /**
+     * Force closed blocks to disk.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder syncBlock() {
+      getFlags().add(CreateFlag.SYNC_BLOCK);
+      return this;
+    }
+
+    /**
+     * Create the block on transient storage if possible.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder lazyPersist() {
+      getFlags().add(CreateFlag.LAZY_PERSIST);
+      return this;
+    }
+
+    /**
+     * Append data to a new block instead of the end of the last partial block.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder newBlock() {
+      getFlags().add(CreateFlag.NEW_BLOCK);
+      return this;
+    }
+
+    /**
+     * Advise that a block replica NOT be written to the local DataNode.
+     *
+     * @see CreateFlag for the details.
+     */
+    public HdfsDataOutputStreamBuilder noLocalWrite() {
+      getFlags().add(CreateFlag.NO_LOCAL_WRITE);
+      return this;
+    }
+
+    @VisibleForTesting
+    String getEcPolicyName() {
       return ecPolicyName;
     }
 
@@ -2722,17 +2819,17 @@ public class DistributedFileSystem extends FileSystem {
      * or erasure coding policy is. Don't call this function and
      * replicate() in the same builder, since the two options are
      * mutually exclusive.
-     *
      */
-    public HdfsDataOutputStreamBuilder setEcPolicyName(
+    public HdfsDataOutputStreamBuilder ecPolicyName(
         @Nonnull final String policyName) {
       Preconditions.checkNotNull(policyName);
       ecPolicyName = policyName;
       return this;
     }
 
-    public boolean shouldReplicate() {
-      return shouldReplicate;
+    @VisibleForTesting
+    boolean shouldReplicate() {
+      return getFlags().contains(CreateFlag.SHOULD_REPLICATE);
     }
 
     /**
@@ -2742,30 +2839,46 @@ public class DistributedFileSystem extends FileSystem {
      * conflict of interest.
      */
     public HdfsDataOutputStreamBuilder replicate() {
-      shouldReplicate  = true;
+      getFlags().add(CreateFlag.SHOULD_REPLICATE);
       return this;
     }
 
+    @VisibleForTesting
+    @Override
+    protected EnumSet<CreateFlag> getFlags() {
+      return super.getFlags();
+    }
+
+    /**
+     * Build the HdfsDataOutputStream to write to the file.
+     *
+     * @return a fully-initialized OutputStream.
+     * @throws IOException on I/O errors.
+     */
     @Override
     public HdfsDataOutputStream build() throws IOException {
-      Preconditions.checkState(
-          !(shouldReplicate() && (!StringUtils.isEmpty(getEcPolicyName()))),
-          "shouldReplicate and ecPolicyName are " +
-              "exclusive parameters. Set both is not allowed!");
-
-      EnumSet<CreateFlag> createFlags = getFlags();
-      if (shouldReplicate()) {
-        createFlags.add(CreateFlag.SHOULD_REPLICATE);
-      }
-      return dfs.create(getPath(), getPermission(), createFlags,
-          getBufferSize(), getReplication(), getBlockSize(),
-          getProgress(), getChecksumOpt(), getFavoredNodes(),
-          getEcPolicyName());
+      if (isRecursive()) {
+        return dfs.create(getPath(), getPermission(), getFlags(),
+            getBufferSize(), getReplication(), getBlockSize(),
+            getProgress(), getChecksumOpt(), getFavoredNodes(),
+            getEcPolicyName());
+      } else {
+        return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
+            getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+            getChecksumOpt(), getFavoredNodes(), getEcPolicyName());
+      }
     }
   }
 
+  /**
+   * Create an HdfsDataOutputStreamBuilder to create a file on DFS.
+   * Similar to {@link #create(Path)}, the file is overwritten by default.
+   *
+   * @param path the path of the file to create.
+   * @return A HdfsDataOutputStreamBuilder for creating a file.
+   */
   @Override
-  public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
-    return new HdfsDataOutputStreamBuilder(this, path);
+  public HdfsDataOutputStreamBuilder createFile(Path path) {
+    return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
   }
 }
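
Putting the HDFS-specific knobs together, a hedged sketch of the extended builder surface (the method and variable names below, other than the builder calls, are illustrative; recall that replicate() and ecPolicyName() are mutually exclusive, and that build() routes to the non-recursive create path unless recursive() was called):

    static void writeWithHdfsOptions(DistributedFileSystem dfs, Path p,
        InetSocketAddress[] favored, byte[] data) throws IOException {
      try (HdfsDataOutputStream out = dfs.createFile(p)
          .favoredNodes(favored)  // prefer replicas on these DataNodes
          .syncBlock()            // CreateFlag.SYNC_BLOCK
          .noLocalWrite()         // CreateFlag.NO_LOCAL_WRITE
          .replicate()            // CreateFlag.SHOULD_REPLICATE; do not
                                  // combine with ecPolicyName("...")
          .build()) {
        out.write(data);
      }
    }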

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index e1b1005..be59cce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -25,7 +25,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -246,10 +244,8 @@ public class NameNodeConnector implements Closeable {
         fs.delete(idPath, true);
       }
 
-      final FSDataOutputStream fsout = fs.newFSDataOutputStreamBuilder(idPath)
-          .replicate()
-          .setFlags(EnumSet.of(CreateFlag.CREATE))
-          .build();
+      final FSDataOutputStream fsout = fs.createFile(idPath)
+          .replicate().recursive().build();
 
       Preconditions.checkState(
           fsout.hasCapability(StreamCapability.HFLUSH.getValue())
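
A behavioral nuance in this hunk (my reading of the diff, not stated in the commit): the old chain's setFlags(EnumSet.of(CreateFlag.CREATE)) left the flag set at exactly CREATE, while createFile() starts from CREATE plus OVERWRITE, so the rewritten call also carries OVERWRITE; that is benign here because the id file is deleted just above. The added .recursive() preserves the old build()'s fs.create() semantics, since the new build() otherwise routes to createNonRecursive(). Roughly:

    // Flags each chain ends up passing to create() (illustrative):
    EnumSet<CreateFlag> oldFlags =
        EnumSet.of(CreateFlag.CREATE,           // setFlags(CREATE), plus
            CreateFlag.SHOULD_REPLICATE);       // SHOULD_REPLICATE from old build()
    EnumSet<CreateFlag> newFlags =
        EnumSet.of(CreateFlag.CREATE,
            CreateFlag.OVERWRITE,               // default seeded by createFile()
            CreateFlag.SHOULD_REPLICATE);       // added by replicate()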

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index e9af594..9857735 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FileChecksum;
@@ -71,6 +72,7 @@ import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@@ -1411,36 +1413,88 @@ public class TestDistributedFileSystem {
     }
   }
 
+  private void testBuilderSetters(DistributedFileSystem fs) {
+    Path testFilePath = new Path("/testBuilderSetters");
+    HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath);
+
+    builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite()
+        .ecPolicyName("ec-policy");
+    EnumSet<CreateFlag> flags = builder.getFlags();
+    assertTrue(flags.contains(CreateFlag.APPEND));
+    assertTrue(flags.contains(CreateFlag.CREATE));
+    assertTrue(flags.contains(CreateFlag.NEW_BLOCK));
+    assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE));
+    assertFalse(flags.contains(CreateFlag.OVERWRITE));
+    assertFalse(flags.contains(CreateFlag.SYNC_BLOCK));
+
+    assertEquals("ec-policy", builder.getEcPolicyName());
+    assertFalse(builder.shouldReplicate());
+  }
+
+  @Test
+  public void testHdfsDataOutputStreamBuilderSetParameters()
+      throws IOException {
+    Configuration conf = getTestConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      testBuilderSetters(fs);
+    }
+  }
+
   @Test
   public void testDFSDataOutputStreamBuilder() throws Exception {
     Configuration conf = getTestConfiguration();
-    MiniDFSCluster cluster = null;
     String testFile = "/testDFSDataOutputStreamBuilder";
     Path testFilePath = new Path(testFile);
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
       DistributedFileSystem fs = cluster.getFileSystem();
 
       // Test create an empty file
-      FSDataOutputStream out =
-          fs.newFSDataOutputStreamBuilder(testFilePath).build();
-      out.close();
+      try (FSDataOutputStream out =
+               fs.createFile(testFilePath).build()) {
+        LOG.info("Test create an empty file");
+      }
 
       // Test create a file with content, and verify the content
       String content = "This is a test!";
-      out = fs.newFSDataOutputStreamBuilder(testFilePath)
-          .setBufferSize(4096).setReplication((short) 1)
-          .setBlockSize(4096).build();
-      byte[] contentOrigin = content.getBytes("UTF8");
-      out.write(contentOrigin);
-      out.close();
+      try (FSDataOutputStream out1 = fs.createFile(testFilePath)
+          .bufferSize(4096)
+          .replication((short) 1)
+          .blockSize(4096)
+          .build()) {
+        byte[] contentOrigin = content.getBytes("UTF8");
+        out1.write(contentOrigin);
+      }
 
       ContractTestUtils.verifyFileContents(fs, testFilePath,
           content.getBytes());
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
+
+      try (FSDataOutputStream out = fs.createFile(testFilePath).overwrite(false)
+        .build()) {
+        fail("it should fail to overwrite an existing file");
+      } catch (FileAlreadyExistsException e) {
+        // As expected, ignore.
+      }
+
+      Path nonParentFile = new Path("/parent/test");
+      try (FSDataOutputStream out = fs.createFile(nonParentFile).build()) {
+        fail("parent directory not exist");
+      } catch (FileNotFoundException e) {
+        // As expected.
+      }
+      assertFalse("parent directory should not be created",
+          fs.exists(new Path("/parent")));
+
+      try (FSDataOutputStream out = fs.createFile(nonParentFile).recursive()
+        .build()) {
+        out.write(1);
       }
+      assertTrue("parent directory has not been created",
+          fs.exists(new Path("/parent")));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
index 77e6594..4a4bed5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
@@ -540,15 +540,14 @@ public class TestErasureCodingPolicies {
     fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
 
     // null EC policy name value means inheriting parent directory's policy
-    fs.newFSDataOutputStreamBuilder(filePath0).build().close();
+    fs.createFile(filePath0).build().close();
     ErasureCodingPolicy ecPolicyOnFile = fs.getErasureCodingPolicy(filePath0);
     assertEquals(EC_POLICY, ecPolicyOnFile);
 
     // Test illegal EC policy name
     final String illegalPolicyName = "RS-DEFAULT-1-2-64k";
     try {
-      fs.newFSDataOutputStreamBuilder(filePath1)
-          .setEcPolicyName(illegalPolicyName).build().close();
+      fs.createFile(filePath1).ecPolicyName(illegalPolicyName).build().close();
       Assert.fail("illegal erasure coding policy should not be found");
     } catch (Exception e) {
       GenericTestUtils.assertExceptionContains("Policy '" + illegalPolicyName
@@ -563,8 +562,8 @@ public class TestErasureCodingPolicies {
             SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
     ecPolicyOnFile = EC_POLICY;
     fs.setErasureCodingPolicy(dirPath, ecPolicyOnDir.getName());
-    fs.newFSDataOutputStreamBuilder(filePath0)
-        .setEcPolicyName(ecPolicyOnFile.getName()).build().close();
+    fs.createFile(filePath0).ecPolicyName(ecPolicyOnFile.getName())
+        .build().close();
     assertEquals(ecPolicyOnFile, fs.getErasureCodingPolicy(filePath0));
     assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath));
     fs.delete(dirPath, true);
@@ -582,27 +581,27 @@ public class TestErasureCodingPolicies {
     fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
 
     final String ecPolicyName = "RS-10-4-64k";
-    fs.newFSDataOutputStreamBuilder(filePath).build().close();
+    fs.createFile(filePath).build().close();
     assertEquals(EC_POLICY, fs.getErasureCodingPolicy(filePath));
     fs.delete(filePath, true);
 
-    fs.newFSDataOutputStreamBuilder(filePath)
-        .setEcPolicyName(ecPolicyName)
+    fs.createFile(filePath)
+        .ecPolicyName(ecPolicyName)
         .build()
         .close();
     assertEquals(ecPolicyName, fs.getErasureCodingPolicy(filePath).getName());
     fs.delete(filePath, true);
 
     try {
-      fs.newFSDataOutputStreamBuilder(filePath)
-          .setEcPolicyName(ecPolicyName)
+      fs.createFile(filePath)
+          .ecPolicyName(ecPolicyName)
           .replicate()
           .build().close();
       Assert.fail("shouldReplicate and ecPolicyName are exclusive " +
           "parameters. Set both is not allowed.");
     }catch (Exception e){
-      GenericTestUtils.assertExceptionContains("shouldReplicate and " +
-          "ecPolicyName are exclusive parameters. Set both is not allowed!", e);
+      GenericTestUtils.assertExceptionContains("SHOULD_REPLICATE flag and " +
+          "ecPolicyName are exclusive parameters.", e);
     }
 
     try {
@@ -618,7 +617,7 @@ public class TestErasureCodingPolicies {
           "ecPolicyName are exclusive parameters. Set both is not allowed!", e);
     }
 
-    fs.newFSDataOutputStreamBuilder(filePath)
+    fs.createFile(filePath)
         .replicate()
         .build()
         .close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
index 50e56cc..3352fd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
@@ -199,7 +199,7 @@ public class TestFavoredNodesEndToEnd {
       InetSocketAddress[] dns = getDatanodes(rand);
       Path p = new Path("/filename"+i);
       FSDataOutputStream out =
-          dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
+          dfs.createFile(p).favoredNodes(dns).build();
       out.write(SOME_BYTES);
       out.close();
       BlockLocation[] locations = getBlockLocations(p);

