Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/16 23:07:38 UTC

svn commit: r1532910 [1/4] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/src/main/java/org/apac...

Author: arp
Date: Wed Oct 16 21:07:28 2013
New Revision: 1532910

URL: http://svn.apache.org/r1532910
Log:
Merging r1527684 through r1532876 from trunk to branch HDFS-2832

Added:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
      - copied unchanged from r1532876, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
      - copied unchanged from r1532876, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/shell/TestHdfsTextCommand.java
      - copied unchanged from r1532876, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/shell/TestHdfsTextCommand.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
      - copied unchanged from r1532876, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSnapshotCommands.java
      - copied unchanged from r1532876, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSnapshotCommands.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterJspHelper.java
      - copied unchanged from r1532876, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterJspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
      - copied unchanged from r1532876, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java
Removed:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/shell/TestTextCommand.java
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.cmd   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.cmd   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.cmd   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/corrupt_files.jsp
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfsnodelist.jsp
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpDelegationToken.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestNameNodeHttpServer.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeJspHelper.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStartupProgressServlet.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1527684-1532876

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java Wed Oct 16 21:07:28 2013
@@ -418,7 +418,11 @@ public class Server {
       Properties props = new Properties();
       try {
         InputStream is = getResource(DEFAULT_LOG4J_PROPERTIES);
-        props.load(is);
+        try {
+          props.load(is);
+        } finally {
+          is.close();
+        }
       } catch (IOException ex) {
         throw new ServerException(ServerException.ERROR.S03, DEFAULT_LOG4J_PROPERTIES, ex.getMessage(), ex);
       }
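
This hunk fixes a resource leak: the InputStream for the default log4j properties is now closed even when props.load() fails. A minimal, self-contained sketch of the same pattern, assuming nothing about Server.java beyond what the hunk shows; on Java 7+ the explicit finally block can also be written as try-with-resources:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    class PropertiesLoader {
      // Load properties from a stream and always close it, even on failure.
      static Properties load(InputStream is) throws IOException {
        Properties props = new Properties();
        try (InputStream in = is) {
          props.load(in);
        }
        return props;
      }
    }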

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Wed Oct 16 21:07:28 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -38,10 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHa
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to mountd daemon. See {@link Mountd}.
@@ -77,7 +83,7 @@ public class RpcProgramMountd extends Rp
       throws IOException {
     // Note that RPC cache is not enabled
     super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
-        PROGRAM, VERSION_1, VERSION_3, 0);
+        PROGRAM, VERSION_1, VERSION_3);
     
     this.hostsMatcher = NfsExports.getInstance(config);
     this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
@@ -173,10 +179,16 @@ public class RpcProgramMountd extends Rp
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
+
     if (mntproc == MNTPROC.NULL) {
       out = nullOp(out, xid, client);
     } else if (mntproc == MNTPROC.MNT) {
@@ -198,7 +210,9 @@ public class RpcProgramMountd extends Rp
           RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
           out);
     }  
-    return out;
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override
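
Note the shape of the new handleInternal() contract: instead of returning a serialized XDR reply for the framework to send, the program reads the request body out of RpcInfo and writes an RpcResponse down the Netty pipeline itself. A sketch of that send path, assembled from the calls visible in this hunk; the ReplySender helper is hypothetical, and the SocketAddress parameter type is an assumption about what info.remoteAddress() returns:

    import java.net.SocketAddress;
    import org.apache.hadoop.oncrpc.RpcResponse;
    import org.apache.hadoop.oncrpc.RpcUtil;
    import org.apache.hadoop.oncrpc.XDR;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.ChannelHandlerContext;

    final class ReplySender {
      // Wrap the serialized reply and hand it to the RPC layer for writing.
      static void send(ChannelHandlerContext ctx, XDR out,
          SocketAddress remoteAddress) {
        ChannelBuffer buf =
            ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
        RpcResponse rsp = new RpcResponse(buf, remoteAddress);
        RpcUtil.sendRpcResponse(ctx, rsp);
      }
    }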

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Wed Oct 16 21:07:28 2013
@@ -115,6 +115,14 @@ public class Nfs3Utils {
     ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
     channel.write(outBuf);
   }
+  
+  public static void writeChannelCommit(Channel channel, XDR out, int xid) {
+    if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+      RpcProgramNfs3.LOG.debug("Commit done:" + xid);
+    }
+    ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
+    channel.write(outBuf);
+  }
 
   private static boolean isSet(int access, int bits) {
     return (access & bits) == bits;
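
The isSet() helper in the trailing context requires every bit of the mask to be present, not just any one of them. A tiny standalone illustration; the flag values are hypothetical stand-ins for the NFS ACCESS bits:

    final class BitCheck {
      static final int ACCESS_READ = 0x1;   // hypothetical flag values
      static final int ACCESS_MODIFY = 0x2;

      static boolean isSet(int access, int bits) {
        return (access & bits) == bits;     // all bits in the mask must be set
      }

      public static void main(String[] args) {
        int granted = ACCESS_READ;
        System.out.println(isSet(granted, ACCESS_READ));                 // true
        System.out.println(isSet(granted, ACCESS_READ | ACCESS_MODIFY)); // false
      }
    }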

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Wed Oct 16 21:07:28 2013
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
@@ -47,6 +48,7 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
@@ -55,6 +57,7 @@ import org.apache.hadoop.oncrpc.security
 import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -67,12 +70,18 @@ class OpenFileCtx {
   // Pending writes water mark for dump, 1MB
   private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
 
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
-  public final static int COMMIT_ERROR = 4;
+  static enum COMMIT_STATUS {
+    COMMIT_FINISHED,
+    COMMIT_WAIT,
+    COMMIT_INACTIVE_CTX,
+    COMMIT_INACTIVE_WITH_PENDING_WRITE,
+    COMMIT_ERROR,
+    COMMIT_DO_SYNC;
+  }
 
+  private final DFSClient client;
+  private final IdUserGroup iug;
+  
   // The stream status. False means the stream is closed.
   private volatile boolean activeState;
   // The stream write-back status. True means one thread is doing write back.
@@ -85,11 +94,58 @@ class OpenFileCtx {
   private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
   
-  // TODO: make it mutable and update it after each writing back to HDFS
-  private final Nfs3FileAttributes latestAttr;
+  // It's updated after each sync to HDFS
+  private Nfs3FileAttributes latestAttr;
 
   private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
   
+  private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
+
+  static class CommitCtx {
+    private final long offset;
+    private final Channel channel;
+    private final int xid;
+    private final Nfs3FileAttributes preOpAttr;
+
+    // Remember time for debug purpose
+    private final long startTime;
+
+    long getOffset() {
+      return offset;
+    }
+
+    Channel getChannel() {
+      return channel;
+    }
+
+    int getXid() {
+      return xid;
+    }
+
+    Nfs3FileAttributes getPreOpAttr() {
+      return preOpAttr;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+
+    CommitCtx(long offset, Channel channel, int xid,
+        Nfs3FileAttributes preOpAttr) {
+      this.offset = offset;
+      this.channel = channel;
+      this.xid = xid;
+      this.preOpAttr = preOpAttr;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("offset: %d xid: %d startTime: %d", offset, xid,
+          startTime);
+    }
+  }
+  
   // The last write, commit request or write-back event. Updating time to keep
   // output stream alive.
   private long lastAccessTime;
@@ -128,7 +184,7 @@ class OpenFileCtx {
   }
   
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
-      String dumpFilePath) {
+      String dumpFilePath, DFSClient client, IdUserGroup iug) {
     this.fos = fos;
     this.latestAttr = latestAttr;
     // We use the ReverseComparatorOnMin as the comparator of the map. In this
@@ -136,6 +192,9 @@ class OpenFileCtx {
     // retrieve the last element to write back to HDFS.
     pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
         OffsetRange.ReverseComparatorOnMin);
+    
+    pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
+    
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
@@ -149,6 +208,8 @@ class OpenFileCtx {
     nextOffset.set(latestAttr.getSize());
     assert(nextOffset.get() == this.fos.getPos());
     dumpThread = null;
+    this.client = client;
+    this.iug = iug;
   }
 
   public Nfs3FileAttributes getLatestAttr() {
@@ -360,6 +421,30 @@ class OpenFileCtx {
     }
   }
 
+  @VisibleForTesting
+  public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
+    long offset = request.getOffset();
+    int count = request.getCount();
+    long smallerCount = offset + count - cachedOffset;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+    }
+    
+    ByteBuffer data = request.getData();
+    Preconditions.checkState(data.position() == 0,
+        "The write request data has non-zero position");
+    data.position((int) (cachedOffset - offset));
+    Preconditions.checkState(data.limit() - data.position() == smallerCount,
+        "The write request buffer has wrong limit/position regarding count");
+    
+    request.setOffset(cachedOffset);
+    request.setCount((int) smallerCount);
+  }
+  
   /**
    * Creates and adds a WriteCtx into the pendingWrites map. This is a
    * synchronized method to handle concurrent writes.
@@ -372,12 +457,40 @@ class OpenFileCtx {
     long offset = request.getOffset();
     int count = request.getCount();
     long cachedOffset = nextOffset.get();
-
+    int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("requesed offset=" + offset + " and current offset="
           + cachedOffset);
     }
 
+    // Handle a special case first
+    if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+      // One Linux client behavior: after a file is closed and reopened to
+      // write, the client sometimes combines previously written data (could still
+      // be in kernel buffer) with newly appended data in one write. This is
+      // usually the first write after file reopened. In this
+      // case, we log the event and drop the overlapped section.
+      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+              + count - 1)));
+
+      if (!pendingWrites.isEmpty()) {
+        LOG.warn("There are other pending writes, fail this jumbo write");
+        return null;
+      }
+      
+      LOG.warn("Modify this write to write only the appended data");
+      alterWriteRequest(request, cachedOffset);
+
+      // Update local variable
+      originalCount = count;
+      offset = request.getOffset();
+      count = request.getCount();
+    }
+    
     // Fail non-append call
     if (offset < cachedOffset) {
       LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
@@ -387,8 +500,9 @@ class OpenFileCtx {
       DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
           : WriteCtx.DataState.ALLOW_DUMP;
       WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-          request.getOffset(), request.getCount(), request.getStableHow(),
-          request.getData().array(), channel, xid, false, dataState);
+          request.getOffset(), request.getCount(), originalCount,
+          request.getStableHow(), request.getData(), channel, xid, false,
+          dataState);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Add new write to the list with nextOffset " + cachedOffset
             + " and requesed offset=" + offset);
@@ -419,8 +533,7 @@ class OpenFileCtx {
     WRITE3Response response;
     long cachedOffset = nextOffset.get();
     if (offset + count > cachedOffset) {
-      LOG.warn("Haven't noticed any partial overwrite for a sequential file"
-          + " write requests. Treat it as a real random write, no support.");
+      LOG.warn("Treat this jumbo write as a real random write, no support.");
       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
           WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
     } else {
@@ -491,19 +604,23 @@ class OpenFileCtx {
         // of reordered writes and won't send more writes until it gets
         // responses of the previous batch. So here send response immediately
         // for unstable non-sequential write
-        if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("UNSTABLE write request, send response for offset: "
-                + writeCtx.getOffset());
-          }
-          WccData fileWcc = new WccData(preOpAttr, latestAttr);
-          WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-              fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-          Nfs3Utils
-              .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
-                  xid, new VerifierNone()), xid);
-          writeCtx.setReplied(true);
+        if (stableHow != WriteStableHow.UNSTABLE) {
+          LOG.info("Have to change stable write to unstable write:"
+              + request.getStableHow());
+          stableHow = WriteStableHow.UNSTABLE;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("UNSTABLE write request, send response for offset: "
+              + writeCtx.getOffset());
         }
+        WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils
+            .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+                xid, new VerifierNone()), xid);
+        writeCtx.setReplied(true);
       }
     }
   }
@@ -581,58 +698,92 @@ class OpenFileCtx {
     return response;
   }
 
+  public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    // Keep stream active
+    updateLastAccessTime();
+    Preconditions.checkState(commitOffset >= 0);
+
+    COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
+        preOpAttr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got commit status: " + ret.name());
+    }
+    // Do the sync outside the lock
+    if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
+        || ret == COMMIT_STATUS.COMMIT_FINISHED) {
+      try {
+        // Sync file data and length
+        fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+        // Nothing to do for metadata since attr related change is pass-through
+      } catch (ClosedChannelException cce) {
+        if (pendingWrites.isEmpty()) {
+          ret = COMMIT_STATUS.COMMIT_FINISHED;
+        } else {
+          ret = COMMIT_STATUS.COMMIT_ERROR;
+        }
+      } catch (IOException e) {
+        LOG.error("Got stream error during data sync:" + e);
+        // Do nothing. Stream will be closed eventually by StreamMonitor.
+        // status = Nfs3Status.NFS3ERR_IO;
+        ret = COMMIT_STATUS.COMMIT_ERROR;
+      }
+    }
+    return ret;
+  }
+
   /**
    * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
-   * COMMIT_INACTIVE_CTX, COMMIT_ERROR
+   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
    */
-  public int checkCommit(long commitOffset) {
-    return activeState ? checkCommitInternal(commitOffset)
-        : COMMIT_INACTIVE_CTX;
-  }
-  
-  private int checkCommitInternal(long commitOffset) {
-    if (commitOffset == 0) {
-      // Commit whole file
-      commitOffset = nextOffset.get();
+  private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    if (!activeState) {
+      if (pendingWrites.isEmpty()) {
+        return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
+      } else {
+        // TODO: return success if already committed
+        return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     }
 
     long flushed = getFlushedOffset();
     if (LOG.isDebugEnabled()) {
       LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
     }
-    if (flushed < commitOffset) {
-      // Keep stream active
-      updateLastAccessTime();
-      return COMMIT_WAIT;
-    }
 
-    int ret = COMMIT_WAIT;
-    try {
-      // Sync file data and length
-      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      // Nothing to do for metadata since attr related change is pass-through
-      ret = COMMIT_FINISHED;
-    } catch (ClosedChannelException cce) { 
-      ret = COMMIT_INACTIVE_CTX;
-      if (pendingWrites.isEmpty()) {
-        ret = COMMIT_INACTIVE_CTX;
+    if (commitOffset > 0) {
+      if (commitOffset > flushed) {
+        CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+            preOpAttr);
+        pendingCommits.put(commitOffset, commitCtx);
+        return COMMIT_STATUS.COMMIT_WAIT;
       } else {
-        ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
+        return COMMIT_STATUS.COMMIT_DO_SYNC;
       }
-    } catch (IOException e) {
-      LOG.error("Got stream error during data sync:" + e);
-      // Do nothing. Stream will be closed eventually by StreamMonitor.
-      ret = COMMIT_ERROR;
     }
 
-    // Keep stream active
-    updateLastAccessTime();
-    return ret;
+    Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
+
+    // Commit whole file, commitOffset == 0
+    if (pendingWrites.isEmpty()) {
+      // Note that there is no guarantee data is synced. TODO: We could still
+      // do a sync here though the output stream might be closed.
+      return COMMIT_STATUS.COMMIT_FINISHED;
+    } else {
+      // Insert commit
+      long maxOffset = key.getKey().getMax() - 1;
+      Preconditions.checkState(maxOffset > 0);
+      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+      pendingCommits.put(maxOffset, commitCtx);
+      return COMMIT_STATUS.COMMIT_WAIT;
+    }
   }
   
   private void addWrite(WriteCtx writeCtx) {
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
+    // For the offset range (min, max), min is inclusive, and max is exclusive
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
   
@@ -671,8 +822,18 @@ class OpenFileCtx {
         LOG.debug("The asyn write task has no pending writes, fileId: "
             + latestAttr.getFileId());
       }
+      // process pending commit again to handle this race: a commit is added
+      // to pendingCommits map just after the last doSingleWrite returns.
+      // There is no pending write and the commit should be handled by the
+      // last doSingleWrite. Due to the race, the commit is left alone and
+      // can't be processed until cleanup. Therefore, we should do another
+      // processCommits to fix the race issue.
+      processCommits(nextOffset.get()); // nextOffset has same value as
+                                        // flushedOffset
       this.asyncStatus = false;
-    } else {
+      return null;
+    } 
+    
       Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
       OffsetRange range = lastEntry.getKey();
       WriteCtx toWrite = lastEntry.getValue();
@@ -687,6 +848,7 @@ class OpenFileCtx {
         if (LOG.isDebugEnabled()) {
           LOG.debug("The next sequencial write has not arrived yet");
         }
+        processCommits(nextOffset.get()); // handle race
         this.asyncStatus = false;
       } else if (range.getMin() < offset && range.getMax() > offset) {
         // shouldn't happen since we do sync for overlapped concurrent writers
@@ -694,6 +856,7 @@ class OpenFileCtx {
             + range.getMax() + "), nextOffset=" + offset
             + ". Silently drop it now");
         pendingWrites.remove(range);
+        processCommits(nextOffset.get()); // handle race
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
@@ -708,7 +871,7 @@ class OpenFileCtx {
         }
         return toWrite;
       }
-    }
+    
     return null;
   }
   
@@ -730,7 +893,7 @@ class OpenFileCtx {
       
       if (!activeState && LOG.isDebugEnabled()) {
         LOG.debug("The openFileCtx is not active anymore, fileId: "
-            + +latestAttr.getFileId());
+            + latestAttr.getFileId());
       }
     } finally {
       // make sure we reset asyncStatus to false
@@ -738,6 +901,69 @@ class OpenFileCtx {
     }
   }
 
+  private void processCommits(long offset) {
+    Preconditions.checkState(offset > 0);
+    long flushedOffset = getFlushedOffset();
+    Entry<Long, CommitCtx> entry = pendingCommits.firstEntry();
+
+    if (entry == null || entry.getValue().offset > flushedOffset) {
+      return;
+    }
+
+    // Now do sync for the ready commits
+    int status = Nfs3Status.NFS3ERR_IO;
+    try {
+      // Sync file data and length
+      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      status = Nfs3Status.NFS3_OK;
+    } catch (ClosedChannelException cce) {
+      if (!pendingWrites.isEmpty()) {
+        LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
+            + ". Channel closed with writes pending");
+      }
+      status = Nfs3Status.NFS3ERR_IO;
+    } catch (IOException e) {
+      LOG.error("Got stream error during data sync:" + e);
+      // Do nothing. Stream will be closed eventually by StreamMonitor.
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    // Update latestAttr
+    try {
+      latestAttr = Nfs3Utils.getFileAttr(client,
+          Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
+    } catch (IOException e) {
+      LOG.error("Can't get new file attr for fileId: " + latestAttr.getFileId());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    if (latestAttr.getSize() != offset) {
+      LOG.error("After sync, the expect file size: " + offset
+          + ", however actual file size is: " + latestAttr.getSize());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+    WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
+
+    // Send response for the ready commits
+    while (entry != null && entry.getValue().offset <= flushedOffset) {
+      pendingCommits.remove(entry.getKey());
+      CommitCtx commit = entry.getValue();
+
+      COMMIT3Response response = new COMMIT3Response(status, wccData,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannelCommit(commit.getChannel(), response
+          .writeHeaderAndResponse(new XDR(), commit.getXid(),
+              new VerifierNone()), commit.getXid());
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("FileId: " + latestAttr.getFileid() + " Service time:"
+            + (System.currentTimeMillis() - commit.getStartTime())
+            + "ms. Sent response for commit:" + commit);
+      }
+      entry = pendingCommits.firstEntry();
+    }
+  }
+  
   private void doSingleWrite(final WriteCtx writeCtx) {
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
@@ -745,19 +971,7 @@ class OpenFileCtx {
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     WriteStableHow stableHow = writeCtx.getStableHow();
-    byte[] data = null;
-    try {
-      data = writeCtx.getData();
-    } catch (Exception e1) {
-      LOG.error("Failed to get request data offset:" + offset + " count:"
-          + count + " error:" + e1);
-      // Cleanup everything
-      cleanup();
-      return;
-    }
     
-    Preconditions.checkState(data.length == count);
-
     FileHandle handle = writeCtx.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
@@ -766,8 +980,8 @@ class OpenFileCtx {
 
     try {
       // The write is not protected by lock. asyncState is used to make sure
-      // there is one thread doing write back at any time
-      fos.write(data, 0, count);
+      // there is one thread doing write back at any time    
+      writeCtx.writeData(fos);
       
       long flushedOffset = getFlushedOffset();
       if (flushedOffset != (offset + count)) {
@@ -776,10 +990,6 @@ class OpenFileCtx {
             + (offset + count));
       }
       
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("After writing " + handle.getFileId() + " at offset "
-            + offset + ", update the memory count.");
-      }
 
       // Reduce memory occupation size if request was allowed dumped
       if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
@@ -787,6 +997,11 @@ class OpenFileCtx {
           if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
             writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
             updateNonSequentialWriteInMemory(-count);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("After writing " + handle.getFileId() + " at offset "
+                  + offset + ", updated the memory count, new value:"
+                  + nonSequentialWriteInMemory.get());
+            }
           }
         }
       }
@@ -794,14 +1009,23 @@ class OpenFileCtx {
       if (!writeCtx.getReplied()) {
         WccAttr preOpAttr = latestAttr.getWccAttr();
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+          LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+              + " instead of real data count:" + count);
+          count = writeCtx.getOriginalCount();
+        }
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
             new XDR(), xid, new VerifierNone()), xid);
       }
+      
+      // Handle the waiting commits without holding any lock
+      processCommits(writeCtx.getOffset() + writeCtx.getCount());
+     
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
-          + offset + " and length " + data.length, e);
+          + offset + " and length " + count, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
@@ -880,4 +1104,29 @@ class OpenFileCtx {
       }
     }
   }
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){
+    return pendingWrites;
+  }
+  
+  @VisibleForTesting
+  ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){
+    return pendingCommits;
+  }
+  
+  @VisibleForTesting
+  long getNextOffsetForTest() {
+    return nextOffset.get();
+  }
+  
+  @VisibleForTesting
+  void setNextOffsetForTest(long newValue) {
+    nextOffset.set(newValue);
+  }
+  
+  @VisibleForTesting
+  void setActiveStatusForTest(boolean activeState) {
+    this.activeState = activeState;
+  }
 }
\ No newline at end of file
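
The OpenFileCtx changes above make COMMIT handling asynchronous: a commit that cannot be satisfied yet is parked in pendingCommits (a ConcurrentSkipListMap keyed by the offset it waits for), and processCommits() drains every entry whose offset is covered by the flushed offset after each sync or write-back. A standalone sketch of that drain pattern, with a hypothetical Waiter class standing in for CommitCtx and a println standing in for the COMMIT3Response:

    import java.util.Map;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    final class CommitQueue {
      static final class Waiter {
        final long offset;
        final long startTime = System.currentTimeMillis();
        Waiter(long offset) { this.offset = offset; }
      }

      private final ConcurrentNavigableMap<Long, Waiter> pendingCommits =
          new ConcurrentSkipListMap<Long, Waiter>();

      // A commit beyond the flushed offset is parked and answered later.
      void park(long commitOffset) {
        pendingCommits.put(commitOffset, new Waiter(commitOffset));
      }

      // Called after each sync; answers every commit that is now durable.
      void drain(long flushedOffset) {
        Map.Entry<Long, Waiter> entry = pendingCommits.firstEntry();
        while (entry != null && entry.getKey() <= flushedOffset) {
          pendingCommits.remove(entry.getKey());
          Waiter w = entry.getValue();
          System.out.println("commit at offset " + w.offset + " served in "
              + (System.currentTimeMillis() - w.startTime) + "ms");
          entry = pendingCommits.firstEntry();
        }
      }
    }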

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Wed Oct 16 21:07:28 2013
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
@@ -29,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FileUtil;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.nfs.AccessPrivilege;
 import org.apache.hadoop.nfs.NfsExports;
 import org.apache.hadoop.nfs.NfsFileType;
@@ -103,9 +106,13 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcCallCache;
 import org.apache.hadoop.oncrpc.RpcDeniedReply;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.RpcReply;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.Credentials;
 import org.apache.hadoop.oncrpc.security.CredentialsSys;
@@ -115,7 +122,10 @@ import org.apache.hadoop.oncrpc.security
 import org.apache.hadoop.oncrpc.security.Verifier;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.security.AccessControlException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * RPC program corresponding to nfs daemon. See {@link Nfs3}.
@@ -150,14 +160,15 @@ public class RpcProgramNfs3 extends RpcP
   private Statistics statistics;
   private String writeDumpDir; // The dir to save dump files
   
+  private final RpcCallCache rpcCallCache;
+
   public RpcProgramNfs3() throws IOException {
     this(new Configuration());
   }
 
-  public RpcProgramNfs3(Configuration config)
-      throws IOException {
+  public RpcProgramNfs3(Configuration config) throws IOException {
     super("NFS3", "localhost", Nfs3Constant.PORT, Nfs3Constant.PROGRAM,
-        Nfs3Constant.VERSION, Nfs3Constant.VERSION, 100);
+        Nfs3Constant.VERSION, Nfs3Constant.VERSION);
    
     config.set(FsPermission.UMASK_LABEL, "000");
     iug = new IdUserGroup();
@@ -183,6 +194,8 @@ public class RpcProgramNfs3 extends RpcP
     } else {
       clearDirectory(writeDumpDir);
     }
+
+    rpcCallCache = new RpcCallCache("NFS3", 256);
   }
 
   private void clearDirectory(String writeDumpDir) throws IOException {
@@ -213,8 +226,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public GETATTR3Response getattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -294,8 +307,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public SETATTR3Response setattr(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -370,8 +383,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public LOOKUP3Response lookup(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -432,8 +445,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public ACCESS3Response access(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -574,7 +587,6 @@ public class RpcProgramNfs3 extends RpcP
     long offset = request.getOffset();
     int count = request.getCount();
 
-    
     FileHandle handle = request.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
@@ -720,8 +732,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public CREATE3Response create(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -830,7 +842,7 @@ public class RpcProgramNfs3 extends RpcP
     
     // Add open stream
     OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
-        + "/" + postOpObjAttr.getFileId());
+        + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
     fileHandle = new FileHandle(postOpObjAttr.getFileId());
     writeManager.addOpenFileStream(fileHandle, openFileCtx);
     if (LOG.isDebugEnabled()) {
@@ -973,8 +985,7 @@ public class RpcProgramNfs3 extends RpcP
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
             preOpDirAttr);
@@ -1056,8 +1067,7 @@ public class RpcProgramNfs3 extends RpcP
       }
 
       String fileIdPath = dirFileIdPath + "/" + fileName;
-      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient,
-          fileIdPath);
+      HdfsFileStatus fstat = Nfs3Utils.getFileStatus(dfsClient, fileIdPath);
       if (fstat == null) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_NOENT, errWcc);
       }
@@ -1098,8 +1108,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public RENAME3Response rename(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1245,13 +1255,37 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
 
+  /**
+   * Used by readdir and readdirplus to get dirents. It retries the listing if
+   * the startAfter can't be found anymore.
+   */
+  private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath,
+      byte[] startAfter) throws IOException {
+    DirectoryListing dlisting = null;
+    try {
+      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
+    } catch (RemoteException e) {
+      IOException io = e.unwrapRemoteException();
+      if (!(io instanceof DirectoryListingStartAfterNotFoundException)) {
+        throw io;
+      }
+      // This happens when startAfter was just deleted
+      LOG.info("Cookie cound't be found: " + new String(startAfter)
+          + ", do listing from beginning");
+      dlisting = dfsClient
+          .listPaths(dirFileIdPath, HdfsFileStatus.EMPTY_NAME);
+    }
+    return dlisting;
+  }
+  
   @Override
-  public READDIR3Response readdir(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1289,7 +1323,7 @@ public class RpcProgramNfs3 extends RpcP
           + cookie + " count: " + count);
     }
 
-    HdfsFileStatus dirStatus;
+    HdfsFileStatus dirStatus = null;
     DirectoryListing dlisting = null;
     Nfs3FileAttributes postOpAttr = null;
     long dotdotFileId = 0;
@@ -1333,8 +1367,8 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+      
+      dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpAttr == null) {
         LOG.error("Can't get path for fileId:" + handle.getFileId());
@@ -1417,11 +1451,15 @@ public class RpcProgramNfs3 extends RpcP
     }
     long dirCount = request.getDirCount();
     if (dirCount <= 0) {
-      LOG.info("Nonpositive count in invalid READDIRPLUS request:" + dirCount);
-      return new READDIRPLUS3Response(Nfs3Status.NFS3_OK);
+      LOG.info("Nonpositive dircount in invalid READDIRPLUS request:" + dirCount);
+      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
     }
     int maxCount = request.getMaxCount();
-
+    if (maxCount <= 0) {
+      LOG.info("Nonpositive maxcount in invalid READDIRPLUS request:" + maxCount);
+      return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_INVAL);
+    }
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
           + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount);
@@ -1471,8 +1509,8 @@ public class RpcProgramNfs3 extends RpcP
         String inodeIdPath = Nfs3Utils.getFileIdPath(cookie);
         startAfter = inodeIdPath.getBytes();
       }
-      dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
-
+      
+      dlisting = listPaths(dfsClient, dirFileIdPath, startAfter);
       postOpDirAttr = Nfs3Utils.getFileAttr(dfsClient, dirFileIdPath, iug);
       if (postOpDirAttr == null) {
         LOG.info("Can't get path for fileId:" + handle.getFileId());
@@ -1540,8 +1578,8 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public FSSTAT3Response fsstat(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1598,8 +1636,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public FSINFO3Response fsinfo(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1650,8 +1688,8 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public PATHCONF3Response pathconf(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
+      InetAddress client) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
@@ -1697,7 +1735,7 @@ public class RpcProgramNfs3 extends RpcP
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr,
+  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
@@ -1739,18 +1777,10 @@ public class RpcProgramNfs3 extends RpcP
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
       
-      int status;
-      if (writeManager.handleCommit(handle, commitOffset)) {
-        status = Nfs3Status.NFS3_OK;
-      } else {
-        status = Nfs3Status.NFS3ERR_IO;
-      }
-      Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient,
-          handle, iug);
-      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
-      return new COMMIT3Response(status, fileWcc,
-          Nfs3Constant.WRITE_COMMIT_VERF);
-
+      // Insert commit as an async request
+      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
+          preOpAttr);
+      return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       Nfs3FileAttributes postOpAttr = null;
@@ -1776,25 +1806,53 @@ public class RpcProgramNfs3 extends RpcP
   }
   
   @Override
-  public XDR handleInternal(RpcCall rpcCall, final XDR xdr, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR xdr = new XDR(data);
+    XDR out = new XDR();
+    InetAddress client = ((InetSocketAddress) info.remoteAddress())
+        .getAddress();
+    Channel channel = info.channel();
 
     Credentials credentials = rpcCall.getCredential();
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
-      if (rpcCall.getCredential().getFlavor() != AuthFlavor.AUTH_SYS
-          && rpcCall.getCredential().getFlavor() != AuthFlavor.RPCSEC_GSS) {
-        LOG.info("Wrong RPC AUTH flavor, "
-            + rpcCall.getCredential().getFlavor()
+      if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
+          && credentials.getFlavor() != AuthFlavor.RPCSEC_GSS) {
+        LOG.info("Wrong RPC AUTH flavor, " + credentials.getFlavor()
             + " is not AUTH_SYS or RPCSEC_GSS.");
         XDR reply = new XDR();
         RpcDeniedReply rdr = new RpcDeniedReply(xid,
             RpcReply.ReplyState.MSG_ACCEPTED,
             RpcDeniedReply.RejectState.AUTH_ERROR, new VerifierNone());
         rdr.write(reply);
-        return reply;
+
+        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(reply.asReadOnlyWrap()
+            .buffer());
+        RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+        RpcUtil.sendRpcResponse(ctx, rsp);
+        return;
+      }
+    }
+
+    if (!isIdempotent(rpcCall)) {
+      RpcCallCache.CacheEntry entry = rpcCallCache.checkOrAddToCache(client,
+          xid);
+      if (entry != null) { // in cache
+        if (entry.isCompleted()) {
+          LOG.info("Sending the cached reply to retransmitted request " + xid);
+          RpcUtil.sendRpcResponse(ctx, entry.getResponse());
+          return;
+        } else { // else request is in progress
+          LOG.info("Retransmitted request, transaction still in progress "
+              + xid);
+          // Ignore the request and do nothing
+          return;
+        }
       }
     }
     
@@ -1855,19 +1913,31 @@ public class RpcProgramNfs3 extends RpcP
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
       response = pathconf(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, securityHandler, client);
+      response = commit(xdr, channel, xid, securityHandler, client);
     } else {
       // Invalid procedure
       RpcAcceptedReply.getInstance(xid,
           RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
           out);
     }
-    if (response != null) {
-      // TODO: currently we just return VerifierNone
-      out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+    if (response == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No sync response, expect an async response for request XID="
+            + rpcCall.getXid());
+      }
+      return;
+    }
+    // TODO: currently we just return VerifierNone
+    out = response.writeHeaderAndResponse(out, xid, new VerifierNone());
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+        .buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+
+    if (!isIdempotent(rpcCall)) {
+      rpcCallCache.callCompleted(client, xid, rsp);
     }
 
-    return out;
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

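The retransmission handling added to handleInternal above consults the call
cache before dispatching any non-idempotent request: a completed duplicate is
answered from the cache, while an in-flight duplicate is dropped. A minimal
sketch of that duplicate-request cache pattern (hypothetical names; this is
not the actual RpcCallCache API):

    import java.net.InetAddress;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Duplicate-request cache keyed by (client address, xid). A sketch of
    // the pattern only; the real RpcCallCache differs in types and eviction.
    class ReplyCache {
      static final class Entry {
        private volatile Object response; // null while the call is in progress
        boolean isCompleted() { return response != null; }
        Object getResponse() { return response; }
        void setResponse(Object rsp) { response = rsp; }
      }

      private static final int MAX_ENTRIES = 1024;
      private final Map<String, Entry> cache =
          new LinkedHashMap<String, Entry>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Entry> eldest) {
              return size() > MAX_ENTRIES; // evict least recently used entry
            }
          };

      private static String key(InetAddress client, int xid) {
        return client.getHostAddress() + ":" + xid;
      }

      // First transmission: register the call as in progress, return null.
      // Retransmission: return the existing, possibly incomplete, entry.
      synchronized Entry checkOrAddToCache(InetAddress client, int xid) {
        Entry entry = cache.get(key(client, xid));
        if (entry == null) {
          cache.put(key(client, xid), new Entry());
        }
        return entry;
      }

      synchronized void callCompleted(InetAddress client, int xid, Object rsp) {
        Entry entry = cache.get(key(client, xid));
        if (entry != null) {
          entry.setResponse(rsp);
        }
      }
    }

On a retransmission whose entry isCompleted(), the server resends
entry.getResponse(); otherwise it silently drops the request, exactly as the
hunk above does.
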
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Wed Oct 16 21:07:28 2013
@@ -20,13 +20,16 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -50,8 +53,17 @@ class WriteCtx {
   private final FileHandle handle;
   private final long offset;
   private final int count;
+  
+  // Only needed for an overlapped write; see OpenFileCtx.addWritesToCache()
+  private final int originalCount; 
+  public static final int INVALID_ORIGINAL_COUNT = -1;
+  
+  public int getOriginalCount() {
+    return originalCount;
+  }
+
   private final WriteStableHow stableHow;
-  private volatile byte[] data;
+  private volatile ByteBuffer data;
   
   private final Channel channel;
   private final int xid;
@@ -89,9 +101,13 @@ class WriteCtx {
       }
       return 0;
     }
+
+    // Resized write should not allow dump
+    Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT);
+
     this.raf = raf;
     dumpFileOffset = dumpOut.getChannel().position();
-    dumpOut.write(data, 0, count);
+    dumpOut.write(data.array(), 0, count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
     }
@@ -127,7 +143,8 @@ class WriteCtx {
     return stableHow;
   }
 
-  byte[] getData() throws IOException {
+  @VisibleForTesting
+  ByteBuffer getData() throws IOException {
     if (dataState != DataState.DUMPED) {
       synchronized (this) {
         if (dataState != DataState.DUMPED) {
@@ -143,15 +160,45 @@ class WriteCtx {
 
   private void loadData() throws IOException {
     Preconditions.checkState(data == null);
-    data = new byte[count];
+    byte[] rawData = new byte[count];
     raf.seek(dumpFileOffset);
-    int size = raf.read(data, 0, count);
+    int size = raf.read(rawData, 0, count);
     if (size != count) {
       throw new IOException("Data count is " + count + ", but read back "
           + size + "bytes");
     }
+    data = ByteBuffer.wrap(rawData);
   }
 
+  public void writeData(HdfsDataOutputStream fos) throws IOException {
+    Preconditions.checkState(fos != null);
+
+    ByteBuffer dataBuffer = null;
+    try {
+      dataBuffer = getData();
+    } catch (Exception e1) {
+      LOG.error("Failed to get request data offset:" + offset + " count:"
+          + count + " error:" + e1);
+      throw new IOException("Can't get WriteCtx.data");
+    }
+
+    byte[] data = dataBuffer.array();
+    int position = dataBuffer.position();
+    int limit = dataBuffer.limit();
+    Preconditions.checkState(limit - position == count);
+    // Modified write has a valid original count
+    if (position != 0) {
+      if (limit != getOriginalCount()) {
+        throw new IOException("Modified write has different original size. "
+            + "buff position:" + position + " buff limit:" + limit + ". "
+            + toString());
+      }
+    }
+    
+    // Now write data
+    fos.write(data, position, count);
+  }
+  
   Channel getChannel() {
     return channel;
   }
@@ -168,11 +215,13 @@ class WriteCtx {
     this.replied = replied;
   }
   
-  WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
-      byte[] data, Channel channel, int xid, boolean replied, DataState dataState) {
+  WriteCtx(FileHandle handle, long offset, int count, int originalCount,
+      WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid,
+      boolean replied, DataState dataState) {
     this.handle = handle;
     this.offset = offset;
     this.count = count;
+    this.originalCount = originalCount;
     this.stableHow = stableHow;
     this.data = data;
     this.channel = channel;
@@ -185,7 +234,7 @@ class WriteCtx {
   @Override
   public String toString() {
     return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
-        + " stableHow:" + stableHow + " replied:" + replied + " dataState:"
-        + dataState + " xid:" + xid;
+        + " originalCount:" + originalCount + " stableHow:" + stableHow
+        + " replied:" + replied + " dataState:" + dataState + " xid:" + xid;
   }
 }
\ No newline at end of file

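The WriteCtx change above moves from byte[] to ByteBuffer so that an
overlapped write can be trimmed without copying: position is advanced past
the overlapping prefix, limit stays at the original end, and originalCount
keeps the pre-trim size for the sanity check in writeData(). A self-contained
sketch of the same trick (names are illustrative only):

    import java.nio.ByteBuffer;

    public class TrimmedWriteDemo {
      public static void main(String[] args) {
        byte[] payload = new byte[100];        // incoming write data
        ByteBuffer data = ByteBuffer.wrap(payload);

        int overlap = 40;                      // bytes already written
        data.position(overlap);                // trim without copying

        int count = data.limit() - data.position(); // 60 bytes to write
        int originalCount = payload.length;         // 100, for validation

        // Equivalent of writeData(): write only the non-overlapping tail.
        // fos.write(data.array(), data.position(), count);
        System.out.println("writing " + count + " of " + originalCount);
      }
    }
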
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Wed Oct 16 21:07:28 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -36,6 +37,7 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
@@ -166,7 +168,7 @@ public class WriteManager {
       String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
           Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId());
+          + fileHandle.getFileId(), dfsClient, iug);
       addOpenFileStream(fileHandle, openFileCtx);
       if (LOG.isDebugEnabled()) {
         LOG.debug("opened stream for file:" + fileHandle.getFileId());
@@ -176,71 +178,55 @@ public class WriteManager {
     // Add write into the async job queue
     openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
         asyncDataService, iug);
-    // Block stable write
-    if (request.getStableHow() != WriteStableHow.UNSTABLE) {
-      if (handleCommit(fileHandle, offset + count)) {
-        Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug);
-        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
-            postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, request.getStableHow(),
-            Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      } else {
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      }
-    }
-
     return;
   }
 
-  boolean handleCommit(FileHandle fileHandle, long commitOffset) {
+  void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
+      long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    int status;
     OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
-          + " commitOffset=" + commitOffset);
-      return true;
-    }
-    long timeout = 30 * 1000; // 30 seconds
-    long startCommit = System.currentTimeMillis();
-    while (true) {
-      int ret = openFileCtx.checkCommit(commitOffset);
-      if (ret == OpenFileCtx.COMMIT_FINISHED) {
-        // Committed
-        return true;
-      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
-        LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return true;
-      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
-        LOG.info("Inactive stream with pending writes, fileId="
-            + fileHandle.getFileId() + " commitOffset=" + commitOffset);
-        return false;
-      }
-      assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
-      if (ret == OpenFileCtx.COMMIT_ERROR) {
-        return false;
-      }
+          + " commitOffset=" + commitOffset + ". Return success in this case.");
+      status = Nfs3Status.NFS3_OK;
       
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-      }
-      if (System.currentTimeMillis() - startCommit > timeout) {
-        // Commit took too long, return error
-        return false;
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return false;
+    } else {
+      COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
+          channel, xid, preOpAttr);
+      switch (ret) {
+      case COMMIT_DO_SYNC:
+      case COMMIT_FINISHED:
+      case COMMIT_INACTIVE_CTX:
+        status = Nfs3Status.NFS3_OK;
+        break;
+      case COMMIT_INACTIVE_WITH_PENDING_WRITE:
+      case COMMIT_ERROR:
+        status = Nfs3Status.NFS3ERR_IO;
+        break;
+      case COMMIT_WAIT:
+        // Do nothing. Commit is async now.
+        return;
+      default:
+        throw new RuntimeException("Should not get commit return code:"
+            + ret.name());
       }
-    }// while
+    }
+    
+    // Send out the response
+    Nfs3FileAttributes postOpAttr = null;
+    try {
+      String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileid());
+      postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+    } catch (IOException e1) {
+      LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileid());
+    }
+    WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
+    COMMIT3Response response = new COMMIT3Response(status, fileWcc,
+        Nfs3Constant.WRITE_COMMIT_VERF);
+    Nfs3Utils.writeChannelCommit(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
   }
 
   /**

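In the rewritten handleCommit above, only terminal states produce an
immediate reply; COMMIT_WAIT returns without responding, and the reply is
sent later from the write-back path. When the data actually has to be
persisted before replying (the COMMIT_DO_SYNC case), the stream is flushed.
A hedged sketch of such a flush, assuming an open HdfsDataOutputStream;
whether checkCommit uses exactly these flags is not visible in this hunk:

    import java.io.IOException;
    import java.util.EnumSet;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

    // Flush buffered data to the datanodes and update the NameNode's view
    // of the file length, so postOpAttr reflects the committed size.
    void syncForCommit(HdfsDataOutputStream fos) throws IOException {
      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    }
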
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Oct 16 21:07:28 2013
@@ -253,8 +253,13 @@ Release 2.3.0 - UNRELEASED
     HDFS-4953. Enable HDFS local reads via mmap.
     (Colin Patrick McCabe via wang).
 
+    HDFS-5342. Provide more information in the FSNamesystem JMX interfaces.
+    (Haohui Mai via jing9)
+ 
   IMPROVEMENTS
 
+    HDFS-5267. Remove volatile from LightWeightHashSet. (Junping Du via llu)
+
     HDFS-4657.  Limit the number of blocks logged by the NN after a block
     report to a configurable value.  (Aaron T. Myers via Colin Patrick
     McCabe)
@@ -262,9 +267,6 @@ Release 2.3.0 - UNRELEASED
     HDFS-4278. Log an ERROR when DFS_BLOCK_ACCESS_TOKEN_ENABLE config is
     disabled but security is turned on. (Kousuke Saruta via harsh)
 
-    HDFS-4817.  Make HDFS advisory caching configurable on a per-file basis.
-    (Colin Patrick McCabe)
-
     HDFS-5004. Add additional JMX bean for NameNode status data
     (Trevor Lorimer via cos)
 
@@ -300,6 +302,23 @@ Release 2.3.0 - UNRELEASED
     HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and
     branch-2. (cnauroth)
 
+    HDFS-4517. Cover class RemoteBlockReader with unit tests. (Vadim Bondarev
+    and Dennis Y via kihwal)
+
+    HDFS-4512. Cover package org.apache.hadoop.hdfs.server.common with tests.
+    (Vadim Bondarev via kihwal)
+
+    HDFS-4510. Cover classes ClusterJspHelper/NamenodeJspHelper with unit
+    tests. (Andrey Klochkov via kihwal)
+
+    HDFS-5323.  Remove some deadcode in BlockManager (Colin Patrick McCabe)
+
+    HDFS-5338. Add a conf to disable hostname check in datanode registration.
+    (szetszwo)
+
+    HDFS-5130. Add test for snapshot related FsShell and DFSAdmin commands.
+    (Binglin Chang via jing9)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -327,7 +346,13 @@ Release 2.3.0 - UNRELEASED
 
     HDFS-5266. ElasticByteBufferPool#Key does not implement equals. (cnauroth)
 
-Release 2.2.0 - UNRELEASED
+    HDFS-5352. Server#initLog() doesn't close InputStream in httpfs. (Ted Yu via
+    jing9)
+
+    HDFS-5283. Under construction blocks only inside snapshots should not be
+    counted in safemode threshold.  (Vinay via szetszwo)
+
+Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
 
@@ -339,12 +364,46 @@ Release 2.2.0 - UNRELEASED
 
   BUG FIXES
 
-Release 2.1.2 - UNRELEASED
+    HDFS-5307. Support both HTTP and HTTPS in jsp pages (Haohui Mai via
+    brandonli)
+
+    HDFS-5291. Standby namenode after transition to active goes into safemode.
+    (jing9)
+
+    HDFS-5317. Go back to DFS Home link does not work on datanode webUI
+    (Haohui Mai via brandonli)
+
+    HDFS-5316. Namenode ignores the default https port (Haohui Mai via
+    brandonli)
+
+    HDFS-5281. COMMIT request should not block. (brandonli)
+
+    HDFS-5337. Should do hsync for a commit request even when there are no
+    pending writes. (brandonli)
+
+    HDFS-5335. Hive query failed with possible race in dfs output stream.
+    (Haohui Mai via suresh)
+
+    HDFS-5322. HDFS delegation token not found in cache errors seen on secure HA 
+    clusters. (jing9)
+
+    HDFS-5329. Update FSNamesystem#getListing() to handle inode path in startAfter
+    token. (brandonli)
+
+    HDFS-5330. fix readdir and readdirplus for large directories (brandonli)
+
+Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
 
+    HDFS-4817.  Make HDFS advisory caching configurable on a per-file basis.
+    (Colin Patrick McCabe)
+
+    HDFS-5230. Introduce RpcInfo to decouple XDR classes from the RPC API.
+    (Haohui Mai via brandonli)
+
   IMPROVEMENTS
 
     HDFS-5246. Make Hadoop nfs server port and mount daemon port
@@ -353,6 +412,9 @@ Release 2.1.2 - UNRELEASED
     HDFS-5256. Use guava LoadingCache to implement DFSClientCache. (Haohui Mai
     via brandonli)
 
+    HDFS-5308. Replace HttpConfig#getSchemePrefix with implicit schemes in HDFS 
+    JSP. (Haohui Mai via jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -373,6 +435,27 @@ Release 2.1.2 - UNRELEASED
     HDFS-5265. Namenode fails to start when dfs.https.port is unspecified.
     (Haohui Mai via jing9)
 
+    HDFS-5255. Distcp job fails with hsftp when https is enabled in insecure
+    cluster. (Arpit Agarwal)
+
+    HDFS-5279. Guard against NullPointerException in NameNode JSP pages before
+    initialization of FSNamesystem. (cnauroth)
+
+    HDFS-5289. Race condition in TestRetryCacheWithHA#testCreateSymlink causes
+    spurious test failure. (atm)
+
+    HDFS-5300. FSNameSystem#deleteSnapshot() should not check owner in case of 
+    permissions disabled. (Vinay via jing9)
+
+    HDFS-5306. Datanode https port is not available at the namenode. (Suresh
+    Srinivas via brandonli)
+
+    HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
+    (Vinay via jing9)
+
+    HDFS-5259. Support clients that combine appended data with old data
+    before sending it to the NFS server. (brandonli)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.cmd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1527684-1532876

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Oct 16 21:07:28 2013
@@ -900,10 +900,15 @@ public class DFSClient implements java.i
     assert dtService != null;
     Token<DelegationTokenIdentifier> token =
       namenode.getDelegationToken(renewer);
-    token.setService(this.dtService);
 
-    LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+    if (token != null) {
+      token.setService(this.dtService);
+      LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+    } else {
+      LOG.info("Cannot get delegation token from " + renewer);
+    }
     return token;
+
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Oct 16 21:07:28 2013
@@ -193,7 +193,10 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
-  
+
+  public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
+  public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
+
   // Whether to enable datanode's stale state detection and usage for reads
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
   public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;

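The pair of constants added to DFSConfigKeys above backs HDFS-5338, listed in
CHANGES.txt in this same merge. A minimal sketch of how such a key is read;
the consuming code in the NameNode is not part of this hunk:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class RegistrationCheckConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // When false, the NameNode skips verifying that a registering
        // datanode's hostname resolves to its connecting IP address.
        boolean check = conf.getBoolean(
            DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
            DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
        System.out.println("ip-hostname-check = " + check);
      }
    }
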
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Oct 16 21:07:28 2013
@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -140,7 +141,7 @@ public class DFSOutputStream extends FSO
  private long bytesCurBlock = 0; // bytes written in current block
   private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
-  private volatile IOException lastException = null;
+  private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
   private long artificialSlowdown = 0;
   private long lastFlushOffset = 0; // offset when flush was invoked
   //persist blocks on namenode
@@ -810,8 +811,8 @@ public class DFSOutputStream extends FSO
         if (++pipelineRecoveryCount > 5) {
           DFSClient.LOG.warn("Error recovering pipeline for writing " +
               block + ". Already retried 5 times for the same packet.");
-          lastException = new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success.");
+          lastException.set(new IOException("Failing write. Tried pipeline " +
+              "recovery 5 times without success."));
           streamerClosed = true;
           return false;
         }
@@ -1002,8 +1003,8 @@ public class DFSOutputStream extends FSO
             }
           }
           if (nodes.length <= 1) {
-            lastException = new IOException("All datanodes " + pipelineMsg
-                + " are bad. Aborting...");
+            lastException.set(new IOException("All datanodes " + pipelineMsg
+                + " are bad. Aborting..."));
             streamerClosed = true;
             return false;
           }
@@ -1018,7 +1019,7 @@ public class DFSOutputStream extends FSO
               newnodes.length-errorIndex);
           nodes = newnodes;
           hasError = false;
-          lastException = null;
+          lastException.set(null);
           errorIndex = -1;
         }
 
@@ -1063,7 +1064,7 @@ public class DFSOutputStream extends FSO
       ExtendedBlock oldBlock = block;
       do {
         hasError = false;
-        lastException = null;
+        lastException.set(null);
         errorIndex = -1;
         success = false;
 
@@ -1278,9 +1279,7 @@ public class DFSOutputStream extends FSO
     }
 
     private void setLastException(IOException e) {
-      if (lastException == null) {
-        lastException = e;
-      }
+      lastException.compareAndSet(null, e);
     }
   }
 
@@ -1312,7 +1311,7 @@ public class DFSOutputStream extends FSO
 
   protected void checkClosed() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.get();
       throw e != null ? e : new ClosedChannelException();
     }
   }
@@ -1468,6 +1467,7 @@ public class DFSOutputStream extends FSO
 
   private void waitAndQueueCurrentPacket() throws IOException {
     synchronized (dataQueue) {
+      try {
       // If queue is full, then wait till we have enough space
       while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
         try {
@@ -1486,6 +1486,8 @@ public class DFSOutputStream extends FSO
       }
       checkClosed();
       queueCurrentPacket();
+      } catch (ClosedChannelException e) {
+        // checkClosed() throws ClosedChannelException only when the stream
+        // was closed with no stored failure, so there is nothing to report.
+      }
     }
   }
 
@@ -1729,7 +1731,7 @@ public class DFSOutputStream extends FSO
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
         if (!closed) {
-          lastException = new IOException("IOException flush:" + e);
+          lastException.set(new IOException("IOException flush:" + e));
           closeThreads(true);
         }
       }
@@ -1787,21 +1789,25 @@ public class DFSOutputStream extends FSO
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Waiting for ack for: " + seqno);
     }
-    synchronized (dataQueue) {
-      while (!closed) {
-        checkClosed();
-        if (lastAckedSeqno >= seqno) {
-          break;
-        }
-        try {
-          dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
-        } catch (InterruptedException ie) {
-          throw new InterruptedIOException(
-            "Interrupted while waiting for data to be acknowledged by pipeline");
+    try {
+      synchronized (dataQueue) {
+        while (!closed) {
+          checkClosed();
+          if (lastAckedSeqno >= seqno) {
+            break;
+          }
+          try {
+            dataQueue.wait(1000); // when we receive an ack, we notify on
+                                  // dataQueue
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(
+                "Interrupted while waiting for data to be acknowledged by pipeline");
+          }
         }
       }
+      checkClosed();
+    } catch (ClosedChannelException e) {
+      // A stream closed with no stored failure is a benign close, not an
+      // error; swallow the exception.
     }
-    checkClosed();
   }
 
   private synchronized void start() {
@@ -1847,7 +1853,7 @@ public class DFSOutputStream extends FSO
   @Override
   public synchronized void close() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.getAndSet(null);
       if (e == null)
         return;
       else
@@ -1875,6 +1881,7 @@ public class DFSOutputStream extends FSO
       closeThreads(false);
       completeFile(lastBlock);
       dfsClient.endFileLease(src);
+    } catch (ClosedChannelException e) {
+      // Already closed concurrently; fall through to mark closed below.
     } finally {
       closed = true;
     }
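
The lastException changes running through the DFSOutputStream hunks above
swap a volatile field for an AtomicReference: compareAndSet ensures only the
first failure is recorded, and getAndSet lets close() report it exactly once.
A self-contained illustration of the pattern (demo names only):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    class LastExceptionDemo {
      private final AtomicReference<IOException> lastException =
          new AtomicReference<IOException>();

      // Racing writers may all fail; compareAndSet keeps only the first error.
      void setLastException(IOException e) {
        lastException.compareAndSet(null, e);
      }

      // getAndSet(null) reports the stored failure exactly once, so a second
      // close() returns quietly instead of rethrowing.
      void close() throws IOException {
        IOException e = lastException.getAndSet(null);
        if (e != null) {
          throw e;
        }
      }
    }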