Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/10/17 07:33:01 UTC

svn commit: r1532967 [1/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/m...

Author: wang
Date: Thu Oct 17 05:32:42 2013
New Revision: 1532967

URL: http://svn.apache.org/r1532967
Log:
merge the rest of trunk to branch HDFS-4949

Added:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
      - copied unchanged from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
      - copied unchanged from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/
      - copied from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/core-site.xml
      - copied unchanged from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/core-site.xml
Removed:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java Thu Oct 17 05:32:42 2013
@@ -418,7 +418,11 @@ public class Server {
       Properties props = new Properties();
       try {
         InputStream is = getResource(DEFAULT_LOG4J_PROPERTIES);
-        props.load(is);
+        try {
+          props.load(is);
+        } finally {
+          is.close();
+        }
       } catch (IOException ex) {
         throw new ServerException(ServerException.ERROR.S03, DEFAULT_LOG4J_PROPERTIES, ex.getMessage(), ex);
       }
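
The fix above is the classic pre-Java-7 close-in-finally idiom: the stream is
released even when Properties.load() throws. As a self-contained sketch of the
same pattern (the resource name below is illustrative, not from the patch):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class PropsLoader {
      // Close-in-finally sketch; "/my-log4j.properties" is a made-up resource.
      static Properties load() throws IOException {
        Properties props = new Properties();
        InputStream is = PropsLoader.class
            .getResourceAsStream("/my-log4j.properties");
        if (is == null) {
          throw new IOException("resource not found");
        }
        try {
          props.load(is);   // may throw; the stream is still closed below
        } finally {
          is.close();
        }
        return props;
      }
    }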

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml Thu Oct 17 05:32:42 2013
@@ -49,7 +49,6 @@ http://maven.apache.org/xsd/maven-4.0.0.
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
-      <version>3.6.2.Final</version>
       <scope>compile</scope>
     </dependency>
     <dependency>

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Thu Oct 17 05:32:42 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -38,9 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHa
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
-import org.jboss.netty.channel.Channel;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to mountd daemon. See {@link Mountd}.
@@ -75,7 +82,8 @@ public class RpcProgramMountd extends Rp
   public RpcProgramMountd(List<String> exports, Configuration config)
       throws IOException {
     // Note that RPC cache is not enabled
-    super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
+    super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
+        PROGRAM, VERSION_1, VERSION_3);
     
     this.hostsMatcher = NfsExports.getInstance(config);
     this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
@@ -88,7 +96,8 @@ public class RpcProgramMountd extends Rp
     if (LOG.isDebugEnabled()) {
       LOG.debug("MOUNT NULLOP : " + " client: " + client);
     }
-    return RpcAcceptedReply.voidReply(out, xid);
+    return RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(
+        out);
   }
 
   @Override
@@ -155,7 +164,7 @@ public class RpcProgramMountd extends Rp
     
     String host = client.getHostName();
     mounts.remove(new MountEntry(host, path));
-    RpcAcceptedReply.voidReply(out, xid);
+    RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(out);
     return out;
   }
 
@@ -165,14 +174,21 @@ public class RpcProgramMountd extends Rp
       LOG.debug("MOUNT UMNTALL : " + " client: " + client);
     }
     mounts.clear();
-    return RpcAcceptedReply.voidReply(out, xid);
+    return RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(
+        out);
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
+
     if (mntproc == MNTPROC.NULL) {
       out = nullOp(out, xid, client);
     } else if (mntproc == MNTPROC.MNT) {
@@ -190,10 +206,13 @@ public class RpcProgramMountd extends Rp
       out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
     } else {
       // Invalid procedure
-      RpcAcceptedReply.voidReply(out, xid,
-          RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+      RpcAcceptedReply.getInstance(xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
+          out);
     }  
-    return out;
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override
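
The interesting change in this file is the handleInternal signature: the
handler no longer returns an XDR for the RPC framework to send. Instead it
decodes the request out of RpcInfo and writes its own reply down the Netty
pipeline. A condensed sketch of the response path that every dispatch branch
above now funnels through, using only calls visible in the hunk (not a
complete handler):

    import org.apache.hadoop.oncrpc.RpcInfo;
    import org.apache.hadoop.oncrpc.RpcResponse;
    import org.apache.hadoop.oncrpc.RpcUtil;
    import org.apache.hadoop.oncrpc.XDR;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.ChannelHandlerContext;

    final class ReplySketch {
      // Serialize the reply XDR and push it to the remote peer.
      static void reply(ChannelHandlerContext ctx, RpcInfo info, XDR out) {
        ChannelBuffer buf =
            ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
        RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
        RpcUtil.sendRpcResponse(ctx, rsp);
      }
    }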

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java Thu Oct 17 05:32:42 2013
@@ -97,7 +97,7 @@ public class AsyncDataService {
   void writeAsync(OpenFileCtx openFileCtx) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Scheduling write back task for fileId: "
-          + openFileCtx.copyLatestAttr().getFileId());
+          + openFileCtx.getLatestAttr().getFileId());
     }
     WriteBackTask wbTask = new WriteBackTask(openFileCtx);
     execute(wbTask);
@@ -125,7 +125,7 @@ public class AsyncDataService {
     public String toString() {
       // Called in AsyncDataService.execute for displaying error messages.
       return "write back data for fileId"
-          + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+          + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
           + openFileCtx.getNextOffset();
     }
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java Thu Oct 17 05:32:42 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,59 +28,81 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
 /**
  * A cache that saves DFSClient objects for different users
  */
-public class DFSClientCache {
-  static final Log LOG = LogFactory.getLog(DFSClientCache.class);
-  private final LruCache<String, DFSClient> lruTable;
+class DFSClientCache {
+  private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
+  /**
+   * Cache that maps User id to corresponding DFSClient.
+   */
+  @VisibleForTesting
+  final LoadingCache<String, DFSClient> clientCache;
+
+  final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
+
   private final Configuration config;
 
-  public DFSClientCache(Configuration config) {
-    // By default, keep 256 DFSClient instance for 256 active users
-    this(config, 256);
+  DFSClientCache(Configuration config) {
+    this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
   }
 
-  public DFSClientCache(Configuration config, int size) {
-    lruTable = new LruCache<String, DFSClient>(size);
+  DFSClientCache(Configuration config, int clientCache) {
     this.config = config;
+    this.clientCache = CacheBuilder.newBuilder()
+        .maximumSize(clientCache)
+        .removalListener(clientRemovealListener())
+        .build(clientLoader());
+  }
+
+  private CacheLoader<String, DFSClient> clientLoader() {
+    return new CacheLoader<String, DFSClient>() {
+      @Override
+      public DFSClient load(String userName) throws Exception {
+        UserGroupInformation ugi = UserGroupInformation
+            .createRemoteUser(userName);
+
+        // Guava requires CacheLoader never returns null.
+        return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
+          public DFSClient run() throws IOException {
+            return new DFSClient(NameNode.getAddress(config), config);
+          }
+        });
+      }
+    };
+  }
+
+  private RemovalListener<String, DFSClient> clientRemovealListener() {
+    return new RemovalListener<String, DFSClient>() {
+      @Override
+      public void onRemoval(RemovalNotification<String, DFSClient> notification) {
+        DFSClient client = notification.getValue();
+        try {
+          client.close();
+        } catch (IOException e) {
+          LOG.warn(String.format(
+              "IOException when closing the DFSClient(%s), cause: %s", client,
+              e));
+        }
+      }
+    };
   }
 
-  public void put(String uname, DFSClient client) {
-    lruTable.put(uname, client);
-  }
-
-  synchronized public DFSClient get(String uname) {
-    DFSClient client = lruTable.get(uname);
-    if (client != null) {
-      return client;
-    }
-
-    // Not in table, create one.
+  DFSClient get(String userName) {
+    DFSClient client = null;
     try {
-      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
-      client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
-        public DFSClient run() throws IOException {
-          return new DFSClient(NameNode.getAddress(config), config);
-        }
-      });
-    } catch (IOException e) {
-      LOG.error("Create DFSClient failed for user:" + uname);
-      e.printStackTrace();
-
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+      client = clientCache.get(userName);
+    } catch (ExecutionException e) {
+      LOG.error("Failed to create DFSClient for user:" + userName + " Cause:"
+          + e);
     }
-    // Add new entry
-    lruTable.put(uname, client);
     return client;
   }
-
-  public int usedSize() {
-    return lruTable.usedSize();
-  }
-
-  public boolean containsKey(String key) {
-    return lruTable.containsKey(key);
-  }
 }
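
The rewrite replaces the hand-rolled LruCache with Guava's LoadingCache:
eviction is size-bounded, the loader is required to never return null, and
evicted DFSClients are closed by the removal listener instead of leaking. The
same pattern as a generic, self-contained sketch (ResourceCache and its
Closeable values are hypothetical, not part of the patch):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.RemovalNotification;

    public class ResourceCache<R extends Closeable> {
      private final LoadingCache<String, R> cache;

      public ResourceCache(CacheLoader<String, R> loader, int maxSize) {
        this.cache = CacheBuilder.newBuilder()
            .maximumSize(maxSize)
            .removalListener(new RemovalListener<String, R>() {
              @Override
              public void onRemoval(RemovalNotification<String, R> n) {
                try {
                  n.getValue().close();   // release the evicted resource
                } catch (IOException e) {
                  // log and continue, as the patch's listener does
                }
              }
            })
            .build(loader);   // loader must never return null
      }

      public R get(String key) throws ExecutionException {
        return cache.get(key);   // creates the entry on first access
      }
    }

One behavioral difference from the old code: a failed load now surfaces as an
ExecutionException from get() instead of a null entry being cached, which is
why the new DFSClientCache.get() above catches ExecutionException.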

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Thu Oct 17 05:32:42 2013
@@ -42,7 +42,7 @@ public class Nfs3 extends Nfs3Base {
   }
 
   public Nfs3(List<String> exports, Configuration config) throws IOException {
-    super(new Mountd(exports, config), new RpcProgramNfs3(config));
+    super(new Mountd(exports, config), new RpcProgramNfs3(config), config);
   }
 
   public static void main(String[] args) throws IOException {

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Thu Oct 17 05:32:42 2013
@@ -39,6 +39,12 @@ import org.jboss.netty.channel.Channel;
 public class Nfs3Utils {
   public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/";
 
+  
+  public final static String READ_RPC_START =  "READ_RPC_CALL_START____";
+  public final static String READ_RPC_END =    "READ_RPC_CALL_END______";
+  public final static String WRITE_RPC_START = "WRITE_RPC_CALL_START____";
+  public final static String WRITE_RPC_END =   "WRITE_RPC_CALL_END______";
+  
   public static String getFileIdPath(FileHandle handle) {
     return getFileIdPath(handle.getFileId());
   }
@@ -49,7 +55,7 @@ public class Nfs3Utils {
 
   public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
       throws IOException {
-    return client.getFileInfo(fileIdPath);
+    return client.getFileLinkInfo(fileIdPath);
   }
 
   public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
@@ -59,7 +65,10 @@ public class Nfs3Utils {
     * client takes only the lower 32 bits of the fileId and treats it as a
     * signed int. When the 32nd bit is 1, the client considers it invalid.
      */
-    return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs
+    NfsFileType fileType = fs.isDir() ? NfsFileType.NFSDIR : NfsFileType.NFSREG;
+    fileType = fs.isSymlink() ? NfsFileType.NFSLNK : fileType;
+    
+    return new Nfs3FileAttributes(fileType, fs.getChildrenNum(), fs
         .getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
         iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
         fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());
@@ -99,7 +108,18 @@ public class Nfs3Utils {
   /**
    * Send a write response to the netty network socket channel
    */
-  public static void writeChannel(Channel channel, XDR out) {
+  public static void writeChannel(Channel channel, XDR out, int xid) {
+    if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+      RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
+    }
+    ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
+    channel.write(outBuf);
+  }
+  
+  public static void writeChannelCommit(Channel channel, XDR out, int xid) {
+    if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+      RpcProgramNfs3.LOG.debug("Commit done:" + xid);
+    }
     ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
     channel.write(outBuf);
   }
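
Two things change in Nfs3Utils: getFileStatus now calls getFileLinkInfo, which
(unlike getFileInfo) reports a symlink itself rather than resolving it to its
target, and the attribute conversion maps that status onto an NFS file type,
with the symlink check taking precedence over the directory check. The mapping
restated as a small helper (the helper and its import paths are assumptions,
not part of the patch):

    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
    import org.apache.hadoop.nfs.nfs3.NfsFileType;

    final class TypeMapSketch {
      // Equivalent restatement of the mapping in the hunk above.
      static NfsFileType typeOf(HdfsFileStatus fs) {
        if (fs.isSymlink()) {
          return NfsFileType.NFSLNK;   // symlink wins over dir/regular
        }
        return fs.isDir() ? NfsFileType.NFSDIR : NfsFileType.NFSREG;
      }
    }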

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java Thu Oct 17 05:32:42 2013
@@ -17,19 +17,34 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import java.util.Comparator;
+
+import com.google.common.base.Preconditions;
+
 /**
 * OffsetRange is the range of a read/write request. A single point (e.g.,
 * [5,5]) is not a valid range.
  */
-public class OffsetRange implements Comparable<OffsetRange> {
+public class OffsetRange {
+  
+  public static final Comparator<OffsetRange> ReverseComparatorOnMin = 
+      new Comparator<OffsetRange>() {
+    @Override
+    public int compare(OffsetRange o1, OffsetRange o2) {
+      if (o1.getMin() == o2.getMin()) {
+        return o1.getMax() < o2.getMax() ? 
+            1 : (o1.getMax() > o2.getMax() ? -1 : 0);
+      } else {
+        return o1.getMin() < o2.getMin() ? 1 : -1;
+      }
+    }
+  };
+  
   private final long min;
   private final long max;
 
   OffsetRange(long min, long max) {
-    if ((min >= max) || (min < 0) || (max < 0)) {
-      throw new IllegalArgumentException("Wrong offset range: (" + min + ","
-          + max + ")");
-    }
+    Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
     this.min = min;
     this.max = max;
   }
@@ -49,24 +64,10 @@ public class OffsetRange implements Comp
 
   @Override
   public boolean equals(Object o) {
-    assert (o instanceof OffsetRange);
-    OffsetRange range = (OffsetRange) o;
-    return (min == range.getMin()) && (max == range.getMax());
-  }
-
-  private static int compareTo(long left, long right) {
-    if (left < right) {
-      return -1;
-    } else if (left > right) {
-      return 1;
-    } else {
-      return 0;
+    if (o instanceof OffsetRange) {
+      OffsetRange range = (OffsetRange) o;
+      return (min == range.getMin()) && (max == range.getMax());
     }
-  }
-
-  @Override
-  public int compareTo(OffsetRange other) {
-    final int d = compareTo(min, other.getMin());
-    return d != 0 ? d : compareTo(max, other.getMax());
+    return false;
   }
 }
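
With Comparable gone, ordering is supplied explicitly by ReverseComparatorOnMin:
ranges sort by descending min offset, with ties broken by descending max. In a
skip-list map this makes firstKey() the highest-offset range (where the dumper
starts) and lastEntry() the lowest (the next sequential write to push to HDFS),
matching the comment in OpenFileCtx below. A toy illustration, assuming package
access to the OffsetRange constructor and accessors:

    import java.util.concurrent.ConcurrentSkipListMap;

    public class OrderDemo {
      public static void main(String[] args) {
        ConcurrentSkipListMap<OffsetRange, String> pending =
            new ConcurrentSkipListMap<OffsetRange, String>(
                OffsetRange.ReverseComparatorOnMin);
        pending.put(new OffsetRange(0, 10), "first chunk");
        pending.put(new OffsetRange(10, 20), "second chunk");
        System.out.println(pending.firstKey().getMin());  // 10: largest min first
        System.out.println(pending.lastKey().getMin());   // 0: next write-back
      }
    }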

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Thu Oct 17 05:32:42 2013
@@ -22,12 +22,15 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,12 +48,18 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
  * synchronized by its member lock.
@@ -58,34 +67,95 @@ import org.jboss.netty.channel.Channel;
 class OpenFileCtx {
   public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
   
-  /**
-   * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
-   * any read/write operation to an OpenFileCtx object
-   */
-  private final ReentrantLock ctxLock;
+  // Pending writes water mark for dump, 1MB
+  private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
+
+  static enum COMMIT_STATUS {
+    COMMIT_FINISHED,
+    COMMIT_WAIT,
+    COMMIT_INACTIVE_CTX,
+    COMMIT_INACTIVE_WITH_PENDING_WRITE,
+    COMMIT_ERROR,
+    COMMIT_DO_SYNC;
+  }
 
+  private final DFSClient client;
+  private final IdUserGroup iug;
+  
   // The stream status. False means the stream is closed.
-  private boolean activeState;
+  private volatile boolean activeState;
   // The stream write-back status. True means one thread is doing write back.
-  private boolean asyncStatus;
+  private volatile boolean asyncStatus;
 
+  /**
+   * The current offset of the file in HDFS. All the content before this offset
+   * has been written back to HDFS.
+   */
+  private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
-  private final Nfs3FileAttributes latestAttr;
-  private long nextOffset;
+  
+  // It's updated after each sync to HDFS
+  private Nfs3FileAttributes latestAttr;
 
-  private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+  private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
+  
+  private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
+
+  static class CommitCtx {
+    private final long offset;
+    private final Channel channel;
+    private final int xid;
+    private final Nfs3FileAttributes preOpAttr;
+
+    // Remember the start time for debugging purposes
+    private final long startTime;
+
+    long getOffset() {
+      return offset;
+    }
+
+    Channel getChannel() {
+      return channel;
+    }
+
+    int getXid() {
+      return xid;
+    }
+
+    Nfs3FileAttributes getPreOpAttr() {
+      return preOpAttr;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+
+    CommitCtx(long offset, Channel channel, int xid,
+        Nfs3FileAttributes preOpAttr) {
+      this.offset = offset;
+      this.channel = channel;
+      this.xid = xid;
+      this.preOpAttr = preOpAttr;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("offset: %d xid: %d startTime: %d", offset, xid,
+          startTime);
+    }
+  }
   
  // The last write, commit request or write-back event. The time is updated
  // to keep the output stream alive.
   private long lastAccessTime;
   
-  // Pending writes water mark for dump, 1MB
-  private static int DUMP_WRITE_WATER_MARK = 1024 * 1024; 
+  private volatile boolean enabledDump;
   private FileOutputStream dumpOut;
-  private long nonSequentialWriteInMemory;
-  private boolean enabledDump;
+  private AtomicLong nonSequentialWriteInMemory;
   private RandomAccessFile raf;
   private final String dumpFilePath;
+  private Daemon dumpThread;
   
   private void updateLastAccessTime() {
     lastAccessTime = System.currentTimeMillis();
@@ -95,89 +165,55 @@ class OpenFileCtx {
     return System.currentTimeMillis() - lastAccessTime > streamTimeout;
   }
   
+  public long getNextOffset() {
+    return nextOffset.get();
+  }
+  
   // Increase or decrease the memory occupation of non-sequential writes
   private long updateNonSequentialWriteInMemory(long count) {
-    nonSequentialWriteInMemory += count;
+    long newValue = nonSequentialWriteInMemory.addAndGet(count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
-          + nonSequentialWriteInMemory);
+          + newValue);
     }
 
-    if (nonSequentialWriteInMemory < 0) {
-      LOG.error("nonSequentialWriteInMemory is negative after update with count "
-          + count);
-      throw new IllegalArgumentException(
-          "nonSequentialWriteInMemory is negative after update with count "
-              + count);
-    }
-    return nonSequentialWriteInMemory;
+    Preconditions.checkState(newValue >= 0,
+        "nonSequentialWriteInMemory is negative after update with count "
+            + count);
+    return newValue;
   }
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
-      String dumpFilePath) {
+      String dumpFilePath, DFSClient client, IdUserGroup iug) {
     this.fos = fos;
     this.latestAttr = latestAttr;
-    pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+    // We use the ReverseComparatorOnMin as the comparator of the map. In this
+    // way, we first dump the data with the larger offsets. Meanwhile, we
+    // retrieve the last element to write back to HDFS.
+    pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
+        OffsetRange.ReverseComparatorOnMin);
+    
+    pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
+    
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
     dumpOut = null;
     raf = null;
-    nonSequentialWriteInMemory = 0;
+    nonSequentialWriteInMemory = new AtomicLong(0);
+  
     this.dumpFilePath = dumpFilePath;  
     enabledDump = dumpFilePath == null ? false: true;
-    nextOffset = latestAttr.getSize();
-    assert(nextOffset == this.fos.getPos());
-
-    ctxLock = new ReentrantLock(true);
-  }
-
-  private void lockCtx() {
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.trace("lock ctx, caller:" + methodName);
-    }
-    ctxLock.lock();
+    nextOffset = new AtomicLong();
+    nextOffset.set(latestAttr.getSize());
+    assert(nextOffset.get() == this.fos.getPos());
+    dumpThread = null;
+    this.client = client;
+    this.iug = iug;
   }
 
-  private void unlockCtx() {
-    ctxLock.unlock();
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.info("unlock ctx, caller:" + methodName);
-    }
-  }
-  
-  // Make a copy of the latestAttr
-  public Nfs3FileAttributes copyLatestAttr() {
-    Nfs3FileAttributes ret;
-    lockCtx();
-    try {
-      ret = new Nfs3FileAttributes(latestAttr);
-    } finally {
-      unlockCtx();
-    }
-    return ret;
-  }
-  
-  private long getNextOffsetUnprotected() {
-    assert(ctxLock.isLocked());
-    return nextOffset;
-  }
-
-  public long getNextOffset() {
-    long ret;
-    lockCtx();
-    try {
-      ret = getNextOffsetUnprotected();
-    } finally {
-      unlockCtx();
-    }
-    return ret;
+  public Nfs3FileAttributes getLatestAttr() {
+    return latestAttr;
   }
   
   // Get flushed offset. Note that flushed data may not be persisted.
@@ -186,12 +222,7 @@ class OpenFileCtx {
   }
   
   // Check if need to dump the new writes
-  private void checkDump(long count) {
-    assert (ctxLock.isLocked());
-
-    // Always update the in memory count
-    updateNonSequentialWriteInMemory(count);
-
+  private void checkDump() {
     if (!enabledDump) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Do nothing, dump is disabled.");
@@ -199,66 +230,129 @@ class OpenFileCtx {
       return;
     }
 
-    if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+    if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
       return;
     }
 
-    // Create dump outputstream for the first time
-    if (dumpOut == null) {
-      LOG.info("Create dump file:" + dumpFilePath);
-      File dumpFile = new File(dumpFilePath);
-      try {
-        if (dumpFile.exists()) {
-          LOG.fatal("The dump file should not exist:" + dumpFilePath);
-          throw new RuntimeException("The dump file should not exist:"
-              + dumpFilePath);
+    // wake up the dumper thread to dump the data
+    synchronized (this) {
+      if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Asking dumper to dump...");
+        }
+        if (dumpThread == null) {
+          dumpThread = new Daemon(new Dumper());
+          dumpThread.start();
+        } else {
+          this.notifyAll();          
         }
-        dumpOut = new FileOutputStream(dumpFile);
-      } catch (IOException e) {
-        LOG.error("Got failure when creating dump stream " + dumpFilePath
-            + " with error:" + e);
-        enabledDump = false;
-        IOUtils.cleanup(LOG, dumpOut);
-        return;
       }
     }
-    // Get raf for the first dump
-    if (raf == null) {
-      try {
-        raf = new RandomAccessFile(dumpFilePath, "r");
-      } catch (FileNotFoundException e) {
-        LOG.error("Can't get random access to file " + dumpFilePath);
-        // Disable dump
-        enabledDump = false;
-        return;
+  }
+
+  class Dumper implements Runnable {
+    /** Dump data into a file */
+    private void dump() {
+      // Create dump outputstream for the first time
+      if (dumpOut == null) {
+        LOG.info("Create dump file:" + dumpFilePath);
+        File dumpFile = new File(dumpFilePath);
+        try {
+          synchronized (this) {
+            // check if alive again
+            Preconditions.checkState(dumpFile.createNewFile(),
+                "The dump file should not exist: %s", dumpFilePath);
+            dumpOut = new FileOutputStream(dumpFile);
+          }
+        } catch (IOException e) {
+          LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+          enabledDump = false;
+          if (dumpOut != null) {
+            try {
+              dumpOut.close();
+            } catch (IOException e1) {
+              LOG.error("Can't close dump stream " + dumpFilePath, e);
+            }
+          }
+          return;
+        }
       }
-    }
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Start dump, current write number:" + pendingWrites.size());
-    }
-    Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
-    while (it.hasNext()) {
-      OffsetRange key = it.next();
-      WriteCtx writeCtx = pendingWrites.get(key);
-      try {
-        long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
-        if (dumpedDataSize > 0) {
-          updateNonSequentialWriteInMemory(-dumpedDataSize);
+
+      // Get raf for the first dump
+      if (raf == null) {
+        try {
+          raf = new RandomAccessFile(dumpFilePath, "r");
+        } catch (FileNotFoundException e) {
+          LOG.error("Can't get random access to file " + dumpFilePath);
+          // Disable dump
+          enabledDump = false;
+          return;
         }
-      } catch (IOException e) {
-        LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
-        // Disable dump
-        enabledDump = false;
-        return;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
+      }
+
+      Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+      while (activeState && it.hasNext()
+          && nonSequentialWriteInMemory.get() > 0) {
+        OffsetRange key = it.next();
+        WriteCtx writeCtx = pendingWrites.get(key);
+        if (writeCtx == null) {
+          // This write was just deleted
+          continue;
+        }
+        try {
+          long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+          if (dumpedDataSize > 0) {
+            updateNonSequentialWriteInMemory(-dumpedDataSize);
+          }
+        } catch (IOException e) {
+          LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+              + " OpenFileCtx state:" + activeState);
+          // Disable dump
+          enabledDump = false;
+          return;
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
       }
     }
-    if (nonSequentialWriteInMemory != 0) {
-      LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
-          + nonSequentialWriteInMemory);
-      throw new RuntimeException(
-          "After dump, nonSequentialWriteInMemory is not zero: "
-              + nonSequentialWriteInMemory);
+
+    @Override
+    public void run() {
+      while (activeState && enabledDump) {
+        try {
+          if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+            dump();
+          }
+          synchronized (OpenFileCtx.this) {
+            if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+              try {
+                OpenFileCtx.this.wait();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Dumper woke up");
+                }
+              } catch (InterruptedException e) {
+                LOG.info("Dumper is interrupted, dumpFilePath= "
+                    + OpenFileCtx.this.dumpFilePath);
+              }
+            }
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
+                + " enabledDump: " + enabledDump);
+          }
+        } catch (Throwable t) {
+          LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
+              + OpenFileCtx.this.dumpFilePath);
+        }
+      }
     }
   }
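
The synchronous checkDump has become a hand-off to a long-lived Daemon: writers
bump an AtomicLong byte counter and notifyAll() once it crosses the 1MB water
mark, while the dumper thread loops, dumping while over the mark and wait()ing
on the OpenFileCtx monitor otherwise. Stripped to the bare hand-off (all names
and the dump placeholder are illustrative, not the patch's code):

    import java.util.concurrent.atomic.AtomicLong;

    public class DumpGate {
      static final long WATER_MARK = 1024 * 1024;   // 1MB, as in the patch
      private final AtomicLong inMemory = new AtomicLong(0);
      private volatile boolean active = true;

      // Writer side: account for the new bytes, wake the dumper past the mark.
      void onWrite(int count) {
        if (inMemory.addAndGet(count) >= WATER_MARK) {
          synchronized (this) {
            notifyAll();
          }
        }
      }

      // Dumper side: dump while over the mark, block while under it.
      void dumperLoop() throws InterruptedException {
        while (active) {
          if (inMemory.get() >= WATER_MARK) {
            dumpSomeWrites();
          }
          synchronized (this) {
            if (inMemory.get() < WATER_MARK) {
              wait();   // rechecked under the same lock the writer notifies on
            }
          }
        }
      }

      private void dumpSomeWrites() {
        // A real implementation writes pending data to the dump file and then
        // credits the counter back; here we only model the accounting.
        inMemory.addAndGet(-WATER_MARK);
      }
    }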
   
@@ -282,143 +376,252 @@ class OpenFileCtx {
   public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
       Channel channel, int xid, AsyncDataService asyncDataService,
       IdUserGroup iug) {
-
-    lockCtx();
-    try {
-      if (!activeState) {
-        LOG.info("OpenFileCtx is inactive, fileId:"
-            + request.getHandle().getFileId());
-        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
-            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-      } else {
-        // Handle repeated write requests(same xid or not).
-        // If already replied, send reply again. If not replied, drop the
-        // repeated request.
-        WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
-            xid);
-        if (existantWriteCtx != null) {
-          if (!existantWriteCtx.getReplied()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which hasn't be served: xid="
-                  + xid + ", drop it.");
-            }
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which is already served: xid="
-                  + xid + ", resend response.");
-            }
-            WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-            WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-                fileWcc, request.getCount(), request.getStableHow(),
-                Nfs3Constant.WRITE_COMMIT_VERF);
-            Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+    
+    if (!activeState) {
+      LOG.info("OpenFileCtx is inactive, fileId:"
+          + request.getHandle().getFileId());
+      WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+          fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannel(channel,
+          response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+          xid);
+    } else {
+      // Update the write time first
+      updateLastAccessTime();
+      
+      // Handle repeated write requests (same xid or not).
+      // If already replied, send reply again. If not replied, drop the
+      // repeated request.
+      WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+          xid);
+      if (existantWriteCtx != null) {
+        if (!existantWriteCtx.getReplied()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which hasn't be served: xid="
+                + xid + ", drop it.");
           }
-          updateLastAccessTime();
-          
         } else {
-          receivedNewWriteInternal(dfsClient, request, channel, xid,
-              asyncDataService, iug);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which is already served: xid="
+                + xid + ", resend response.");
+          }
+          WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+              fileWcc, request.getCount(), request.getStableHow(),
+              Nfs3Constant.WRITE_COMMIT_VERF);
+          Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+              new XDR(), xid, new VerifierNone()), xid);
         }
+      } else {
+        // not a repeated write request
+        receivedNewWriteInternal(dfsClient, request, channel, xid,
+            asyncDataService, iug);
       }
-
-    } finally {
-      unlockCtx();
     }
   }
 
-  private void receivedNewWriteInternal(DFSClient dfsClient,
-      WRITE3Request request, Channel channel, int xid,
-      AsyncDataService asyncDataService, IdUserGroup iug) {
+  @VisibleForTesting
+  public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
     long offset = request.getOffset();
     int count = request.getCount();
-    WriteStableHow stableHow = request.getStableHow();
-
-    // Get file length, fail non-append call
-    WccAttr preOpAttr = latestAttr.getWccAttr();
+    long smallerCount = offset + count - cachedOffset;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("requesed offset=" + offset + " and current filesize="
-          + preOpAttr.getSize());
+      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+    }
+    
+    ByteBuffer data = request.getData();
+    Preconditions.checkState(data.position() == 0,
+        "The write request data has non-zero position");
+    data.position((int) (cachedOffset - offset));
+    Preconditions.checkState(data.limit() - data.position() == smallerCount,
+        "The write request buffer has wrong limit/position regarding count");
+    
+    request.setOffset(cachedOffset);
+    request.setCount((int) smallerCount);
+  }
+  
+  /**
+   * Creates and adds a WriteCtx into the pendingWrites map. This is a
+   * synchronized method to handle concurrent writes.
+   * 
+   * @return A non-null {@link WriteCtx} instance if the incoming write
+   *         request's offset >= nextOffset. Otherwise null.
+   */
+  private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+      Channel channel, int xid) {
+    long offset = request.getOffset();
+    int count = request.getCount();
+    long cachedOffset = nextOffset.get();
+    int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("requesed offset=" + offset + " and current offset="
+          + cachedOffset);
     }
 
-    long nextOffset = getNextOffsetUnprotected();
-    if (offset == nextOffset) {
-      LOG.info("Add to the list, update nextOffset and notify the writer,"
-          + " nextOffset:" + nextOffset);
-      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.NO_DUMP);
-      addWrite(writeCtx);
+    // Handle a special case first
+    if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+      // One Linux client behavior: after a file is closed and reopened to
+      // write, the client sometimes combines previously written data (could still
+      // be in kernel buffer) with newly appended data in one write. This is
+      // usually the first write after file reopened. In this
+      // case, we log the event and drop the overlapped section.
+      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+
+      if (!pendingWrites.isEmpty()) {
+        LOG.warn("There are other pending writes, fail this jumbo write");
+        return null;
+      }
       
-      // Create an async task and change openFileCtx status to indicate async
-      // task pending
+      LOG.warn("Modify this write to write only the appended data");
+      alterWriteRequest(request, cachedOffset);
+
+      // Update local variable
+      originalCount = count;
+      offset = request.getOffset();
+      count = request.getCount();
+    }
+    
+    // Fail non-append call
+    if (offset < cachedOffset) {
+      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+          + nextOffset + ")");
+      return null;
+    } else {
+      DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
+          : WriteCtx.DataState.ALLOW_DUMP;
+      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+          request.getOffset(), request.getCount(), originalCount,
+          request.getStableHow(), request.getData(), channel, xid, false,
+          dataState);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+            + " and requesed offset=" + offset);
+      }
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        // update the memory size
+        updateNonSequentialWriteInMemory(count);
+      }
+      // check if there is a WriteCtx with the same range in pendingWrites
+      WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
+      if (oldWriteCtx == null) {
+        addWrite(writeCtx);
+      } else {
+        LOG.warn("Got a repeated request, same range, with xid:"
+            + writeCtx.getXid());
+      }
+      return writeCtx;
+    }
+  }
+  
+  /** Process an overwrite write request */
+  private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
+      Channel channel, int xid, IdUserGroup iug) {
+    WccData wccData = new WccData(latestAttr.getWccAttr(), null);
+    long offset = request.getOffset();
+    int count = request.getCount();
+    WriteStableHow stableHow = request.getStableHow();
+    WRITE3Response response;
+    long cachedOffset = nextOffset.get();
+    if (offset + count > cachedOffset) {
+      LOG.warn("Treat this jumbo write as a real random write, no support.");
+      response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+          WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Process perfectOverWrite");
+      }
+      // TODO: let executor handle perfect overwrite
+      response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+          request.getData().array(),
+          Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+    }
+    updateLastAccessTime();
+    Nfs3Utils.writeChannel(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
+  }
+  
+  /**
+   * Check if we can start the write (back to HDFS) now. If there is no hole for
+   * writing, and no other thread is writing (i.e., asyncStatus is
+   * false), start the writing and set asyncStatus to true.
+   * 
+   * @return True if the new write is sequential and we can start writing
+   *         (including the case that there is already a thread writing).
+   */
+  private synchronized boolean checkAndStartWrite(
+      AsyncDataService asyncDataService, WriteCtx writeCtx) {
+    
+    if (writeCtx.getOffset() == nextOffset.get()) {
       if (!asyncStatus) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trigger the write back task. Current nextOffset: "
+              + nextOffset.get());
+        }
         asyncStatus = true;
         asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The write back thread is working.");
+        }
       }
-      
-      // Update the write time first
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
-      // Send response immediately for unstable write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-        writeCtx.setReplied(true);
-      }
-
-    } else if (offset > nextOffset) {
-      LOG.info("Add new write to the list but not update nextOffset:"
-          + nextOffset);
-      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP);
-      addWrite(writeCtx);
+      return true;
+    } else {
+      return false;
+    }
+  }
 
-      // Check if need to dump some pending requests to file
-      checkDump(request.getCount());
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-      
-      // In test, noticed some Linux client sends a batch (e.g., 1MB)
-      // of reordered writes and won't send more writes until it gets
-      // responses of the previous batch. So here send response immediately for
-      // unstable non-sequential write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-        writeCtx.setReplied(true);
-      }
+  private void receivedNewWriteInternal(DFSClient dfsClient,
+      WRITE3Request request, Channel channel, int xid,
+      AsyncDataService asyncDataService, IdUserGroup iug) {
+    WriteStableHow stableHow = request.getStableHow();
+    WccAttr preOpAttr = latestAttr.getWccAttr();
+    int count = request.getCount();
 
-    } else {
+    WriteCtx writeCtx = addWritesToCache(request, channel, xid);
+    if (writeCtx == null) {
       // offset < nextOffset
-      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
-          + nextOffset + ")");
-      WccData wccData = new WccData(preOpAttr, null);
-      WRITE3Response response;
+      processOverWrite(dfsClient, request, channel, xid, iug);
+    } else {
+      // The write has been added to pendingWrites.
+      // Check and start writing back if necessary
+      boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
+      if (!startWriting) {
+        // offset > nextOffset. check if we need to dump data
+        checkDump();
+        
+        // In testing, we noticed that some Linux clients send a batch (e.g.,
+        // 1MB) of reordered writes and won't send more writes until they get
+        // responses to the previous batch. So here we send the response
+        // immediately for an unstable non-sequential write.
+        if (stableHow != WriteStableHow.UNSTABLE) {
+          LOG.info("Have to change stable write to unstable write:"
+              + request.getStableHow());
+          stableHow = WriteStableHow.UNSTABLE;
+        }
 
-      if (offset + count > nextOffset) {
-        LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
-            + "write requests, so treat it as a real random write, no support.");
-        response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-            WriteStableHow.UNSTABLE, 0);
-      } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Process perfectOverWrite");
+          LOG.debug("UNSTABLE write request, send response for offset: "
+              + writeCtx.getOffset());
         }
-        response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
-            request.getData().array(),
-            Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+        WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils
+            .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+                xid, new VerifierNone()), xid);
+        writeCtx.setReplied(true);
       }
-      
-      updateLastAccessTime();
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
     }
   }
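
A note on alterWriteRequest above: when a reopened client resends bytes it has
already written together with newly appended bytes, the request is trimmed in
place by advancing the ByteBuffer position to the cached offset, keeping only
the genuinely new tail. In miniature, with plain NIO and no NFS types (the
helper is illustrative, not the patch's code):

    import java.nio.ByteBuffer;

    final class TrimSketch {
      // data holds bytes for [offset, offset + data.limit()); everything
      // below cachedOffset was already written, so skip past it.
      static ByteBuffer trimOverlap(ByteBuffer data, long offset,
          long cachedOffset) {
        data.position((int) (cachedOffset - offset));
        return data.slice();
      }
    }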
   
@@ -430,7 +633,6 @@ class OpenFileCtx {
   private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
       long offset, int count, WriteStableHow stableHow, byte[] data,
       String path, WccData wccData, IdUserGroup iug) {
-    assert (ctxLock.isLocked());
     WRITE3Response response = null;
 
     // Read the content back
@@ -441,21 +643,30 @@ class OpenFileCtx {
     try {
       // Sync file data and length to avoid partial read failure
       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      
+    } catch (ClosedChannelException closedException) {
+      LOG.info("The FSDataOutputStream has been closed. " +
+      		"Continue processing the perfect overwrite.");
+    } catch (IOException e) {
+      LOG.info("hsync failed when processing possible perfect overwrite, path="
+          + path + " error:" + e);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+    }
+    
+    try {
       fis = new FSDataInputStream(dfsClient.open(path));
       readCount = fis.read(offset, readbuffer, 0, count);
       if (readCount < count) {
         LOG.error("Can't read back " + count + " bytes, partial read size:"
             + readCount);
-        return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-            stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+            Nfs3Constant.WRITE_COMMIT_VERF);
       }
-
     } catch (IOException e) {
       LOG.info("Read failed when processing possible perfect overwrite, path="
           + path + " error:" + e);
-      return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
     } finally {
       IOUtils.cleanup(LOG, fis);
     }
@@ -465,7 +676,7 @@ class OpenFileCtx {
     if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
       LOG.info("Perfect overwrite has different content");
       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-          stableHow, 0);
+          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
     } else {
       LOG.info("Perfect overwrite has same content,"
           + " updating the mtime, then return success");
@@ -477,85 +688,110 @@ class OpenFileCtx {
         LOG.info("Got error when processing perfect overwrite, path=" + path
             + " error:" + e);
         return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
-            0);
+            Nfs3Constant.WRITE_COMMIT_VERF);
       }
 
       wccData.setPostOpAttr(postOpAttr);
       response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
-          stableHow, 0);
+          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
     }
     return response;
   }
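
The essence of processPerfectOverWrite is a read-back-and-compare: when a retransmitted WRITE covers a range that was already flushed, the server re-reads that range and succeeds only if the bytes match. A minimal sketch of just the comparison step, assuming the read-back buffer was filled from the same offset; isPerfectOverwrite is a hypothetical helper, not the patch's comparator.

    // Returns true only when the read-back range is complete and identical.
    static boolean isPerfectOverwrite(byte[] readBack, int readCount,
        byte[] data, int count) {
      if (readCount < count) {
        return false;               // partial read: reply NFS3ERR_IO upstream
      }
      for (int i = 0; i < count; i++) {
        if (readBack[i] != data[i]) {
          return false;             // different content: NFS3ERR_INVAL
        }
      }
      return true;                  // same content: bump mtime, reply NFS3_OK
    }
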
-  
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_ERROR = 3;
 
-  /**
-   * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
-   * COMMIT_INACTIVE_CTX, COMMIT_ERROR
-   */
-  public int checkCommit(long commitOffset) {
-    int ret = COMMIT_WAIT;
+  public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    // Keep stream active
+    updateLastAccessTime();
+    Preconditions.checkState(commitOffset >= 0);
 
-    lockCtx();
-    try {
-      if (!activeState) {
-        ret = COMMIT_INACTIVE_CTX;
-      } else {
-        ret = checkCommitInternal(commitOffset);
+    COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
+        preOpAttr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got commit status: " + ret.name());
+    }
+    // Do the sync outside the lock
+    if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
+        || ret == COMMIT_STATUS.COMMIT_FINISHED) {
+      try {
+        // Sync file data and length
+        fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+        // Nothing to do for metadata since attr-related changes are pass-through
+      } catch (ClosedChannelException cce) {
+        if (pendingWrites.isEmpty()) {
+          ret = COMMIT_STATUS.COMMIT_FINISHED;
+        } else {
+          ret = COMMIT_STATUS.COMMIT_ERROR;
+        }
+      } catch (IOException e) {
+        LOG.error("Got stream error during data sync:" + e);
+        // Do nothing. Stream will be closed eventually by StreamMonitor.
+        ret = COMMIT_STATUS.COMMIT_ERROR;
       }
-    } finally {
-      unlockCtx();
     }
     return ret;
   }
-  
-  private int checkCommitInternal(long commitOffset) {
-    if (commitOffset == 0) {
-      // Commit whole file
-      commitOffset = getNextOffsetUnprotected();
+
+  /**
+   * Return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
+   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, or COMMIT_ERROR.
+   */
+  private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    if (!activeState) {
+      if (pendingWrites.isEmpty()) {
+        return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
+      } else {
+        // TODO: return success if already committed
+        return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     }
 
     long flushed = getFlushedOffset();
-    LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
-    if (flushed < commitOffset) {
-      // Keep stream active
-      updateLastAccessTime();
-      return COMMIT_WAIT;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
     }
 
-    int ret = COMMIT_WAIT;
-    try {
-      // Sync file data and length
-      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      // Nothing to do for metadata since attr related change is pass-through
-      ret = COMMIT_FINISHED;
-    } catch (IOException e) {
-      LOG.error("Got stream error during data sync:" + e);
-      // Do nothing. Stream will be closed eventually by StreamMonitor.
-      ret = COMMIT_ERROR;
+    if (commitOffset > 0) {
+      if (commitOffset > flushed) {
+        CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+            preOpAttr);
+        pendingCommits.put(commitOffset, commitCtx);
+        return COMMIT_STATUS.COMMIT_WAIT;
+      } else {
+        return COMMIT_STATUS.COMMIT_DO_SYNC;
+      }
     }
 
-    // Keep stream active
-    updateLastAccessTime();
-    return ret;
+    Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
+
+    // Commit whole file, commitOffset == 0
+    if (pendingWrites.isEmpty()) {
+      // Note that, there is no guarantee data is synced. TODO: We could still
+      // do a sync here though the output stream might be closed.
+      return COMMIT_STATUS.COMMIT_FINISHED;
+    } else {
+      // Insert commit
+      long maxOffset = key.getKey().getMax() - 1;
+      Preconditions.checkState(maxOffset > 0);
+      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+      pendingCommits.put(maxOffset, commitCtx);
+      return COMMIT_STATUS.COMMIT_WAIT;
+    }
   }
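
A minimal standalone sketch of the decision table that checkCommitInternal implements. The enum constants mirror the patch; the helper method and its boolean parameters are illustrative only.

    enum COMMIT_STATUS {
      COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
      COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_DO_SYNC, COMMIT_ERROR
    }

    static COMMIT_STATUS decide(boolean active, boolean hasPendingWrites,
        long commitOffset, long flushed) {
      if (!active) {
        return hasPendingWrites
            ? COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE
            : COMMIT_STATUS.COMMIT_INACTIVE_CTX;
      }
      if (commitOffset > 0) {
        return (commitOffset > flushed)
            ? COMMIT_STATUS.COMMIT_WAIT     // queue a CommitCtx for later
            : COMMIT_STATUS.COMMIT_DO_SYNC; // caller hsyncs outside the lock
      }
      // commitOffset == 0 means "commit the whole file"
      return hasPendingWrites
          ? COMMIT_STATUS.COMMIT_WAIT
          : COMMIT_STATUS.COMMIT_FINISHED;
    }
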
   
   private void addWrite(WriteCtx writeCtx) {
-    assert (ctxLock.isLocked());
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
+    // For the offset range (min, max), min is inclusive, and max is exclusive
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
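
A quick worked example of the half-open range convention noted in the comment above, with hypothetical offsets:

    // Hedged example of the half-open [min, max) convention:
    //   write(offset=0,    count=4096) -> OffsetRange(0,    4096)
    //   write(offset=4096, count=4096) -> OffsetRange(4096, 8192)
    // The second write is sequential because its min equals the previous
    // range's max, which is also the stream's nextOffset.
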
   
-  
   /**
    * Check stream status to decide if it should be closed
    * @return true, remove stream; false, keep stream
    */
-  public boolean streamCleanup(long fileId, long streamTimeout) {
+  public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
     if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
       throw new InvalidParameterException("StreamTimeout" + streamTimeout
           + "ms is less than MINIMIUM_STREAM_TIMEOUT "
@@ -563,131 +799,189 @@ class OpenFileCtx {
     }
     
     boolean flag = false;
-    if (!ctxLock.tryLock()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Another thread is working on it" + ctxLock.toString());
-      }
-      return flag;
-    }
-    
-    try {
-      // Check the stream timeout
-      if (checkStreamTimeout(streamTimeout)) {
-        LOG.info("closing stream for fileId:" + fileId);
-        cleanup();
-        flag = true;
+    // Check the stream timeout
+    if (checkStreamTimeout(streamTimeout)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing stream for fileId:" + fileId);
       }
-    } finally {
-      unlockCtx();
+      cleanup();
+      flag = true;
     }
     return flag;
   }
   
-  // Invoked by AsynDataService to do the write back
-  public void executeWriteBack() {
-    long nextOffset;
-    OffsetRange key;
-    WriteCtx writeCtx;
-
-    try {
-      // Don't lock OpenFileCtx for all writes to reduce the timeout of other
-      // client request to the same file
-      while (true) {
-        lockCtx();
-        if (!asyncStatus) {
-          // This should never happen. There should be only one thread working
-          // on one OpenFileCtx anytime.
-          LOG.fatal("The openFileCtx has false async status");
-          throw new RuntimeException("The openFileCtx has false async status");
-        }
-        // Any single write failure can change activeState to false, so do the
-        // check each loop.
-        if (pendingWrites.isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The asyn write task has no pendding writes, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+  /**
+   * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
+   * 
+   * @return null if {@link #pendingWrites} is empty, or if the next WriteCtx's
+   *         offset is larger than nextOffset.
+   */
+  private synchronized WriteCtx offerNextToWrite() {
+    if (pendingWrites.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The asyn write task has no pending writes, fileId: "
+            + latestAttr.getFileId());
+      }
+      // Process pending commits again to handle this race: a commit is added
+      // to the pendingCommits map just after the last doSingleWrite returns.
+      // There is no pending write, so the commit should have been handled by
+      // the last doSingleWrite. Due to the race, the commit is left alone and
+      // can't be processed until cleanup. Therefore, we should do another
+      // processCommits to fix the race issue.
+      processCommits(nextOffset.get()); // nextOffset has same value as
+                                        // flushedOffset
+      this.asyncStatus = false;
+      return null;
+    } 
+    
+      Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+      OffsetRange range = lastEntry.getKey();
+      WriteCtx toWrite = lastEntry.getValue();
+      
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+            + nextOffset);
+      }
+      
+      long offset = nextOffset.get();
+      if (range.getMin() > offset) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The next sequencial write has not arrived yet");
         }
-        if (!activeState) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The openFileCtx is not active anymore, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+        processCommits(nextOffset.get()); // handle race
+        this.asyncStatus = false;
+      } else if (range.getMin() < offset && range.getMax() > offset) {
+        // shouldn't happen since we do sync for overlapped concurrent writers
+        LOG.warn("Got a overlapping write (" + range.getMin() + ","
+            + range.getMax() + "), nextOffset=" + offset
+            + ". Silently drop it now");
+        pendingWrites.remove(range);
+        processCommits(nextOffset.get()); // handle race
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+              + ") from the list");
         }
-
-        // Get the next sequential write
-        nextOffset = getNextOffsetUnprotected();
-        key = pendingWrites.firstKey();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
-              + nextOffset);
+        // after writing, remove the WriteCtx from cache 
+        pendingWrites.remove(range);
+        // update nextOffset
+        nextOffset.addAndGet(toWrite.getCount());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Change nextOffset to " + nextOffset.get());
         }
-
-        if (key.getMin() > nextOffset) {
-          if (LOG.isDebugEnabled()) {
-            LOG.info("The next sequencial write has not arrived yet");
-          }
-          break;
-
-        } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
-          // Can't handle overlapping write. Didn't see it in tests yet.
-          LOG.fatal("Got a overlapping write (" + key.getMin() + ","
-              + key.getMax() + "), nextOffset=" + nextOffset);
-          throw new RuntimeException("Got a overlapping write (" + key.getMin()
-              + "," + key.getMax() + "), nextOffset=" + nextOffset);
-
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
-                + ") from the list");
-          }
-          writeCtx = pendingWrites.remove(key);
+        return toWrite;
+      }
+    
+    return null;
+  }
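
A standalone sketch of the three-way classification offerNextToWrite performs on the lowest pending range against nextOffset; the method name and return values are illustrative, the comparisons mirror the patch.

    static String classify(long min, long max, long nextOffset) {
      if (min > nextOffset) {
        return "GAP";      // next sequential write not here yet; stop writer
      } else if (min < nextOffset && max > nextOffset) {
        return "OVERLAP";  // unexpected: dropped with a warning
      } else {
        return "WRITE";    // dequeue it, write it, advance nextOffset
      }
    }
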
+  
+  /** Invoked by AsyncDataService to write back to HDFS */
+  void executeWriteBack() {
+    Preconditions.checkState(asyncStatus,
+        "The openFileCtx has false async status");
+    try {
+      while (activeState) {
+        WriteCtx toWrite = offerNextToWrite();
+        if (toWrite != null) {
           // Do the write
-          doSingleWrite(writeCtx);
+          doSingleWrite(toWrite);
           updateLastAccessTime();
+        } else {
+          break;
         }
-        
-        unlockCtx();
       }
-
+      
+      if (!activeState && LOG.isDebugEnabled()) {
+        LOG.debug("The openFileCtx is not active anymore, fileId: "
+            + latestAttr.getFileId());
+      }
     } finally {
-      // Always reset the async status so another async task can be created
-      // for this file
+      // make sure we reset asyncStatus to false
       asyncStatus = false;
-      if (ctxLock.isHeldByCurrentThread()) {
-        unlockCtx();
-      }
     }
   }
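
The synchronized offerNextToWrite plus the asyncStatus flag replace the old ctxLock here: at most one async task drains the queue, and the flag is only flipped under the object monitor. A minimal sketch of that single-writer gate, with hypothetical method names:

    private boolean asyncStatus = false;

    // Called when a write is queued; starts at most one writer task.
    synchronized boolean tryStartWriter() {
      if (asyncStatus) {
        return false;     // a writer is already draining the queue
      }
      asyncStatus = true; // claim the single-writer slot
      return true;
    }

    // The writer clears the flag under the same monitor (queue empty or a
    // gap found), so the next incoming write can start a fresh task.
    synchronized void stopWriter() {
      asyncStatus = false;
    }
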
 
+  private void processCommits(long offset) {
+    Preconditions.checkState(offset > 0);
+    long flushedOffset = getFlushedOffset();
+    Entry<Long, CommitCtx> entry = pendingCommits.firstEntry();
+
+    if (entry == null || entry.getValue().offset > flushedOffset) {
+      return;
+    }
+
+    // Now do sync for the ready commits
+    int status = Nfs3Status.NFS3ERR_IO;
+    try {
+      // Sync file data and length
+      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      status = Nfs3Status.NFS3_OK;
+    } catch (ClosedChannelException cce) {
+      if (!pendingWrites.isEmpty()) {
+        LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
+            + ". Channel closed with writes pending");
+      }
+      status = Nfs3Status.NFS3ERR_IO;
+    } catch (IOException e) {
+      LOG.error("Got stream error during data sync:" + e);
+      // Do nothing. Stream will be closed eventually by StreamMonitor.
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    // Update latestAttr
+    try {
+      latestAttr = Nfs3Utils.getFileAttr(client,
+          Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
+    } catch (IOException e) {
+      LOG.error("Can't get new file attr for fileId: " + latestAttr.getFileId());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    if (latestAttr.getSize() != offset) {
+      LOG.error("After sync, the expect file size: " + offset
+          + ", however actual file size is: " + latestAttr.getSize());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+    WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
+
+    // Send response for the ready commits
+    while (entry != null && entry.getValue().offset <= flushedOffset) {
+      pendingCommits.remove(entry.getKey());
+      CommitCtx commit = entry.getValue();
+
+      COMMIT3Response response = new COMMIT3Response(status, wccData,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannelCommit(commit.getChannel(), response
+          .writeHeaderAndResponse(new XDR(), commit.getXid(),
+              new VerifierNone()), commit.getXid());
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("FileId: " + latestAttr.getFileid() + " Service time:"
+            + (System.currentTimeMillis() - commit.getStartTime())
+            + "ms. Sent response for commit:" + commit);
+      }
+      entry = pendingCommits.firstEntry();
+    }
+  }
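
The response loop above drains every queued COMMIT whose offset has already been flushed, in offset order. The same idea in isolation, where the Integer value stands in for the patch's CommitCtx:

    import java.util.Map;
    import java.util.concurrent.ConcurrentNavigableMap;

    static void drainReady(ConcurrentNavigableMap<Long, Integer> pending,
        long flushedOffset) {
      Map.Entry<Long, Integer> e = pending.firstEntry();
      while (e != null && e.getKey() <= flushedOffset) {
        pending.remove(e.getKey());
        // ... build and send the COMMIT3Response for xid e.getValue() ...
        e = pending.firstEntry();
      }
    }
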
+  
   private void doSingleWrite(final WriteCtx writeCtx) {
-    assert(ctxLock.isLocked());
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
 
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     WriteStableHow stableHow = writeCtx.getStableHow();
-    byte[] data = null;
-    try {
-      data = writeCtx.getData();
-    } catch (IOException e1) {
-      LOG.error("Failed to get request data offset:" + offset + " count:"
-          + count + " error:" + e1);
-      // Cleanup everything
-      cleanup();
-      return;
-    }
-    assert (data.length == count);
-
+    
     FileHandle handle = writeCtx.getHandle();
-    LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
-        + " length:" + count + " stableHow:" + stableHow.getValue());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+    }
 
     try {
-      fos.write(data, 0, count);
+      // The write is not protected by a lock. asyncStatus is used to make
+      // sure there is only one thread doing the write back at any time.
+      writeCtx.writeData(fos);
       
       long flushedOffset = getFlushedOffset();
       if (flushedOffset != (offset + count)) {
@@ -695,27 +989,47 @@ class OpenFileCtx {
             + flushedOffset + " and nextOffset should be"
             + (offset + count));
       }
-      nextOffset = flushedOffset;
+      
 
       // Reduce memory occupation size if request was allowed dumped
-      if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
-        updateNonSequentialWriteInMemory(-count);
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        synchronized (writeCtx) {
+          if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+            writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
+            updateNonSequentialWriteInMemory(-count);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("After writing " + handle.getFileId() + " at offset "
+                  + offset + ", updated the memory count, new value:"
+                  + nonSequentialWriteInMemory.get());
+            }
+          }
+        }
       }
       
       if (!writeCtx.getReplied()) {
         WccAttr preOpAttr = latestAttr.getWccAttr();
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+          LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+              + " instead of real data count:" + count);
+          count = writeCtx.getOriginalCount();
+        }
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+            new XDR(), xid, new VerifierNone()), xid);
       }
-
+      
+      // Handle the waiting commits without holding any lock
+      processCommits(writeCtx.getOffset() + writeCtx.getCount());
+     
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
-          + offset + " and length " + data.length, e);
+          + offset + " and length " + count, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+            new XDR(), xid, new VerifierNone()), xid);
        // Keep stream open. Either client retries or StreamMonitor closes it.
       }
 
@@ -725,9 +1039,21 @@ class OpenFileCtx {
     }
   }
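
One subtlety in the success reply above: when addWritesToCache trimmed an overlapping request, the response must still acknowledge the count the client originally sent, otherwise the client would resend the overlapping prefix. A hedged sketch of that rule (replyCount is illustrative; INVALID_ORIGINAL_COUNT mirrors the patch):

    static int replyCount(int writtenCount, int originalCount) {
      return (originalCount != WriteCtx.INVALID_ORIGINAL_COUNT)
          ? originalCount   // request was trimmed; ack what the client sent
          : writtenCount;   // untrimmed request; ack what was written
    }
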
 
-  private void cleanup() {
-    assert(ctxLock.isLocked());
+  private synchronized void cleanup() {
+    if (!activeState) {
+      LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
+      return;
+    }
     activeState = false;
+
+    // stop the dump thread
+    if (dumpThread != null) {
+      dumpThread.interrupt();
+      try {
+        dumpThread.join(3000);
+      } catch (InterruptedException e) {
+        // Ignore; we only wait a bounded time for the dump thread to exit
+      }
+    }
     
     // Close stream
     try {
@@ -745,36 +1071,62 @@ class OpenFileCtx {
     while (!pendingWrites.isEmpty()) {
       OffsetRange key = pendingWrites.firstKey();
       LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
-          + "), nextOffset=" + getNextOffsetUnprotected());
+          + "), nextOffset=" + nextOffset.get());
       
       WriteCtx writeCtx = pendingWrites.remove(key);
       if (!writeCtx.getReplied()) {
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
             fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(writeCtx.getChannel(),
-            response.send(new XDR(), writeCtx.getXid()));
+        Nfs3Utils.writeChannel(writeCtx.getChannel(), response
+            .writeHeaderAndResponse(new XDR(), writeCtx.getXid(),
+                new VerifierNone()), writeCtx.getXid());
       }
     }
     
     // Cleanup dump file
-    if (dumpOut!=null){
+    if (dumpOut != null) {
       try {
         dumpOut.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
+      File dumpFile = new File(dumpFilePath);
+      if (dumpFile.exists() && !dumpFile.delete()) {
+        LOG.error("Failed to delete dumpfile: " + dumpFile);
+      }
     }
-    if (raf!=null) {
+    if (raf != null) {
       try {
         raf.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
-    File dumpFile = new File(dumpFilePath);
-    if (dumpFile.delete()) {
-      LOG.error("Failed to delete dumpfile: "+ dumpFile);
-    }
+  }
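
cleanup() now stops the dump thread with an interrupt followed by a bounded join, so a stuck dumper cannot hang stream cleanup. The same pattern in isolation; stopQuietly is a hypothetical helper, not part of the patch:

    static void stopQuietly(Thread worker, long joinMillis) {
      if (worker == null) {
        return;
      }
      worker.interrupt();             // wake the worker if it is blocked
      try {
        worker.join(joinMillis);      // wait a bounded time for it to exit
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
      }
    }
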
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){
+    return pendingWrites;
+  }
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){
+    return pendingCommits;
+  }
+  
+  @VisibleForTesting
+  long getNextOffsetForTest() {
+    return nextOffset.get();
+  }
+  
+  @VisibleForTesting
+  void setNextOffsetForTest(long newValue) {
+    nextOffset.set(newValue);
+  }
+  
+  @VisibleForTesting
+  void setActiveStatusForTest(boolean activeState) {
+    this.activeState = activeState;
   }
 }
\ No newline at end of file