You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2009/05/25 07:23:46 UTC

svn commit: r778283 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/core/org/apache/hadoop/fs/ftp/ src/core/org/apache/hadoop/fs/kfs/ src/core/org/apache/hadoop/fs/s3/ src/core/org/apache/hadoop/fs/s3native/ src/hdfs/org/apache/hadoop/hd...

Author: dhruba
Date: Mon May 25 05:23:45 2009
New Revision: 778283

URL: http://svn.apache.org/viewvc?rev=778283&view=rev
Log:
HADOOP-5438. Provide a single FileSystem method to create or open-for-append 
to a file.  (He Yongqiang via dhruba)


Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/CreateFlag.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/ftp/FTPFileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 25 05:23:45 2009
@@ -386,6 +386,9 @@
     TestNameEditsConfig, TestStartup and TestStorageRestore.
     (Jakob Homan via shv)
 
+    HADOOP-5438. Provide a single FileSystem method to create or open-for-append 
+    to a file.  (He Yongqiang via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Added: hadoop/core/trunk/src/core/org/apache/hadoop/fs/CreateFlag.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/CreateFlag.java?rev=778283&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/CreateFlag.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/CreateFlag.java Mon May 25 05:23:45 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+/****************************************************************
+ *CreateFlag specifies the file create semantic. Users can combine flags like:<br>
+ *<code>
+ * EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND)
+ * <code>
+ * and pass it to {@link org.apache.hadoop.fs.FileSystem #create(Path f, FsPermission permission,
+ * EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
+ * Progressable progress)}.
+ * 
+ * <p>
+ * Combine {@link #OVERWRITE} with either {@link #CREATE} 
+ * or {@link #APPEND} does the same as only use 
+ * {@link #OVERWRITE}. <br>
+ * Combine {@link #CREATE} with {@link #APPEND} has the semantic:
+ * <ol>
+ * <li> create the file if it does not exist;
+ * <li> append the file if it already exists.
+ * </ol>
+ *****************************************************************/
+public enum CreateFlag {
+
+  /**
+   * create the file if it does not exist, and throw an IOException if it
+   * already exists
+   */
+  CREATE((short) 0x01),
+
+  /**
+   * create the file if it does not exist, if it exists, overwrite it.
+   */
+  OVERWRITE((short) 0x02),
+
+  /**
+   * append to a file, and throw an IOException if it does not exist
+   */
+  APPEND((short) 0x04);
+
+  private short mode;
+
+  private CreateFlag(short mode) {
+    this.mode = mode;
+  }
+
+  short getMode() {
+    return mode;
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java Mon May 25 05:23:45 2009
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -506,6 +507,7 @@
   /**
    * Opens an FSDataOutputStream at the indicated Path with write-progress
    * reporting.
+   * @deprecated Consider using {@link #create(Path, FsPermission, EnumSet, int, short, long, Progressable)} instead.
    * @param f the file name to open
    * @param permission
    * @param overwrite if a file with this name already exists, then if true,
@@ -517,13 +519,36 @@
    * @throws IOException
    * @see #setPermission(Path, FsPermission)
    */
-  public abstract FSDataOutputStream create(Path f,
+  public FSDataOutputStream create(Path f,
       FsPermission permission,
       boolean overwrite,
       int bufferSize,
       short replication,
       long blockSize,
-      Progressable progress) throws IOException;
+      Progressable progress) throws IOException{
+    return create(f, permission, overwrite ? EnumSet.of(CreateFlag.OVERWRITE)
+        : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, blockSize,
+        progress);
+  }
+  
+  /**
+   * Opens an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.
+   * @param f the file name to open.
+   * @param permission
+   * @param flag determines the semantic of this create.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file.
+   * @param blockSize
+   * @param progress
+   * @throws IOException
+   * @see #setPermission(Path, FsPermission)
+   * @see CreateFlag
+   */
+  public abstract FSDataOutputStream create(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException ;
+  
 
   /**
    * Creates the given Path as a brand-new zero-length file.  If

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java Mon May 25 05:23:45 2009
@@ -20,6 +20,7 @@
 
 import java.io.*;
 import java.net.URI;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -106,10 +107,10 @@
   /** {@inheritDoc} */
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     return fs.create(f, permission,
-        overwrite, bufferSize, replication, blockSize, progress);
+        flag, bufferSize, replication, blockSize, progress);
   }
 
   /**

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java Mon May 25 05:23:45 2009
@@ -28,6 +28,7 @@
 import java.io.OutputStream;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -244,10 +245,19 @@
   /** {@inheritDoc} */
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
+    
+      if(flag.contains(CreateFlag.APPEND)){
+        if (!exists(f)){
+          if(flag.contains(CreateFlag.CREATE))
+            return create(f, false, bufferSize, replication, blockSize, progress);
+        }
+        return append(f, bufferSize, progress);
+    }
+   
     FSDataOutputStream out = create(f,
-        overwrite, bufferSize, replication, blockSize, progress);
+        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
     setPermission(f, permission);
     return out;
   }

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/ftp/FTPFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/ftp/FTPFileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/ftp/FTPFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/ftp/FTPFileSystem.java Mon May 25 05:23:45 2009
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.EnumSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.net.ftp.FTP;
@@ -28,6 +30,7 @@
 import org.apache.commons.net.ftp.FTPFile;
 import org.apache.commons.net.ftp.FTPReply;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -193,19 +196,30 @@
    */
   @Override
   public FSDataOutputStream create(Path file, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     final FTPClient client = connect();
     Path workDir = new Path(client.printWorkingDirectory());
     Path absolute = makeAbsolute(workDir, file);
+    
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean create = flag.contains(CreateFlag.CREATE);
+    boolean append= flag.contains(CreateFlag.APPEND);
+    
     if (exists(client, file)) {
       if (overwrite) {
         delete(client, file);
+      } else if(append){
+        return append(file, bufferSize, progress);
       } else {
         disconnect(client);
         throw new IOException("File already exists: " + file);
       }
+    } else {
+      if(append && !create)
+        throw new FileNotFoundException("File does not exist: "+ file);
     }
+    
     Path parent = absolute.getParent();
     if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
       parent = (parent == null) ? new Path("/") : parent;

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Mon May 25 05:23:45 2009
@@ -23,9 +23,11 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -186,16 +188,25 @@
 
     @Override
     public FSDataOutputStream create(Path file, FsPermission permission,
-                                     boolean overwrite, int bufferSize,
+                                     EnumSet<CreateFlag> flag, int bufferSize,
 				     short replication, long blockSize, Progressable progress)
 	throws IOException {
 
+      boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+      boolean create = flag.contains(CreateFlag.CREATE);
+      boolean append= flag.contains(CreateFlag.APPEND);
+      
         if (exists(file)) {
             if (overwrite) {
                 delete(file, true);
+            } else if (append){
+             return append(file, bufferSize, progress);
             } else {
                 throw new IOException("File already exists: " + file);
             }
+        } else {
+          if(append && !create)
+            throw new FileNotFoundException("File does not exist: "+ file);
         }
 
 	Path parent = file.getParent();

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3/S3FileSystem.java Mon May 25 05:23:45 2009
@@ -22,12 +22,14 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -201,18 +203,24 @@
    */
   @Override
   public FSDataOutputStream create(Path file, FsPermission permission,
-      boolean overwrite, int bufferSize,
+      EnumSet<CreateFlag> flag, int bufferSize,
       short replication, long blockSize, Progressable progress)
     throws IOException {
 
     INode inode = store.retrieveINode(makeAbsolute(file));
     if (inode != null) {
-      if (overwrite) {
+      if (flag.contains(CreateFlag.OVERWRITE)) {
         delete(file, true);
+      } else if (flag.contains(CreateFlag.APPEND)){
+        return append(file, bufferSize, progress);
       } else {
         throw new IOException("File already exists: " + file);
       }
     } else {
+      
+      if(flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
+        throw new FileNotFoundException("File does not exist: "+ file);
+      
       Path parent = file.getParent();
       if (parent != null) {
         if (!mkdirs(parent)) {

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java Mon May 25 05:23:45 2009
@@ -30,6 +30,7 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSInputStream;
@@ -274,16 +276,25 @@
   
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
 
-    if (exists(f) && !overwrite) {
-      throw new IOException("File already exists:"+f);
+    if(exists(f)) {
+      if(flag.contains(CreateFlag.APPEND)){
+        return append(f, bufferSize, progress);
+      } else if(!flag.contains(CreateFlag.OVERWRITE)) {
+        throw new IOException("File already exists: "+f);
+      }
+    } else {
+       if (flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
+         throw new IOException("File already exists: " + f.toString());
     }
+    
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
         key, progress, bufferSize), statistics);
+    
   }
   
   @Override

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Mon May 25 05:23:45 2009
@@ -400,8 +400,10 @@
       int buffersize
       ) throws IOException {
     return create(src, FsPermission.getDefault(),
-        overwrite, replication, blockSize, progress, buffersize);
+        overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), 
+        replication, blockSize, progress, buffersize);
   }
+
   /**
    * Create a new dfs file with the specified block replication 
    * with write-progress reporting and return an output stream for writing
@@ -410,7 +412,7 @@
    * @param src stream name
    * @param permission The permission of the directory being created.
    * If permission == null, use {@link FsPermission#getDefault()}.
-   * @param overwrite do not check for file existence if true
+   * @param flag do not check for file existence if true
    * @param replication block replication
    * @return output stream
    * @throws IOException
@@ -418,7 +420,7 @@
    */
   public OutputStream create(String src, 
                              FsPermission permission,
-                             boolean overwrite, 
+                             EnumSet<CreateFlag> flag, 
                              short replication,
                              long blockSize,
                              Progressable progress,
@@ -431,7 +433,7 @@
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     OutputStream result = new DFSOutputStream(src, masked,
-        overwrite, replication, blockSize, progress, buffersize,
+        flag, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
     leasechecker.put(src, result);
     return result;
@@ -2679,7 +2681,7 @@
      * Create a new output stream to the given DataNode.
      * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
      */
-    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
+    DFSOutputStream(String src, FsPermission masked, EnumSet<CreateFlag> flag,
         short replication, long blockSize, Progressable progress,
         int buffersize, int bytesPerChecksum) throws IOException {
       this(src, blockSize, progress, bytesPerChecksum);
@@ -2688,7 +2690,7 @@
 
       try {
         namenode.create(
-            src, masked, clientName, overwrite, replication, blockSize);
+            src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), replication, blockSize);
       } catch(RemoteException re) {
         throw re.unwrapRemoteException(AccessControlException.class,
                                        QuotaExceededException.class);

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java Mon May 25 05:23:45 2009
@@ -20,6 +20,7 @@
 
 import java.io.*;
 import java.net.*;
+import java.util.EnumSet;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.*;
@@ -199,13 +200,12 @@
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-    boolean overwrite,
-    int bufferSize, short replication, long blockSize,
+    EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
 
     return new FSDataOutputStream
        (dfs.create(getPathName(f), permission,
-                   overwrite, replication, blockSize, progress, bufferSize),
+                   flag, replication, blockSize, progress, bufferSize),
         statistics);
   }
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/HftpFileSystem.java Mon May 25 05:23:45 2009
@@ -29,11 +29,13 @@
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.Random;
 
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSInputStream;
@@ -298,9 +300,8 @@
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-                                   boolean overwrite, int bufferSize,
-                                   short replication, long blockSize,
-                                   Progressable progress) throws IOException {
+      EnumSet<CreateFlag> flag, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
     throw new IOException("Not supported");
   }
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Mon May 25 05:23:45 2009
@@ -22,9 +22,11 @@
 
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -41,9 +43,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 44: All LocatedBlock objects contain access tokens
+   * 45: add create flag for create command, see Hadoop-5438
    */
-  public static final long versionID = 44L;
+  public static final long versionID = 45L;
   
   ///////////////////////////////////////
   // File contents
@@ -89,8 +91,8 @@
    * @param src path of the file being created.
    * @param masked masked permission.
    * @param clientName name of the current client.
-   * @param overwrite indicates whether the file should be 
-   * overwritten if it already exists.
+   * @param flag indicates whether the file should be 
+   * overwritten if it already exists or create if it does not exist or append.
    * @param replication block replication factor.
    * @param blockSize maximum block size.
    * 
@@ -104,7 +106,7 @@
   public void create(String src, 
                      FsPermission masked,
                              String clientName, 
-                             boolean overwrite, 
+                             EnumSetWritable<CreateFlag> flag, 
                              short replication,
                              long blockSize
                              ) throws IOException;

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon May 25 05:23:45 2009
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
@@ -815,10 +816,10 @@
    */
   void startFile(String src, PermissionStatus permissions,
                  String holder, String clientMachine,
-                 boolean overwrite, short replication, long blockSize
+                 EnumSet<CreateFlag> flag, short replication, long blockSize
                 ) throws IOException {
-    startFileInternal(src, permissions, holder, clientMachine, overwrite, false,
-                      replication, blockSize);
+    startFileInternal(src, permissions, holder, clientMachine, flag,
+        replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
       final FileStatus stat = dir.getFileInfo(src);
@@ -832,11 +833,14 @@
                                               PermissionStatus permissions,
                                               String holder, 
                                               String clientMachine, 
-                                              boolean overwrite,
-                                              boolean append,
+                                              EnumSet<CreateFlag> flag,
                                               short replication,
                                               long blockSize
                                               ) throws IOException {
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean append = flag.contains(CreateFlag.APPEND);
+    boolean create = flag.contains(CreateFlag.CREATE);
+
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
@@ -918,8 +922,15 @@
       }
       if (append) {
         if (myFile == null) {
-          throw new FileNotFoundException("failed to append to non-existent file "
+          if(!create)
+            throw new FileNotFoundException("failed to append to non-existent file "
               + src + " on client " + clientMachine);
+          else {
+            //append & create a nonexist file equals to overwrite
+            this.startFileInternal(src, permissions, holder, clientMachine,
+                EnumSet.of(CreateFlag.OVERWRITE), replication, blockSize);
+            return;
+          }
         } else if (myFile.isDirectory()) {
           throw new IOException("failed to append to directory " + src 
                                 +" on client " + clientMachine);
@@ -992,7 +1003,7 @@
       throw new IOException("Append to hdfs not supported." +
                             " Please refer to dfs.support.append configuration parameter.");
     }
-    startFileInternal(src, null, holder, clientMachine, false, true, 
+    startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), 
                       (short)blockManager.maxReplication, (long)0);
     getEditLog().logSync();
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon May 25 05:23:45 2009
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,6 +63,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
@@ -547,7 +549,7 @@
   public void create(String src, 
                      FsPermission masked,
                              String clientName, 
-                             boolean overwrite,
+                             EnumSetWritable<CreateFlag> flag,
                              short replication,
                              long blockSize
                              ) throws IOException {
@@ -563,7 +565,7 @@
     namesystem.startFile(src,
         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
             null, masked),
-        clientName, clientMachine, overwrite, replication, blockSize);
+        clientName, clientMachine, flag.get(), replication, blockSize);
     myMetrics.numFilesCreated.inc();
     myMetrics.numCreateFileOps.inc();
   }

Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Mon May 25 05:23:45 2009
@@ -22,15 +22,19 @@
 import java.io.FileReader;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.EnumSet;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
@@ -617,6 +621,100 @@
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Test file creation with all supported flags.
+   */
+  public void testFileCreationWithFlags() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    Path path = new Path("/" + System.currentTimeMillis()
+        + "-testFileCreationWithFlags");
+    FSDataOutputStream out = null;
+
+    // append to a non-exist file, it should throw an IOException
+    try {
+      IOException expectedException = null;
+      EnumSet<CreateFlag> appendNoFile = EnumSet.of(CreateFlag.APPEND);
+      // this should throw a IOException, because the file does not exist
+      try {
+        out = createFileWithFlag(fs, path, 1, appendNoFile);
+      } catch (IOException e) {
+        expectedException = e;
+      } finally {
+        if (out != null)
+          out.close();
+      }
+      assertTrue(
+          "Append a non-exists file with no create flag should throw an IOException ",
+          expectedException != null);
+
+      // the file already exists, and recreate it with CreateFlag.APPEND,
+      // CreateFlag.CREATE. It will not throw any exception.
+      EnumSet<CreateFlag> appendAndCreate = EnumSet.of(CreateFlag.APPEND,
+          CreateFlag.CREATE);
+      out = createFileWithFlag(fs, path, 1, appendAndCreate);
+      out.close();
+
+      // the file already exists, and recreate it only with CreateFlag.CREATE
+      // flag. it should throw an IOException
+      expectedException = null;
+      EnumSet<CreateFlag> createExistsFile = EnumSet.of(CreateFlag.CREATE);
+      // this should throw a IOException, because the file already exists
+      try {
+        createFileWithFlag(fs, path, 1, createExistsFile);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue(
+          "create a file which already exists should throw an IOException ",
+          expectedException != null);
+
+      // the file exists, recreate it with the flag of CreateFlag.OVERWRITE.
+      EnumSet<CreateFlag> overwriteFile = EnumSet.of(CreateFlag.OVERWRITE);
+      out = createFileWithFlag(fs, path, 1, overwriteFile);
+      out.close();
+
+      // the file exists, recreate it with the flag of CreateFlag.OVERWRITE
+      // together with CreateFlag.CREATE. It has the same effect as only specify
+      // CreateFlag.OVERWRITE.
+      EnumSet<CreateFlag> overwriteWithCreateFile = EnumSet.of(
+          CreateFlag.OVERWRITE, CreateFlag.CREATE);
+      out = createFileWithFlag(fs, path, 1, overwriteWithCreateFile);
+      out.close();
+
+      // the file exists, recreate it with the flag of CreateFlag.OVERWRITE
+      // together with CreateFlag.APPEND. It has the same effect as only specify
+      // CreateFlag.OVERWRITE.
+      EnumSet<CreateFlag> overwriteWithAppendFile = EnumSet.of(
+          CreateFlag.OVERWRITE, CreateFlag.APPEND);
+      out = createFileWithFlag(fs, path, 1, overwriteWithAppendFile);
+      out.close();
+
+      fs.delete(path, true);
+
+      EnumSet<CreateFlag> createNonExistsFile = EnumSet.of(CreateFlag.CREATE,
+          CreateFlag.OVERWRITE);
+      out = createFileWithFlag(fs, path, 1, createNonExistsFile);
+      out.close();
+      fs.delete(path, true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  // creates a file with the flag api
+  static FSDataOutputStream createFileWithFlag(FileSystem fileSys, Path name, int repl, EnumSet<CreateFlag> flag)
+    throws IOException {
+    System.out.println("createFile: Created " + name + " with " + repl + " replica.");
+    FSDataOutputStream stm = fileSys.create(name, FsPermission.getDefault(), flag, 
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),(short)repl, (long)blockSize, null);
+    return stm;
+  }
 
 /**
  * Test that file data becomes available before file is closed.

Modified: hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=778283&r1=778282&r2=778283&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Mon May 25 05:23:45 2009
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 
 import javax.security.auth.login.LoginException;
@@ -30,6 +31,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -43,6 +45,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -512,7 +515,8 @@
       long start = System.currentTimeMillis();
       // dummyActionNoSynch(fileIdx);
       nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
-                      clientName, true, replication, BLOCK_SIZE);
+                      clientName, new EnumSetWritable<CreateFlag>(EnumSet
+              .of(CreateFlag.OVERWRITE)), replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       for(boolean written = !closeUponCreate; !written; 
         written = nameNode.complete(fileNames[daemonId][inputIdx], clientName));
@@ -882,8 +886,9 @@
       nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNode.create(fileName, FsPermission.getDefault(),
-                        clientName, true, replication, BLOCK_SIZE);
+        nameNode.create(fileName, FsPermission.getDefault(), clientName,
+            new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), replication,
+            BLOCK_SIZE);
         addBlocks(fileName, clientName);
         nameNode.complete(fileName, clientName);
       }