Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2013/09/23 22:26:21 UTC

svn commit: r1525688 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/

Author: jing9
Date: Mon Sep 23 20:26:20 2013
New Revision: 1525688

URL: http://svn.apache.org/r1525688
Log:
HDFS-4971. Merge change r1525681 from trunk.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java?rev=1525688&r1=1525687&r2=1525688&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java Mon Sep 23 20:26:20 2013
@@ -97,7 +97,7 @@ public class AsyncDataService {
   void writeAsync(OpenFileCtx openFileCtx) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Scheduling write back task for fileId: "
-          + openFileCtx.copyLatestAttr().getFileId());
+          + openFileCtx.getLatestAttr().getFileId());
     }
     WriteBackTask wbTask = new WriteBackTask(openFileCtx);
     execute(wbTask);
@@ -125,7 +125,7 @@ public class AsyncDataService {
     public String toString() {
       // Called in AsyncDataService.execute for displaying error messages.
       return "write back data for fileId"
-          + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+          + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
           + openFileCtx.getNextOffset();
     }
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java?rev=1525688&r1=1525687&r2=1525688&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java Mon Sep 23 20:26:20 2013
@@ -17,19 +17,34 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import java.util.Comparator;
+
+import com.google.common.base.Preconditions;
+
 /**
  * OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
  * is not a valid range.
  */
-public class OffsetRange implements Comparable<OffsetRange> {
+public class OffsetRange {
+  
+  public static final Comparator<OffsetRange> ReverseComparatorOnMin = 
+      new Comparator<OffsetRange>() {
+    @Override
+    public int compare(OffsetRange o1, OffsetRange o2) {
+      if (o1.getMin() == o2.getMin()) {
+        return o1.getMax() < o2.getMax() ? 
+            1 : (o1.getMax() > o2.getMax() ? -1 : 0);
+      } else {
+        return o1.getMin() < o2.getMin() ? 1 : -1;
+      }
+    }
+  };
+  
   private final long min;
   private final long max;
 
   OffsetRange(long min, long max) {
-    if ((min >= max) || (min < 0) || (max < 0)) {
-      throw new IllegalArgumentException("Wrong offset range: (" + min + ","
-          + max + ")");
-    }
+    Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
     this.min = min;
     this.max = max;
   }
@@ -49,24 +64,10 @@ public class OffsetRange implements Comp
 
   @Override
   public boolean equals(Object o) {
-    assert (o instanceof OffsetRange);
-    OffsetRange range = (OffsetRange) o;
-    return (min == range.getMin()) && (max == range.getMax());
-  }
-
-  private static int compareTo(long left, long right) {
-    if (left < right) {
-      return -1;
-    } else if (left > right) {
-      return 1;
-    } else {
-      return 0;
+    if (o instanceof OffsetRange) {
+      OffsetRange range = (OffsetRange) o;
+      return (min == range.getMin()) && (max == range.getMax());
     }
-  }
-
-  @Override
-  public int compareTo(OffsetRange other) {
-    final int d = compareTo(min, other.getMin());
-    return d != 0 ? d : compareTo(max, other.getMax());
+    return false;
   }
 }
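
A minimal sketch of the ordering this comparator imposes, assuming package
access to the package-private OffsetRange constructor (OpenFileCtx below plugs
the comparator into a ConcurrentSkipListMap): iteration runs from the largest
start offset down, so the dumper walks the least urgent data first, while
lastEntry() yields the smallest start offset, i.e., the candidate for the next
sequential write back.

    import java.util.concurrent.ConcurrentSkipListMap;

    // Illustrative demo class, not part of this commit.
    public class OffsetRangeOrderingDemo {
      public static void main(String[] args) {
        ConcurrentSkipListMap<OffsetRange, String> pendingWrites =
            new ConcurrentSkipListMap<OffsetRange, String>(
                OffsetRange.ReverseComparatorOnMin);
        pendingWrites.put(new OffsetRange(0, 10), "w0");
        pendingWrites.put(new OffsetRange(20, 30), "w2");
        pendingWrites.put(new OffsetRange(10, 20), "w1");

        // Iteration order is descending by min offset: 20, 10, 0.
        System.out.println(pendingWrites.firstKey().getMin());           // 20
        // lastEntry() holds the smallest start offset, the next candidate
        // for a sequential write back to HDFS.
        System.out.println(pendingWrites.lastEntry().getKey().getMin()); // 0
      }
    }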

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1525688&r1=1525687&r2=1525688&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Mon Sep 23 20:26:20 2013
@@ -22,12 +22,14 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,8 +52,11 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.base.Preconditions;
+
 /**
  * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
  * synchronized by its member lock.
@@ -59,34 +64,42 @@ import org.jboss.netty.channel.Channel;
 class OpenFileCtx {
   public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
   
-  /**
-   * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
-   * any read/write operation to an OpenFileCtx object
-   */
-  private final ReentrantLock ctxLock;
+  // Pending writes water mark for dump, 1MB
+  private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
+
+  public final static int COMMIT_FINISHED = 0;
+  public final static int COMMIT_WAIT = 1;
+  public final static int COMMIT_INACTIVE_CTX = 2;
+  public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
+  public final static int COMMIT_ERROR = 4;
 
   // The stream status. False means the stream is closed.
-  private boolean activeState;
+  private volatile boolean activeState;
   // The stream write-back status. True means one thread is doing write back.
-  private boolean asyncStatus;
+  private volatile boolean asyncStatus;
 
+  /**
+   * The current offset of the file in HDFS. All the content before this offset
+   * has been written back to HDFS.
+   */
+  private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
+  
+  // TODO: make it mutable and update it after each write back to HDFS
   private final Nfs3FileAttributes latestAttr;
-  private long nextOffset;
 
-  private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+  private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
   
   // The last write, commit request or write-back event. Updating time to keep
   // output stream alive.
   private long lastAccessTime;
   
-  // Pending writes water mark for dump, 1MB
-  private static int DUMP_WRITE_WATER_MARK = 1024 * 1024; 
+  private volatile boolean enabledDump;
   private FileOutputStream dumpOut;
-  private long nonSequentialWriteInMemory;
-  private boolean enabledDump;
+  private AtomicLong nonSequentialWriteInMemory;
   private RandomAccessFile raf;
   private final String dumpFilePath;
+  private Daemon dumpThread;
   
   private void updateLastAccessTime() {
     lastAccessTime = System.currentTimeMillis();
@@ -96,91 +109,52 @@ class OpenFileCtx {
     return System.currentTimeMillis() - lastAccessTime > streamTimeout;
   }
   
+  public long getNextOffset() {
+    return nextOffset.get();
+  }
+  
   // Increase or decrease the memory occupation of non-sequential writes
   private long updateNonSequentialWriteInMemory(long count) {
-    nonSequentialWriteInMemory += count;
+    long newValue = nonSequentialWriteInMemory.addAndGet(count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
-          + nonSequentialWriteInMemory);
+          + newValue);
     }
 
-    if (nonSequentialWriteInMemory < 0) {
-      LOG.error("nonSequentialWriteInMemory is negative after update with count "
-          + count);
-      throw new IllegalArgumentException(
-          "nonSequentialWriteInMemory is negative after update with count "
-              + count);
-    }
-    return nonSequentialWriteInMemory;
+    Preconditions.checkState(newValue >= 0,
+        "nonSequentialWriteInMemory is negative after update with count "
+            + count);
+    return newValue;
   }
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
       String dumpFilePath) {
     this.fos = fos;
     this.latestAttr = latestAttr;
-    pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+    // We use the ReverseComparatorOnMin as the comparator of the map. In this
+    // way, we first dump the data with larger offsets, while we retrieve the
+    // last element, i.e., the one with the smallest offset, to write back to
+    // HDFS.
+    pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
+        OffsetRange.ReverseComparatorOnMin);
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
     dumpOut = null;
     raf = null;
-    nonSequentialWriteInMemory = 0;
+    nonSequentialWriteInMemory = new AtomicLong(0);
+  
     this.dumpFilePath = dumpFilePath;  
     enabledDump = dumpFilePath == null ? false: true;
-    nextOffset = latestAttr.getSize();
-    try {
-      assert(nextOffset == this.fos.getPos());
+    nextOffset = new AtomicLong();
+    nextOffset.set(latestAttr.getSize());
+    try {
+      assert(nextOffset.get() == this.fos.getPos());
     } catch (IOException e) {}
-
-    ctxLock = new ReentrantLock(true);
-  }
-
-  private void lockCtx() {
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.trace("lock ctx, caller:" + methodName);
-    }
-    ctxLock.lock();
+    dumpThread = null;
   }
 
-  private void unlockCtx() {
-    ctxLock.unlock();
-    if (LOG.isTraceEnabled()) {
-      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
-      StackTraceElement e = stacktrace[2];
-      String methodName = e.getMethodName();
-      LOG.info("unlock ctx, caller:" + methodName);
-    }
-  }
-  
-  // Make a copy of the latestAttr
-  public Nfs3FileAttributes copyLatestAttr() {
-    Nfs3FileAttributes ret;
-    lockCtx();
-    try {
-      ret = new Nfs3FileAttributes(latestAttr);
-    } finally {
-      unlockCtx();
-    }
-    return ret;
-  }
-  
-  private long getNextOffsetUnprotected() {
-    assert(ctxLock.isLocked());
-    return nextOffset;
-  }
-
-  public long getNextOffset() {
-    long ret;
-    lockCtx();
-    try {
-      ret = getNextOffsetUnprotected();
-    } finally {
-      unlockCtx();
-    }
-    return ret;
+  public Nfs3FileAttributes getLatestAttr() {
+    return latestAttr;
   }
   
   // Get flushed offset. Note that flushed data may not be persisted.
@@ -189,12 +163,7 @@ class OpenFileCtx {
   }
   
   // Check if need to dump the new writes
-  private void checkDump(long count) {
-    assert (ctxLock.isLocked());
-
-    // Always update the in memory count
-    updateNonSequentialWriteInMemory(count);
-
+  private void checkDump() {
     if (!enabledDump) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Do nothing, dump is disabled.");
@@ -202,66 +171,111 @@ class OpenFileCtx {
       return;
     }
 
-    if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+    if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
       return;
     }
 
-    // Create dump outputstream for the first time
-    if (dumpOut == null) {
-      LOG.info("Create dump file:" + dumpFilePath);
-      File dumpFile = new File(dumpFilePath);
-      try {
-        if (dumpFile.exists()) {
-          LOG.fatal("The dump file should not exist:" + dumpFilePath);
-          throw new RuntimeException("The dump file should not exist:"
-              + dumpFilePath);
+    // wake up the dumper thread to dump the data
+    synchronized (this) {
+      if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Asking dumper to dump...");
+        }
+        if (dumpThread == null) {
+          dumpThread = new Daemon(new Dumper());
+          dumpThread.start();
+        } else {
+          this.notifyAll();          
         }
-        dumpOut = new FileOutputStream(dumpFile);
-      } catch (IOException e) {
-        LOG.error("Got failure when creating dump stream " + dumpFilePath
-            + " with error:" + e);
-        enabledDump = false;
-        IOUtils.cleanup(LOG, dumpOut);
-        return;
       }
     }
-    // Get raf for the first dump
-    if (raf == null) {
-      try {
-        raf = new RandomAccessFile(dumpFilePath, "r");
-      } catch (FileNotFoundException e) {
-        LOG.error("Can't get random access to file " + dumpFilePath);
-        // Disable dump
-        enabledDump = false;
-        return;
+  }
+
+  class Dumper implements Runnable {
+    /** Dump data into a file */
+    private void dump() {
+      // Create dump outputstream for the first time
+      if (dumpOut == null) {
+        LOG.info("Create dump file:" + dumpFilePath);
+        File dumpFile = new File(dumpFilePath);
+        try {
+          synchronized (this) {
+            // check if alive again
+            Preconditions.checkState(dumpFile.createNewFile(),
+                "The dump file should not exist: %s", dumpFilePath);
+            dumpOut = new FileOutputStream(dumpFile);
+          }
+        } catch (IOException e) {
+          LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+          enabledDump = false;
+          if (dumpOut != null) {
+            try {
+              dumpOut.close();
+            } catch (IOException e1) {
+              LOG.error("Can't close dump stream " + dumpFilePath, e);
+            }
+          }
+          return;
+        }
       }
-    }
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Start dump, current write number:" + pendingWrites.size());
-    }
-    Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
-    while (it.hasNext()) {
-      OffsetRange key = it.next();
-      WriteCtx writeCtx = pendingWrites.get(key);
-      try {
-        long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
-        if (dumpedDataSize > 0) {
-          updateNonSequentialWriteInMemory(-dumpedDataSize);
+
+      // Get raf for the first dump
+      if (raf == null) {
+        try {
+          raf = new RandomAccessFile(dumpFilePath, "r");
+        } catch (FileNotFoundException e) {
+          LOG.error("Can't get random access to file " + dumpFilePath);
+          // Disable dump
+          enabledDump = false;
+          return;
         }
-      } catch (IOException e) {
-        LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
-        // Disable dump
-        enabledDump = false;
-        return;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
+      }
+
+      Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+      while (activeState && it.hasNext()
+          && nonSequentialWriteInMemory.get() > 0) {
+        OffsetRange key = it.next();
+        WriteCtx writeCtx = pendingWrites.get(key);
+        try {
+          long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+          if (dumpedDataSize > 0) {
+            updateNonSequentialWriteInMemory(-dumpedDataSize);
+          }
+        } catch (IOException e) {
+          LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+              + " OpenFileCtx state:" + activeState);
+          // Disable dump
+          enabledDump = false;
+          return;
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After dump, nonSequentialWriteInMemory == "
+            + nonSequentialWriteInMemory.get());
       }
     }
-    if (nonSequentialWriteInMemory != 0) {
-      LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
-          + nonSequentialWriteInMemory);
-      throw new RuntimeException(
-          "After dump, nonSequentialWriteInMemory is not zero: "
-              + nonSequentialWriteInMemory);
+
+    @Override
+    public void run() {
+      while (activeState && enabledDump) {
+        if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+          dump();
+        }
+        synchronized (OpenFileCtx.this) {
+          if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+            try {
+              OpenFileCtx.this.wait();
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+      }
     }
   }
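
The hunk above replaces the former in-line dump with a daemon thread driven by
a guarded wait/notify handshake. A self-contained sketch of that handshake,
with a hypothetical class name and the dump-file I/O elided:

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative reduction of checkDump()/Dumper.run(); not commit code.
    class WatermarkDumperSketch {
      private static final long WATER_MARK = 1024 * 1024; // 1MB, as above
      private final AtomicLong nonSequentialWriteInMemory = new AtomicLong(0);
      private volatile boolean activeState = true;

      // Producer side (cf. checkDump): re-check under the monitor, then notify.
      void onCachedWrite(long count) {
        nonSequentialWriteInMemory.addAndGet(count);
        synchronized (this) {
          if (nonSequentialWriteInMemory.get() >= WATER_MARK) {
            this.notifyAll(); // wake the dumper daemon
          }
        }
      }

      // Consumer side (cf. Dumper.run): dump while above the watermark, then
      // re-check under the same monitor before sleeping, so no wake-up is lost.
      void dumperLoop() {
        while (activeState) {
          if (nonSequentialWriteInMemory.get() >= WATER_MARK) {
            dump();
          }
          synchronized (this) {
            if (nonSequentialWriteInMemory.get() < WATER_MARK) {
              try {
                this.wait();
              } catch (InterruptedException ignored) {
              }
            }
          }
        }
      }

      private void dump() {
        // stands in for writing pending data to the local dump file and
        // decrementing nonSequentialWriteInMemory accordingly
      }
    }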
   
@@ -285,148 +299,196 @@ class OpenFileCtx {
   public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
       Channel channel, int xid, AsyncDataService asyncDataService,
       IdUserGroup iug) {
-
-    lockCtx();
-    try {
-      if (!activeState) {
-        LOG.info("OpenFileCtx is inactive, fileId:"
-            + request.getHandle().getFileId());
-        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
-            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      } else {
-        // Handle repeated write requests(same xid or not).
-        // If already replied, send reply again. If not replied, drop the
-        // repeated request.
-        WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
-            xid);
-        if (existantWriteCtx != null) {
-          if (!existantWriteCtx.getReplied()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which hasn't be served: xid="
-                  + xid + ", drop it.");
-            }
-          } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Repeated write request which is already served: xid="
-                  + xid + ", resend response.");
-            }
-            WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
-            WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-                fileWcc, request.getCount(), request.getStableHow(),
-                Nfs3Constant.WRITE_COMMIT_VERF);
-            Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-                new XDR(), xid, new VerifierNone()), xid);
+    
+    if (!activeState) {
+      LOG.info("OpenFileCtx is inactive, fileId:"
+          + request.getHandle().getFileId());
+      WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+          fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannel(channel,
+          response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+          xid);
+    } else {
+      // Update the write time first
+      updateLastAccessTime();
+      
+      // Handle repeated write requests (same xid or not).
+      // If already replied, send reply again. If not replied, drop the
+      // repeated request.
+      WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+          xid);
+      if (existantWriteCtx != null) {
+        if (!existantWriteCtx.getReplied()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which hasn't be served: xid="
+                + xid + ", drop it.");
           }
-          updateLastAccessTime();
-          
         } else {
-          receivedNewWriteInternal(dfsClient, request, channel, xid,
-              asyncDataService, iug);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Repeated write request which is already served: xid="
+                + xid + ", resend response.");
+          }
+          WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+              fileWcc, request.getCount(), request.getStableHow(),
+              Nfs3Constant.WRITE_COMMIT_VERF);
+          Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+              new XDR(), xid, new VerifierNone()), xid);
         }
+      } else {
+        // not a repeated write request
+        receivedNewWriteInternal(dfsClient, request, channel, xid,
+            asyncDataService, iug);
       }
-
-    } finally {
-      unlockCtx();
     }
   }
 
-  private void receivedNewWriteInternal(DFSClient dfsClient,
-      WRITE3Request request, Channel channel, int xid,
-      AsyncDataService asyncDataService, IdUserGroup iug) {
+  /**
+   * Creates and adds a WriteCtx into the pendingWrites map. This is a
+   * synchronized method to handle concurrent writes.
+   * 
+   * @return A non-null {@link WriteCtx} instance if the incoming write
+   *         request's offset >= nextOffset. Otherwise null.
+   */
+  private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+      Channel channel, int xid) {
     long offset = request.getOffset();
     int count = request.getCount();
-    WriteStableHow stableHow = request.getStableHow();
+    long cachedOffset = nextOffset.get();
 
-    // Get file length, fail non-append call
-    WccAttr preOpAttr = latestAttr.getWccAttr();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("requesed offset=" + offset + " and current filesize="
-          + preOpAttr.getSize());
+      LOG.debug("requesed offset=" + offset + " and current offset="
+          + cachedOffset);
     }
 
-    long nextOffset = getNextOffsetUnprotected();
-    if (offset == nextOffset) {
-      LOG.info("Add to the list, update nextOffset and notify the writer,"
-          + " nextOffset:" + nextOffset);
+    // Fail non-append call
+    if (offset < cachedOffset) {
+      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+          + nextOffset + ")");
+      return null;
+    } else {
+      DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
+          : WriteCtx.DataState.ALLOW_DUMP;
       WriteCtx writeCtx = new WriteCtx(request.getHandle(),
           request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.NO_DUMP);
-      addWrite(writeCtx);
-      
-      // Create an async task and change openFileCtx status to indicate async
-      // task pending
+          request.getData().array(), channel, xid, false, dataState);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+            + " and requesed offset=" + offset);
+      }
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        // update the memory size
+        updateNonSequentialWriteInMemory(count);
+      }
+      // check if there is a WriteCtx with the same range in pendingWrites
+      WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
+      if (oldWriteCtx == null) {
+        addWrite(writeCtx);
+      } else {
+        LOG.warn("Got a repeated request, same range, with xid:"
+            + writeCtx.getXid());
+      }
+      return writeCtx;
+    }
+  }
+  
+  /** Process an overwrite write request */
+  private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
+      Channel channel, int xid, IdUserGroup iug) {
+    WccData wccData = new WccData(latestAttr.getWccAttr(), null);
+    long offset = request.getOffset();
+    int count = request.getCount();
+    WriteStableHow stableHow = request.getStableHow();
+    WRITE3Response response;
+    long cachedOffset = nextOffset.get();
+    if (offset + count > cachedOffset) {
+      LOG.warn("Haven't noticed any partial overwrite for a sequential file"
+          + " write requests. Treat it as a real random write, no support.");
+      response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+          WriteStableHow.UNSTABLE, 0);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Process perfectOverWrite");
+      }
+      // TODO: let executor handle perfect overwrite
+      response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+          request.getData().array(),
+          Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+    }
+    updateLastAccessTime();
+    Nfs3Utils.writeChannel(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
+  }
+  
+  /**
+   * Check if we can start the write (back to HDFS) now. If there is no hole for
+   * writing, and there is no other threads writing (i.e., asyncStatus is
+   * false), start the writing and set asyncStatus to true.
+   * 
+   * @return True if the new write is sequential and we can start writing
+   *         (including the case that there is already a thread writing).
+   */
+  private synchronized boolean checkAndStartWrite(
+      AsyncDataService asyncDataService, WriteCtx writeCtx) {
+    
+    if (writeCtx.getOffset() == nextOffset.get()) {
       if (!asyncStatus) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trigger the write back task. Current nextOffset: "
+              + nextOffset.get());
+        }
         asyncStatus = true;
         asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The write back thread is working.");
+        }
       }
-      
-      // Update the write time first
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
-      // Send response immediately for unstable write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-        writeCtx.setReplied(true);
-      }
-
-    } else if (offset > nextOffset) {
-      LOG.info("Add new write to the list but not update nextOffset:"
-          + nextOffset);
-      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP);
-      addWrite(writeCtx);
+      return true;
+    } else {
+      return false;
+    }
+  }
 
-      // Check if need to dump some pending requests to file
-      checkDump(request.getCount());
-      updateLastAccessTime();
-      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-      
-      // In test, noticed some Linux client sends a batch (e.g., 1MB)
-      // of reordered writes and won't send more writes until it gets
-      // responses of the previous batch. So here send response immediately for
-      // unstable non-sequential write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-        writeCtx.setReplied(true);
-      }
+  private void receivedNewWriteInternal(DFSClient dfsClient,
+      WRITE3Request request, Channel channel, int xid,
+      AsyncDataService asyncDataService, IdUserGroup iug) {
+    WriteStableHow stableHow = request.getStableHow();
+    WccAttr preOpAttr = latestAttr.getWccAttr();
+    int count = request.getCount();
 
-    } else {
+    WriteCtx writeCtx = addWritesToCache(request, channel, xid);
+    if (writeCtx == null) {
       // offset < nextOffset
-      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
-          + nextOffset + ")");
-      WccData wccData = new WccData(preOpAttr, null);
-      WRITE3Response response;
-
-      if (offset + count > nextOffset) {
-        LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
-            + "write requests, so treat it as a real random write, no support.");
-        response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
-            WriteStableHow.UNSTABLE, 0);
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Process perfectOverWrite");
+      processOverWrite(dfsClient, request, channel, xid, iug);
+    } else {
+      // The write is added to pendingWrites.
+      // Check and start writing back if necessary
+      boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
+      if (!startWriting) {
+        // offset > nextOffset. check if we need to dump data
+        checkDump();
+        
+        // In test, noticed some Linux client sends a batch (e.g., 1MB)
+        // of reordered writes and won't send more writes until it gets
+        // responses of the previous batch. So here send response immediately
+        // for unstable non-sequential write
+        if (request.getStableHow() == WriteStableHow.UNSTABLE) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("UNSTABLE write request, send response for offset: "
+                + writeCtx.getOffset());
+          }
+          WccData fileWcc = new WccData(preOpAttr, latestAttr);
+          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+              fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+          Nfs3Utils
+              .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+                  xid, new VerifierNone()), xid);
+          writeCtx.setReplied(true);
         }
-        response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
-            request.getData().array(),
-            Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
       }
-      
-      updateLastAccessTime();
-      Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-          new XDR(), xid, new VerifierNone()), xid);
     }
   }
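
The rewritten path above confines locking to two short synchronized methods
and keeps the HDFS write itself unlocked. A condensed sketch of that control
flow, with hypothetical helper names and the NFS response plumbing elided:

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative reduction of addWritesToCache()/checkAndStartWrite().
    class WritePathSketch {
      private final AtomicLong nextOffset = new AtomicLong(0);
      private volatile boolean asyncStatus = false;

      // cf. addWritesToCache: reject overwrites, cache everything else.
      synchronized boolean cacheWrite(long offset, int count) {
        if (offset < nextOffset.get()) {
          return false; // overwrite; handled by processOverWrite
        }
        // ... wrap (offset, offset + count) in a WriteCtx, put it into
        // pendingWrites, and account for non-sequential data in memory ...
        return true;
      }

      // cf. checkAndStartWrite: at most one write-back task at a time.
      synchronized boolean checkAndStartWrite(long offset) {
        if (offset != nextOffset.get()) {
          return false; // out of order; caller may dump and reply early
        }
        if (!asyncStatus) {
          asyncStatus = true;
          // ... asyncDataService.execute(new WriteBackTask(this)) ...
        }
        return true;
      }
    }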
   
@@ -438,7 +500,6 @@ class OpenFileCtx {
   private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
       long offset, int count, WriteStableHow stableHow, byte[] data,
       String path, WccData wccData, IdUserGroup iug) {
-    assert (ctxLock.isLocked());
     WRITE3Response response = null;
 
     // Read the content back
@@ -449,21 +510,30 @@ class OpenFileCtx {
     try {
       // Sync file data and length to avoid partial read failure
       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      
+    } catch (ClosedChannelException closedException) {
+      LOG.info("The FSDataOutputStream has been closed. " +
+      		"Continue processing the perfect overwrite.");
+    } catch (IOException e) {
+      LOG.info("hsync failed when processing possible perfect overwrite, path="
+          + path + " error:" + e);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+    }
+    
+    try {
       fis = new FSDataInputStream(dfsClient.open(path));
       readCount = fis.read(offset, readbuffer, 0, count);
       if (readCount < count) {
         LOG.error("Can't read back " + count + " bytes, partial read size:"
             + readCount);
-        return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-            stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+            Nfs3Constant.WRITE_COMMIT_VERF);
       }
-
     } catch (IOException e) {
       LOG.info("Read failed when processing possible perfect overwrite, path="
           + path + " error:" + e);
-      return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
-          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+      return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+          Nfs3Constant.WRITE_COMMIT_VERF);
     } finally {
       IOUtils.cleanup(LOG, fis);
     }
@@ -494,36 +564,20 @@ class OpenFileCtx {
     }
     return response;
   }
-  
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_ERROR = 3;
 
   /**
    * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
    * COMMIT_INACTIVE_CTX, COMMIT_ERROR
    */
   public int checkCommit(long commitOffset) {
-    int ret = COMMIT_WAIT;
-
-    lockCtx();
-    try {
-      if (!activeState) {
-        ret = COMMIT_INACTIVE_CTX;
-      } else {
-        ret = checkCommitInternal(commitOffset);
-      }
-    } finally {
-      unlockCtx();
-    }
-    return ret;
+    return activeState ? checkCommitInternal(commitOffset)
+        : COMMIT_INACTIVE_CTX;
   }
   
   private int checkCommitInternal(long commitOffset) {
     if (commitOffset == 0) {
       // Commit whole file
-      commitOffset = getNextOffsetUnprotected();
+      commitOffset = nextOffset.get();
     }
 
     long flushed = 0;
@@ -533,7 +587,9 @@ class OpenFileCtx {
       LOG.error("Can't get flushed offset, error:" + e);
       return COMMIT_ERROR;
     }
-    LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+    }
     if (flushed < commitOffset) {
       // Keep stream active
       updateLastAccessTime();
@@ -546,6 +602,13 @@ class OpenFileCtx {
       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
       // Nothing to do for metadata since attr related change is pass-through
       ret = COMMIT_FINISHED;
+    } catch (ClosedChannelException cce) {
+      if (pendingWrites.isEmpty()) {
+        ret = COMMIT_INACTIVE_CTX;
+      } else {
+        ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     } catch (IOException e) {
       LOG.error("Got stream error during data sync:" + e);
       // Do nothing. Stream will be closed eventually by StreamMonitor.
@@ -558,18 +621,16 @@ class OpenFileCtx {
   }
   
   private void addWrite(WriteCtx writeCtx) {
-    assert (ctxLock.isLocked());
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
   
-  
   /**
    * Check stream status to decide if it should be closed
    * @return true, remove stream; false, keep stream
    */
-  public boolean streamCleanup(long fileId, long streamTimeout) {
+  public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
     if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
       throw new InvalidParameterException("StreamTimeout" + streamTimeout
           + "ms is less than MINIMIUM_STREAM_TIMEOUT "
@@ -577,107 +638,97 @@ class OpenFileCtx {
     }
     
     boolean flag = false;
-    if (!ctxLock.tryLock()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Another thread is working on it" + ctxLock.toString());
-      }
-      return flag;
-    }
-    
-    try {
-      // Check the stream timeout
-      if (checkStreamTimeout(streamTimeout)) {
-        LOG.info("closing stream for fileId:" + fileId);
-        cleanup();
-        flag = true;
+    // Check the stream timeout
+    if (checkStreamTimeout(streamTimeout)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing stream for fileId:" + fileId);
       }
-    } finally {
-      unlockCtx();
+      cleanup();
+      flag = true;
     }
     return flag;
   }
   
-  // Invoked by AsynDataService to do the write back
-  public void executeWriteBack() {
-    long nextOffset;
-    OffsetRange key;
-    WriteCtx writeCtx;
-
-    try {
-      // Don't lock OpenFileCtx for all writes to reduce the timeout of other
-      // client request to the same file
-      while (true) {
-        lockCtx();
-        if (!asyncStatus) {
-          // This should never happen. There should be only one thread working
-          // on one OpenFileCtx anytime.
-          LOG.fatal("The openFileCtx has false async status");
-          throw new RuntimeException("The openFileCtx has false async status");
-        }
-        // Any single write failure can change activeState to false, so do the
-        // check each loop.
-        if (pendingWrites.isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The asyn write task has no pendding writes, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+  /**
+   * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
+   * 
+   * @return Null if {@link #pendingWrites} is empty, or the next WriteCtx's
+   *         offset is larger than nextOffset.
+   */
+  private synchronized WriteCtx offerNextToWrite() {
+    if (pendingWrites.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The asyn write task has no pending writes, fileId: "
+            + latestAttr.getFileId());
+      }
+      this.asyncStatus = false;
+    } else {
+      Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+      OffsetRange range = lastEntry.getKey();
+      WriteCtx toWrite = lastEntry.getValue();
+      
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+            + nextOffset);
+      }
+      
+      long offset = nextOffset.get();
+      if (range.getMin() > offset) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The next sequencial write has not arrived yet");
         }
-        if (!activeState) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("The openFileCtx is not active anymore, fileId: "
-                + latestAttr.getFileId());
-          }
-          break;
+        this.asyncStatus = false;
+      } else if (range.getMin() < offset && range.getMax() > offset) {
+        // shouldn't happen since we do sync for overlapped concurrent writers
+        LOG.warn("Got a overlapping write (" + range.getMin() + ","
+            + range.getMax() + "), nextOffset=" + offset
+            + ". Silently drop it now");
+        pendingWrites.remove(range);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+              + ") from the list");
         }
-
-        // Get the next sequential write
-        nextOffset = getNextOffsetUnprotected();
-        key = pendingWrites.firstKey();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
-              + nextOffset);
+        // after writing, remove the WriteCtx from cache 
+        pendingWrites.remove(range);
+        // update nextOffset
+        nextOffset.addAndGet(toWrite.getCount());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Change nextOffset to " + nextOffset.get());
         }
-
-        if (key.getMin() > nextOffset) {
-          if (LOG.isDebugEnabled()) {
-            LOG.info("The next sequencial write has not arrived yet");
-          }
-          break;
-
-        } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
-          // Can't handle overlapping write. Didn't see it in tests yet.
-          LOG.fatal("Got a overlapping write (" + key.getMin() + ","
-              + key.getMax() + "), nextOffset=" + nextOffset);
-          throw new RuntimeException("Got a overlapping write (" + key.getMin()
-              + "," + key.getMax() + "), nextOffset=" + nextOffset);
-
-        } else {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
-                + ") from the list");
-          }
-          writeCtx = pendingWrites.remove(key);
+        return toWrite;
+      }
+    }
+    return null;
+  }
+  
+  /** Invoked by AsyncDataService to write back to HDFS */
+  void executeWriteBack() {
+    Preconditions.checkState(asyncStatus,
+        "The openFileCtx has false async status");
+    try {
+      while (activeState) {
+        WriteCtx toWrite = offerNextToWrite();
+        if (toWrite != null) {
           // Do the write
-          doSingleWrite(writeCtx);
+          doSingleWrite(toWrite);
           updateLastAccessTime();
+        } else {
+          break;
         }
-        
-        unlockCtx();
       }
-
+      
+      if (!activeState && LOG.isDebugEnabled()) {
+        LOG.debug("The openFileCtx is not active anymore, fileId: "
+            + latestAttr.getFileId());
+      }
     } finally {
-      // Always reset the async status so another async task can be created
-      // for this file
+      // make sure we reset asyncStatus to false
       asyncStatus = false;
-      if (ctxLock.isHeldByCurrentThread()) {
-        unlockCtx();
-      }
     }
   }
 
   private void doSingleWrite(final WriteCtx writeCtx) {
-    assert(ctxLock.isLocked());
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
 
@@ -687,20 +738,25 @@ class OpenFileCtx {
     byte[] data = null;
     try {
       data = writeCtx.getData();
-    } catch (IOException e1) {
+    } catch (Exception e1) {
       LOG.error("Failed to get request data offset:" + offset + " count:"
           + count + " error:" + e1);
       // Cleanup everything
       cleanup();
       return;
     }
-    assert (data.length == count);
+    
+    Preconditions.checkState(data.length == count);
 
     FileHandle handle = writeCtx.getHandle();
-    LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
-        + " length:" + count + " stableHow:" + stableHow.getValue());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+    }
 
     try {
+      // The write is not protected by lock. asyncStatus is used to make sure
+      // there is at most one thread doing write back at any time
       fos.write(data, 0, count);
       
       long flushedOffset = getFlushedOffset();
@@ -709,11 +765,20 @@ class OpenFileCtx {
             + flushedOffset + " and nextOffset should be"
             + (offset + count));
       }
-      nextOffset = flushedOffset;
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After writing " + handle.getFileId() + " at offset "
+            + offset + ", update the memory count.");
+      }
 
       // Reduce memory occupation size if request was allowed dumped
-      if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
-        updateNonSequentialWriteInMemory(-count);
+      if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+        synchronized (writeCtx) {
+          if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+            writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
+            updateNonSequentialWriteInMemory(-count);
+          }
+        }
       }
       
       if (!writeCtx.getReplied()) {
@@ -724,7 +789,6 @@ class OpenFileCtx {
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
             new XDR(), xid, new VerifierNone()), xid);
       }
-
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
           + offset + " and length " + data.length, e);
@@ -741,9 +805,21 @@ class OpenFileCtx {
     }
   }
 
-  private void cleanup() {
-    assert(ctxLock.isLocked());
+  private synchronized void cleanup() {
+    if (!activeState) {
+      LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
+      return;
+    }
     activeState = false;
+
+    // stop the dump thread
+    if (dumpThread != null) {
+      dumpThread.interrupt();
+      try {
+        dumpThread.join(3000);
+      } catch (InterruptedException e) {
+      }
+    }
     
     // Close stream
     try {
@@ -761,7 +837,7 @@ class OpenFileCtx {
     while (!pendingWrites.isEmpty()) {
       OffsetRange key = pendingWrites.firstKey();
       LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
-          + "), nextOffset=" + getNextOffsetUnprotected());
+          + "), nextOffset=" + nextOffset.get());
       
       WriteCtx writeCtx = pendingWrites.remove(key);
       if (!writeCtx.getReplied()) {
@@ -775,23 +851,23 @@ class OpenFileCtx {
     }
     
     // Cleanup dump file
-    if (dumpOut!=null){
+    if (dumpOut != null) {
       try {
         dumpOut.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
+      File dumpFile = new File(dumpFilePath);
+      if (dumpFile.exists() && !dumpFile.delete()) {
+        LOG.error("Failed to delete dumpfile: " + dumpFile);
+      }
     }
-    if (raf!=null) {
+    if (raf != null) {
       try {
         raf.close();
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
-    File dumpFile = new File(dumpFilePath);
-    if (dumpFile.delete()) {
-      LOG.error("Failed to delete dumpfile: "+ dumpFile);
-    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1525688&r1=1525687&r2=1525688&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Mon Sep 23 20:26:20 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.FileHa
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.base.Preconditions;
+
 /**
  * WriteCtx saves the context of one write request, such as request, channel,
  * xid and reply status.
@@ -49,13 +51,21 @@ class WriteCtx {
   private final long offset;
   private final int count;
   private final WriteStableHow stableHow;
-  private byte[] data;
+  private volatile byte[] data;
   
   private final Channel channel;
   private final int xid;
   private boolean replied;
 
-  private DataState dataState;
+  /** 
+   * Data belonging to the same {@link OpenFileCtx} may be dumped to a file. 
+   * After being dumped to the file, the corresponding {@link WriteCtx} records 
+   * the dump file and the offset.  
+   */
+  private RandomAccessFile raf;
+  private long dumpFileOffset;
+  
+  private volatile DataState dataState;
 
   public DataState getDataState() {
     return dataState;
@@ -64,12 +74,13 @@ class WriteCtx {
   public void setDataState(DataState dataState) {
     this.dataState = dataState;
   }
-
-  private RandomAccessFile raf;
-  private long dumpFileOffset;
   
-  // Return the dumped data size
-  public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
+  /** 
+   * Writing the data into a local file. After the writing, if 
+   * {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set 
+   * {@link #dataState} to DUMPED.
+   */
+  long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
       throws IOException {
     if (dataState != DataState.ALLOW_DUMP) {
       if (LOG.isTraceEnabled()) {
@@ -84,48 +95,63 @@ class WriteCtx {
     if (LOG.isDebugEnabled()) {
       LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
     }
-    data = null;
-    dataState = DataState.DUMPED;
-    return count;
+    // it is possible that while we dump the data, the data is also being
+    // written back to HDFS. After dump, if the writing back has not finished
+    // yet, we change its flag to DUMPED and set the data to null. Otherwise
+    // this WriteCtx instance should have been removed from the buffer.
+    if (dataState == DataState.ALLOW_DUMP) {
+      synchronized (this) {
+        if (dataState == DataState.ALLOW_DUMP) {
+          data = null;
+          dataState = DataState.DUMPED;
+          return count;
+        }
+      }
+    }
+    return 0;
   }
 
-  public FileHandle getHandle() {
+  FileHandle getHandle() {
     return handle;
   }
   
-  public long getOffset() {
+  long getOffset() {
     return offset;
   }
 
-  public int getCount() {
+  int getCount() {
     return count;
   }
 
-  public WriteStableHow getStableHow() {
+  WriteStableHow getStableHow() {
     return stableHow;
   }
 
-  public byte[] getData() throws IOException {
+  byte[] getData() throws IOException {
     if (dataState != DataState.DUMPED) {
-      if (data == null) {
-        throw new IOException("Data is not dumpted but has null:" + this);
-      }
-    } else {
-      // read back
-      if (data != null) {
-        throw new IOException("Data is dumpted but not null");
-      }
-      data = new byte[count];
-      raf.seek(dumpFileOffset);
-      int size = raf.read(data, 0, count);
-      if (size != count) {
-        throw new IOException("Data count is " + count + ", but read back "
-            + size + "bytes");
+      synchronized (this) {
+        if (dataState != DataState.DUMPED) {
+          Preconditions.checkState(data != null);
+          return data;
+        }
       }
     }
+    // read back from dumped file
+    this.loadData();
     return data;
   }
 
+  private void loadData() throws IOException {
+    Preconditions.checkState(data == null);
+    data = new byte[count];
+    raf.seek(dumpFileOffset);
+    int size = raf.read(data, 0, count);
+    if (size != count) {
+      throw new IOException("Data count is " + count + ", but read back "
+          + size + "bytes");
+    }
+  }
+
   Channel getChannel() {
     return channel;
   }
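
The double-checked dataState test in dumpData() above mirrors the one added to
OpenFileCtx.doSingleWrite(), so exactly one of the two threads releases the
in-memory bytes of a WriteCtx. A stripped-down sketch of the pattern, with a
hypothetical class and illustrative method names:

    // Illustrative reduction of the dump/write-back race; not commit code.
    class DataStateRaceSketch {
      enum DataState { ALLOW_DUMP, NO_DUMP, DUMPED }

      private volatile DataState dataState = DataState.ALLOW_DUMP;

      // Dumper thread (cf. WriteCtx.dumpData): returns the bytes it released.
      long tryDump(int count) {
        if (dataState == DataState.ALLOW_DUMP) {
          synchronized (this) {
            if (dataState == DataState.ALLOW_DUMP) {
              dataState = DataState.DUMPED; // data now lives in the dump file
              return count;
            }
          }
        }
        return 0; // the write-back thread claimed the data first
      }

      // Write-back thread (cf. OpenFileCtx.doSingleWrite): same double check.
      long tryFinishWrite(int count) {
        if (dataState == DataState.ALLOW_DUMP) {
          synchronized (this) {
            if (dataState == DataState.ALLOW_DUMP) {
              dataState = DataState.NO_DUMP; // written to HDFS, never dumped
              return count;
            }
          }
        }
        return 0; // already dumped; the dumper released the memory
      }
    }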

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1525688&r1=1525687&r2=1525688&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Mon Sep 23 20:26:20 2013
@@ -67,8 +67,8 @@ public class WriteManager {
    */
   private long streamTimeout;
   
-  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
-  public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
+  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
+  public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
   
   void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
     openFileMap.put(h, ctx);
@@ -215,6 +215,10 @@ public class WriteManager {
         LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
             + " commitOffset=" + commitOffset);
         return true;
+      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
+        LOG.info("Inactive stream with pending writes, fileId="
+            + fileHandle.getFileId() + " commitOffset=" + commitOffset);
+        return false;
       }
       assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
       if (ret == OpenFileCtx.COMMIT_ERROR) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java?rev=1525688&r1=1525687&r2=1525688&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java Mon Sep 23 20:26:20 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 
@@ -51,8 +52,9 @@ public class TestOffsetRange {
     OffsetRange r3 = new OffsetRange(1, 3);
     OffsetRange r4 = new OffsetRange(3, 4);
 
-    assertTrue(r2.compareTo(r3) == 0);
-    assertTrue(r2.compareTo(r1) == 1);
-    assertTrue(r2.compareTo(r4) == -1);
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
   }
 }
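
A property the updated assertions rely on implicitly: the comparator is
consistent with equals, i.e., compare(a, b) == 0 exactly when a.equals(b).
This matters because ConcurrentSkipListMap resolves keys with the comparator
rather than with equals(). An illustrative fragment in the same style,
assuming package access to the OffsetRange constructor:

    OffsetRange a = new OffsetRange(1, 3);
    OffsetRange b = new OffsetRange(1, 3);
    // Both hold, so comparator-based skip-list lookups agree with equals():
    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(a, b));
    assertTrue(a.equals(b));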

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1525688&r1=1525687&r2=1525688&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Sep 23 20:26:20 2013
@@ -176,6 +176,9 @@ Release 2.1.1-beta - 2013-09-23
     HDFS-5212. Refactor RpcMessage and NFS3Response to support different 
     types of authentication information. (jing9)
 
+    HDFS-4971. Move IO operations out of locking in OpenFileCtx. (brandonli and
+    jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES