Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/10/09 02:10:48 UTC

svn commit: r1530468 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/

Author: brandonli
Date: Wed Oct  9 00:10:48 2013
New Revision: 1530468

URL: http://svn.apache.org/r1530468
Log:
HDFS-5281. COMMIT request should not block. Merging change r1530461 from trunk
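
Before this change, WriteManager#handleCommit blocked the RPC handler,
polling checkCommit in a loop with 100ms sleeps and a 30 second timeout.
After it, a COMMIT whose data is not yet flushed is recorded as a CommitCtx
(offset, channel, xid, pre-op attributes) in a pendingCommits map inside
OpenFileCtx, and the COMMIT3Response is written back asynchronously from
processCommits() once the write-back path has flushed past the requested
offset.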

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1530468&r1=1530467&r2=1530468&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Wed Oct  9 00:10:48 2013
@@ -115,6 +115,14 @@ public class Nfs3Utils {
     ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
     channel.write(outBuf);
   }
+  
+  public static void writeChannelCommit(Channel channel, XDR out, int xid) {
+    if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+      RpcProgramNfs3.LOG.debug("Commit done:" + xid);
+    }
+    ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
+    channel.write(outBuf);
+  }
 
   private static boolean isSet(int access, int bits) {
     return (access & bits) == bits;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1530468&r1=1530467&r2=1530468&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Wed Oct  9 00:10:48 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
@@ -69,12 +70,18 @@ class OpenFileCtx {
   // Pending writes water mark for dump, 1MB
   private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
 
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
-  public final static int COMMIT_ERROR = 4;
+  static enum COMMIT_STATUS {
+    COMMIT_FINISHED,
+    COMMIT_WAIT,
+    COMMIT_INACTIVE_CTX,
+    COMMIT_INACTIVE_WITH_PENDING_WRITE,
+    COMMIT_ERROR,
+    COMMIT_DO_SYNC;
+  }
 
+  private final DFSClient client;
+  private final IdUserGroup iug;
+  
   // The stream status. False means the stream is closed.
   private volatile boolean activeState;
   // The stream write-back status. True means one thread is doing write back.
@@ -87,11 +94,58 @@ class OpenFileCtx {
   private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
   
-  // TODO: make it mutable and update it after each writing back to HDFS
-  private final Nfs3FileAttributes latestAttr;
+  // It's updated after each sync to HDFS
+  private Nfs3FileAttributes latestAttr;
 
   private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
   
+  private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
+
+  static class CommitCtx {
+    private final long offset;
+    private final Channel channel;
+    private final int xid;
+    private final Nfs3FileAttributes preOpAttr;
+
+    // Remember time for debug purposes
+    private final long startTime;
+
+    long getOffset() {
+      return offset;
+    }
+
+    Channel getChannel() {
+      return channel;
+    }
+
+    int getXid() {
+      return xid;
+    }
+
+    Nfs3FileAttributes getPreOpAttr() {
+      return preOpAttr;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+
+    CommitCtx(long offset, Channel channel, int xid,
+        Nfs3FileAttributes preOpAttr) {
+      this.offset = offset;
+      this.channel = channel;
+      this.xid = xid;
+      this.preOpAttr = preOpAttr;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("offset: %d xid: %d startTime: %d", offset, xid,
+          startTime);
+    }
+  }
+  
   // The last write, commit request or write-back event. Updating time to keep
  // output stream alive.
   private long lastAccessTime;
@@ -130,7 +184,7 @@ class OpenFileCtx {
   }
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
-      String dumpFilePath) {
+      String dumpFilePath, DFSClient client, IdUserGroup iug) {
     this.fos = fos;
     this.latestAttr = latestAttr;
     // We use the ReverseComparatorOnMin as the comparator of the map. In this
@@ -138,6 +192,9 @@ class OpenFileCtx {
     // retrieve the last element to write back to HDFS.
     pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
         OffsetRange.ReverseComparatorOnMin);
+    
+    pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
+    
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
@@ -153,6 +210,8 @@ class OpenFileCtx {
       assert(nextOffset.get() == this.fos.getPos());
     } catch (IOException e) {}
     dumpThread = null;
+    this.client = client;
+    this.iug = iug;
   }
 
   public Nfs3FileAttributes getLatestAttr() {
@@ -547,19 +606,23 @@ class OpenFileCtx {
         // of reordered writes and won't send more writes until it gets
         // responses of the previous batch. So here send response immediately
         // for unstable non-sequential write
-        if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("UNSTABLE write request, send response for offset: "
-                + writeCtx.getOffset());
-          }
-          WccData fileWcc = new WccData(preOpAttr, latestAttr);
-          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-              fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-          Nfs3Utils
-              .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
-                  xid, new VerifierNone()), xid);
-          writeCtx.setReplied(true);
+        if (stableHow != WriteStableHow.UNSTABLE) {
+          LOG.info("Have to change stable write to unstable write:"
+              + request.getStableHow());
+          stableHow = WriteStableHow.UNSTABLE;
         }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("UNSTABLE write request, send response for offset: "
+              + writeCtx.getOffset());
+        }
+        WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils
+            .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+                xid, new VerifierNone()), xid);
+        writeCtx.setReplied(true);
       }
     }
   }
@@ -637,19 +700,52 @@ class OpenFileCtx {
     return response;
   }
 
+  public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    // Keep stream active
+    updateLastAccessTime();
+    Preconditions.checkState(commitOffset >= 0);
+
+    COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
+        preOpAttr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got commit status: " + ret.name());
+    }
+    // Do the sync outside the lock
+    if (ret == COMMIT_STATUS.COMMIT_DO_SYNC) {
+      try {
+        // Sync file data and length
+        fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+        // Nothing to do for metadata since attr related change is pass-through
+      } catch (ClosedChannelException cce) {
+        if (pendingWrites.isEmpty()) {
+          ret = COMMIT_STATUS.COMMIT_FINISHED;
+        } else {
+          ret = COMMIT_STATUS.COMMIT_ERROR;
+        }
+      } catch (IOException e) {
+        LOG.error("Got stream error during data sync:" + e);
+        // Do nothing. Stream will be closed eventually by StreamMonitor.
+        // status = Nfs3Status.NFS3ERR_IO;
+        ret = COMMIT_STATUS.COMMIT_ERROR;
+      }
+    }
+    return ret;
+  }
+
   /**
    * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
-   * COMMIT_INACTIVE_CTX, COMMIT_ERROR
+   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, COMMIT_DO_SYNC
    */
-  public int checkCommit(long commitOffset) {
-    return activeState ? checkCommitInternal(commitOffset)
-        : COMMIT_INACTIVE_CTX;
-  }
-  
-  private int checkCommitInternal(long commitOffset) {
-    if (commitOffset == 0) {
-      // Commit whole file
-      commitOffset = nextOffset.get();
+  private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    if (!activeState) {
+      if (pendingWrites.isEmpty()) {
+        return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
+      } else {
+        // TODO: return success if already committed
+        return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     }
 
     long flushed = 0;
@@ -657,39 +753,38 @@ class OpenFileCtx {
       flushed = getFlushedOffset();
     } catch (IOException e) {
       LOG.error("Can't get flushed offset, error:" + e);
-      return COMMIT_ERROR;
+      return COMMIT_STATUS.COMMIT_ERROR;
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
     }
-    if (flushed < commitOffset) {
-      // Keep stream active
-      updateLastAccessTime();
-      return COMMIT_WAIT;
-    }
 
-    int ret = COMMIT_WAIT;
-    try {
-      // Sync file data and length
-      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      // Nothing to do for metadata since attr related change is pass-through
-      ret = COMMIT_FINISHED;
-    } catch (ClosedChannelException cce) { 
-      ret = COMMIT_INACTIVE_CTX;
-      if (pendingWrites.isEmpty()) {
-        ret = COMMIT_INACTIVE_CTX;
+    if (commitOffset > 0) {
+      if (commitOffset > flushed) {
+        CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+            preOpAttr);
+        pendingCommits.put(commitOffset, commitCtx);
+        return COMMIT_STATUS.COMMIT_WAIT;
       } else {
-        ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
+        return COMMIT_STATUS.COMMIT_DO_SYNC;
       }
-    } catch (IOException e) {
-      LOG.error("Got stream error during data sync:" + e);
-      // Do nothing. Stream will be closed eventually by StreamMonitor.
-      ret = COMMIT_ERROR;
     }
 
-    // Keep stream active
-    updateLastAccessTime();
-    return ret;
+    Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
+
+    // Commit whole file, commitOffset == 0
+    if (pendingWrites.isEmpty()) {
+      // Note that, there is no guarantee data is synced. TODO: We could still
+      // do a sync here though the output stream might be closed.
+      return COMMIT_STATUS.COMMIT_FINISHED;
+    } else {
+      // Insert commit
+      long maxOffset = key.getKey().getMax() - 1;
+      Preconditions.checkState(maxOffset > 0);
+      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+      pendingCommits.put(maxOffset, commitCtx);
+      return COMMIT_STATUS.COMMIT_WAIT;
+    }
   }
   
   private void addWrite(WriteCtx writeCtx) {
@@ -734,8 +829,18 @@ class OpenFileCtx {
        LOG.debug("The async write task has no pending writes, fileId: "
             + latestAttr.getFileId());
       }
+      // process pending commit again to handle this race: a commit is added
+      // to pendingCommits map just after the last doSingleWrite returns.
+      // There is no pending write and the commit should be handled by the
+      // last doSingleWrite. Due to the race, the commit is left alone and
+      // can't be processed until cleanup. Therefore, we should do another
+      // processCommits to fix the race issue.
+      processCommits(nextOffset.get()); // nextOffset has same value as
+                                        // flushedOffset
       this.asyncStatus = false;
-    } else {
+      return null;
+    } 
+    
       Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
       OffsetRange range = lastEntry.getKey();
       WriteCtx toWrite = lastEntry.getValue();
@@ -750,6 +855,7 @@ class OpenFileCtx {
         if (LOG.isDebugEnabled()) {
          LOG.debug("The next sequential write has not arrived yet");
         }
+        processCommits(nextOffset.get()); // handle race
         this.asyncStatus = false;
       } else if (range.getMin() < offset && range.getMax() > offset) {
         // shouldn't happen since we do sync for overlapped concurrent writers
@@ -757,6 +863,7 @@ class OpenFileCtx {
             + range.getMax() + "), nextOffset=" + offset
             + ". Silently drop it now");
         pendingWrites.remove(range);
+        processCommits(nextOffset.get()); // handle race
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
@@ -771,7 +878,7 @@ class OpenFileCtx {
         }
         return toWrite;
       }
-    }
+    
     return null;
   }
   
@@ -793,7 +900,7 @@ class OpenFileCtx {
       
       if (!activeState && LOG.isDebugEnabled()) {
         LOG.debug("The openFileCtx is not active anymore, fileId: "
-            + +latestAttr.getFileId());
+            + latestAttr.getFileId());
       }
     } finally {
       // make sure we reset asyncStatus to false
@@ -801,6 +908,71 @@ class OpenFileCtx {
     }
   }
 
+  private void processCommits(long offset) {
+    Preconditions.checkState(offset > 0);
+    long flushedOffset = 0;
+    Entry<Long, CommitCtx> entry = null;
+
+    int status = Nfs3Status.NFS3ERR_IO;
+    try {
+      flushedOffset = getFlushedOffset();
+      entry = pendingCommits.firstEntry();
+      if (entry == null || entry.getValue().offset > flushedOffset) {
+        return;
+      }
+
+      // Now do sync for the ready commits
+      // Sync file data and length
+      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      status = Nfs3Status.NFS3_OK;
+    } catch (ClosedChannelException cce) {
+      if (!pendingWrites.isEmpty()) {
+        LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
+            + ". Channel closed with writes pending");
+      }
+      status = Nfs3Status.NFS3ERR_IO;
+    } catch (IOException e) {
+      LOG.error("Got stream error during data sync:" + e);
+      // Do nothing. Stream will be closed eventually by StreamMonitor.
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    // Update latestAttr
+    try {
+      latestAttr = Nfs3Utils.getFileAttr(client,
+          Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
+    } catch (IOException e) {
+      LOG.error("Can't get new file attr for fileId: " + latestAttr.getFileId());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    if (latestAttr.getSize() != offset) {
+      LOG.error("After sync, the expected file size: " + offset
+          + ", however actual file size is: " + latestAttr.getSize());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+    WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
+
+    // Send response for the ready commits
+    while (entry != null && entry.getValue().offset <= flushedOffset) {
+      pendingCommits.remove(entry.getKey());
+      CommitCtx commit = entry.getValue();
+
+      COMMIT3Response response = new COMMIT3Response(status, wccData,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannelCommit(commit.getChannel(), response
+          .writeHeaderAndResponse(new XDR(), commit.getXid(),
+              new VerifierNone()), commit.getXid());
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("FileId: " + latestAttr.getFileid() + " Service time:"
+            + (System.currentTimeMillis() - commit.getStartTime())
+            + "ms. Sent response for commit:" + commit);
+      }
+      entry = pendingCommits.firstEntry();
+    }
+  }
+  
   private void doSingleWrite(final WriteCtx writeCtx) {
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
@@ -856,6 +1028,10 @@ class OpenFileCtx {
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
             new XDR(), xid, new VerifierNone()), xid);
       }
+      
+      // Handle the waiting commits without holding any lock
+      processCommits(writeCtx.getOffset() + writeCtx.getCount());
+     
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
           + offset + " and length " + count, e);
@@ -937,4 +1113,29 @@ class OpenFileCtx {
       }
     }
   }
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){
+    return pendingWrites;
+  }
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){
+    return pendingCommits;
+  }
+  
+  @VisibleForTesting
+  long getNextOffsetForTest() {
+    return nextOffset.get();
+  }
+  
+  @VisibleForTesting
+  void setNextOffsetForTest(long newValue) {
+    nextOffset.set(newValue);
+  }
+  
+  @VisibleForTesting
+  void setActiveStatusForTest(boolean activeState) {
+    this.activeState = activeState;
+  }
 }

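The heart of the change is the checkCommitInternal/processCommits pair above.
For illustration, here is a minimal, self-contained sketch of the same
queuing pattern (PendingCommitQueue, commitOrDefer, drain and the Runnable
reply are made-up stand-ins, not names from the patch): commits beyond the
flushed offset are parked in an ordered map, and each write-back drains
every entry whose offset has since been covered.

  import java.util.Map;
  import java.util.concurrent.ConcurrentNavigableMap;
  import java.util.concurrent.ConcurrentSkipListMap;

  class PendingCommitQueue {
    // Keyed by commit offset; firstEntry() yields the smallest outstanding one.
    private final ConcurrentNavigableMap<Long, Runnable> pending =
        new ConcurrentSkipListMap<Long, Runnable>();

    // COMMIT path: if the requested range is already flushed, the caller can
    // hsync and reply inline (COMMIT_DO_SYNC); otherwise the reply is parked
    // (COMMIT_WAIT) and sent later by drain().
    boolean commitOrDefer(long commitOffset, long flushedOffset, Runnable reply) {
      if (commitOffset <= flushedOffset) {
        return true;   // sync and reply inline
      }
      pending.put(commitOffset, reply);
      return false;    // reply deferred until enough data is flushed
    }

    // Write-back path: called after every flush, and once more when the write
    // queue empties, to close the race described in the comments above.
    void drain(long flushedOffset) {
      Map.Entry<Long, Runnable> e;
      while ((e = pending.firstEntry()) != null && e.getKey() <= flushedOffset) {
        pending.remove(e.getKey());
        e.getValue().run(); // sends the COMMIT3Response for the saved xid
      }
    }
  }
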
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1530468&r1=1530467&r2=1530468&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Wed Oct  9 00:10:48 2013
@@ -840,7 +840,7 @@ public class RpcProgramNfs3 extends RpcP
     
     // Add open stream
     OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
-        + "/" + postOpObjAttr.getFileId());
+        + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
     fileHandle = new FileHandle(postOpObjAttr.getFileId());
     writeManager.addOpenFileStream(fileHandle, openFileCtx);
     if (LOG.isDebugEnabled()) {
@@ -1706,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1748,18 +1748,10 @@ public class RpcProgramNfs3 extends RpcP
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
       
-      int status;
-      if (writeManager.handleCommit(handle, commitOffset)) {
-        status = Nfs3Status.NFS3_OK;
-      } else {
-        status = Nfs3Status.NFS3ERR_IO;
-      }
-      Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient,
-          handle, iug);
-      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
-      return new COMMIT3Response(status, fileWcc,
-          Nfs3Constant.WRITE_COMMIT_VERF);
-
+      // Insert commit as an async request
+      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
+          preOpAttr);
+      return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       Nfs3FileAttributes postOpAttr = null;
@@ -1892,7 +1884,7 @@ public class RpcProgramNfs3 extends RpcP
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
       response = pathconf(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, securityHandler, client);
+      response = commit(xdr, channel, xid, securityHandler, client);
     } else {
       // Invalid procedure
       RpcAcceptedReply.getInstance(xid,

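Two things are worth noting in this hunk. First, commit() now returns null
once the request is handed to WriteManager#handleCommit, so no reply is
written synchronously; the COMMIT3Response goes out later on the saved
channel and xid, either directly from handleCommit or from
OpenFileCtx#processCommits when the commit has to wait. Second, the commit
offset follows RFC 1813 COMMIT semantics: count == 0 means commit the whole
file. A one-line restatement of the computation above (the helper name is
illustrative, not part of the patch):

  // COMMIT semantics (RFC 1813): count == 0 commits up to end of file.
  static long commitOffset(long offset, int count) {
    return count == 0 ? 0 : offset + count;
  }
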
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1530468&r1=1530467&r2=1530468&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Wed Oct  9 00:10:48 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -36,6 +37,7 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
@@ -166,7 +168,7 @@ public class WriteManager {
       String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
           Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId());
+          + fileHandle.getFileId(), dfsClient, iug);
       addOpenFileStream(fileHandle, openFileCtx);
       if (LOG.isDebugEnabled()) {
         LOG.debug("opened stream for file:" + fileHandle.getFileId());
@@ -176,71 +178,53 @@ public class WriteManager {
     // Add write into the async job queue
     openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
         asyncDataService, iug);
-    // Block stable write
-    if (request.getStableHow() != WriteStableHow.UNSTABLE) {
-      if (handleCommit(fileHandle, offset + count)) {
-        Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug);
-        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
-            postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, request.getStableHow(),
-            Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      } else {
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      }
-    }
-
     return;
   }
 
-  boolean handleCommit(FileHandle fileHandle, long commitOffset) {
+  void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
+      long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    int status;
     OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
-          + " commitOffset=" + commitOffset);
-      return true;
-    }
-    long timeout = 30 * 1000; // 30 seconds
-    long startCommit = System.currentTimeMillis();
-    while (true) {
-      int ret = openFileCtx.checkCommit(commitOffset);
-      if (ret == OpenFileCtx.COMMIT_FINISHED) {
-        // Committed
-        return true;
-      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
-        LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return true;
-      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
-        LOG.info("Inactive stream with pending writes, fileId="
-            + fileHandle.getFileId() + " commitOffset=" + commitOffset);
-        return false;
-      }
-      assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
-      if (ret == OpenFileCtx.COMMIT_ERROR) {
-        return false;
-      }
+          + " commitOffset=" + commitOffset + ". Return success in this case.");
+      status = Nfs3Status.NFS3_OK;
       
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-      }
-      if (System.currentTimeMillis() - startCommit > timeout) {
-        // Commit took too long, return error
-        return false;
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return false;
+    } else {
+      COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
+          channel, xid, preOpAttr);
+      switch (ret) {
+      case COMMIT_FINISHED:
+      case COMMIT_INACTIVE_CTX:
+        status = Nfs3Status.NFS3_OK;
+        break;
+      case COMMIT_INACTIVE_WITH_PENDING_WRITE:
+      case COMMIT_ERROR:
+        status = Nfs3Status.NFS3ERR_IO;
+        break;
+      case COMMIT_WAIT:
+        // Do nothing. Commit is async now.
+        return;
+      default:
+        throw new RuntimeException("Wrong error code:" + ret.name());
       }
-    }// while
+    }
+    
+    // Send out the response
+    Nfs3FileAttributes postOpAttr = null;
+    try {
+      String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileid());
+      postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+    } catch (IOException e1) {
+      LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileid());
+    }
+    WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
+    COMMIT3Response response = new COMMIT3Response(status, fileWcc,
+        Nfs3Constant.WRITE_COMMIT_VERF);
+    Nfs3Utils.writeChannelCommit(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
   }
 
   /**

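With this hunk, the blocking poll loop is gone from the COMMIT path. The only
synchronous work left is the status mapping shown above: COMMIT_FINISHED and
COMMIT_INACTIVE_CTX answer NFS3_OK immediately, COMMIT_INACTIVE_WITH_PENDING_WRITE
and COMMIT_ERROR answer NFS3ERR_IO, and COMMIT_WAIT returns without replying,
leaving the queued CommitCtx to be answered from processCommits.
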
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1530468&r1=1530467&r2=1530468&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Wed Oct  9 00:10:48 2013
@@ -19,13 +19,23 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.IdUserGroup;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.jboss.netty.channel.Channel;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestWrites {
   @Test
@@ -97,4 +107,61 @@ public class TestWrites {
     Assert.assertTrue(limit - position == 1);
     Assert.assertTrue(appendedData.get(position) == (byte) 19);
   }
+  
+  @Test
+  // Validate all the commit check return codes in OpenFileCtx.COMMIT_STATUS,
+  // which include COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
+  // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
+  public void testCheckCommit() throws IOException {
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+    Mockito.when(fos.getPos()).thenReturn((long) 0);
+
+    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+        new IdUserGroup());
+
+    COMMIT_STATUS ret;
+
+    // Test inactive open file context
+    ctx.setActiveStatusForTest(false);
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
+
+    ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
+
+    // Test request with non zero commit offset
+    ctx.setActiveStatusForTest(true);
+    Mockito.when(fos.getPos()).thenReturn((long) 10);
+    ret = ctx.checkCommit(dfsClient, 5, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
+    ret = ctx.checkCommit(dfsClient, 10, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
+
+    ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
+        .getPendingCommitsForTest();
+    Assert.assertTrue(commits.size() == 0);
+    ret = ctx.checkCommit(dfsClient, 11, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
+    Assert.assertTrue(commits.size() == 1);
+    long key = commits.firstKey();
+    Assert.assertTrue(key == 11);
+
+    // Test request with zero commit offset
+    commits.remove(new Long(11));
+    // There is one pending write [5,10]
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
+    Assert.assertTrue(commits.size() == 1);
+    key = commits.firstKey();
+    Assert.assertTrue(key == 9);
+
+    // Empty pending writes
+    ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+  }
 }

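A note on the expected key of 9 near the end of the test: with one pending
write covering OffsetRange(5, 10) and commitOffset == 0 (commit the whole
file), checkCommitInternal queues the commit at getMax() - 1 = 10 - 1 = 9,
the offset of the last pending byte, which is the key asserted above.
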
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1530468&r1=1530467&r2=1530468&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Oct  9 00:10:48 2013
@@ -110,6 +110,8 @@ Release 2.2.1 - UNRELEASED
     HDFS-5316. Namenode ignores the default https port (Haohui Mai via
     brandonli)
 
+    HDFS-5281. COMMIT request should not block. (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES