You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/07/02 19:31:59 UTC

svn commit: r1499029 [1/3] - in /hadoop/common/trunk/hadoop-hdfs-project: ./ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/dev-support/ hadoop-hdfs-nfs/src/ hadoop-hdfs-nfs/src/main/ hadoop-hdfs-nfs/src/main/java/ hadoop-hdfs-nfs/src/main/java/org/ hadoop-hdfs-nfs/...

Author: brandonli
Date: Tue Jul  2 17:31:58 2013
New Revision: 1499029

URL: http://svn.apache.org/r1499029
Log:
HDFS-4762 Provide HDFS based NFSv3 and Mountd implementation. Contributed by Brandon Li

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/pom.xml

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt Tue Jul  2 17:31:58 2013
@@ -0,0 +1,4 @@
+-----------------------------------------------------------------------------
+HDFS-NFS - NFS implementation for Hadoop HDFS
+
+-----------------------------------------------------------------------------

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml Tue Jul  2 17:31:58 2013
@@ -0,0 +1,18 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+</FindBugsFilter>

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml Tue Jul  2 17:31:58 2013
@@ -0,0 +1,268 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project-dist</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project-dist</relativePath>
+  </parent>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-hdfs-nfs</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <description>Apache Hadoop HDFS-NFS</description>
+  <name>Apache Hadoop HDFS-NFS</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-nfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.6.2.Final</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.2</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-daemon</groupId>
+      <artifactId>commons-daemon</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet.jsp</groupId>
+      <artifactId>jsp-api</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>tomcat</groupId>
+      <artifactId>jasper-runtime</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>xmlenc</groupId>
+      <artifactId>xmlenc</artifactId>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <startKdc>${startKdc}</startKdc>
+            <kdc.resource.dir>${kdc.resource.dir}</kdc.resource.dir>
+          </systemPropertyVariables>
+          <properties>
+            <property>
+              <name>listener</name>
+              <value>org.apache.hadoop.test.TimedOutTestsListener</value>
+            </property>
+          </properties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-jsp-generated-sources-directory</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.build.directory}/generated-sources/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <configuration>
+          <skipTests>false</skipTests>
+        </configuration>
+        <executions>
+          <execution>
+            <id>create-jsp-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <mkdir dir="${project.build.directory}/generated-sources/java" />
+              </target>
+            </configuration>
+          </execution>
+          <execution>
+            <phase>pre-site</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <tasks>
+                <copy file="src/main/resources/hdfs-nfs-default.xml" todir="src/site/resources"/>
+                <copy file="src/main/xsl/configuration.xsl" todir="src/site/resources"/>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.mount;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mount.MountdBase;
+
+/**
+ * Main class for starting mountd daemon. This daemon implements the NFS
+ * mount protocol. When receiving a MOUNT request from an NFS client, it checks
+ * the request against the list of currently exported file systems. If the
+ * client is permitted to mount the file system, rpc.mountd obtains a file
+ * handle for requested directory and returns it to the client.
+ */
+public class Mountd extends MountdBase {
+  /**
+   * Constructor
+   * @param exports
+   * @throws IOException 
+   */
+  public Mountd(List<String> exports) throws IOException {
+    super(exports, new RpcProgramMountd(exports));
+  }
+
+  public Mountd(List<String> exports, Configuration config) throws IOException {
+    super(exports, new RpcProgramMountd(exports, config));
+  }
+  
+  public static void main(String[] args) throws IOException {
+    List<String> exports = new ArrayList<String>();
+    exports.add("/");
+    Mountd mountd = new Mountd(exports);
+    mountd.start(true);
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.mount;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.mount.MountEntry;
+import org.apache.hadoop.mount.MountInterface;
+import org.apache.hadoop.mount.MountResponse;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.Nfs3Status;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply;
+import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * RPC program corresponding to mountd daemon. See {@link Mountd}.
+ */
+public class RpcProgramMountd extends RpcProgram implements MountInterface {
+  private static final Log LOG = LogFactory.getLog(RpcProgramMountd.class);
+  public static final int PROGRAM = 100005;
+  public static final int VERSION_1 = 1;
+  public static final int VERSION_2 = 2;
+  public static final int VERSION_3 = 3;
+  public static final int PORT = 4242;
+
+  // Need DFSClient for branch-1 to get ExtendedHdfsFileStatus
+  private final DFSClient dfsClient;
+  
+  /** Synchronized list */
+  private final List<MountEntry> mounts;
+  
+  /** List that is unmodifiable */
+  private final List<String> exports;
+
+  public RpcProgramMountd() throws IOException {
+    this(new ArrayList<String>(0));
+  }
+
+  public RpcProgramMountd(List<String> exports) throws IOException {
+    this(exports, new Configuration());
+  }
+
+  public RpcProgramMountd(List<String> exports, Configuration config)
+      throws IOException {
+    // Note that RPC cache is not enabled
+    super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
+    this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
+    this.exports = Collections.unmodifiableList(exports);
+    this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
+  }
+  
+  public XDR nullOp(XDR out, int xid, InetAddress client) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MOUNT NULLOP : " + " client: " + client);
+    }
+    return  RpcAcceptedReply.voidReply(out, xid);
+  }
+
+  public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
+    String path = xdr.readString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MOUNT MNT path: " + path + " client: " + client);
+    }
+
+    String host = client.getHostName();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got host: " + host + " path: " + path);
+    }
+    if (!exports.contains(path)) {
+      LOG.info("Path " + path + " is not shared.");
+      MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
+      return out;
+    }
+
+    FileHandle handle = null;
+    try {
+      HdfsFileStatus exFileStatus = dfsClient.getFileInfo(path);
+      
+      handle = new FileHandle(exFileStatus.getFileId());
+    } catch (IOException e) {
+      LOG.error("Can't get handle for export:" + path + ", exception:" + e);
+      MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
+      return out;
+    }
+
+    assert (handle != null);
+    LOG.info("Giving handle (fileId:" + handle.getFileId()
+        + ") to client for export " + path);
+    mounts.add(new MountEntry(host, path));
+
+    MountResponse.writeMNTResponse(Nfs3Status.NFS3_OK, out, xid,
+        handle.getContent());
+    return out;
+  }
+
+  public XDR dump(XDR out, int xid, InetAddress client) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MOUNT NULLOP : " + " client: " + client);
+    }
+
+    List<MountEntry> copy = new ArrayList<MountEntry>(mounts);
+    MountResponse.writeMountList(out, xid, copy);
+    return out;
+  }
+
+  public XDR umnt(XDR xdr, XDR out, int xid, InetAddress client) {
+    String path = xdr.readString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MOUNT UMNT path: " + path + " client: " + client);
+    }
+    
+    String host = client.getHostName();
+    mounts.remove(new MountEntry(host, path));
+    RpcAcceptedReply.voidReply(out, xid);
+    return out;
+  }
+
+  public XDR umntall(XDR out, int xid, InetAddress client) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MOUNT UMNTALL : " + " client: " + client);
+    }
+    mounts.clear();
+    return RpcAcceptedReply.voidReply(out, xid);
+  }
+
+  @Override
+  public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
+      InetAddress client, Channel channel) {
+    int procedure = rpcCall.getProcedure();
+    int xid = rpcCall.getXid();
+    if (procedure == MNTPROC_NULL) {
+      out = nullOp(out, xid, client);
+    } else if (procedure == MNTPROC_MNT) {
+      out = mnt(xdr, out, xid, client);
+    } else if (procedure == MNTPROC_DUMP) {
+      out = dump(out, xid, client);
+    } else if (procedure == MNTPROC_UMNT) {      
+      out = umnt(xdr, out, xid, client);
+    } else if (procedure == MNTPROC_UMNTALL) {
+      umntall(out, xid, client);
+    } else if (procedure == MNTPROC_EXPORT) {
+      out = MountResponse.writeExportList(out, xid, exports);
+    } else {
+      // Invalid procedure
+      RpcAcceptedReply.voidReply(out, xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL);    }  
+    return out;
+  }
+  
+  @Override
+  protected boolean isIdempotent(RpcCall call) {
+    // Not required, because cache is turned off
+    return false;
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class is a thread pool to easily schedule async data operations.Current
+ * async data operation is write back operation. In the future, we could use it
+ * for readahead operations too.
+ */
+public class AsyncDataService {
+  static final Log LOG = LogFactory.getLog(AsyncDataService.class);
+
+  // ThreadPool core pool size
+  private static final int CORE_THREADS_PER_VOLUME = 1;
+  // ThreadPool maximum pool size
+  private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+  // ThreadPool keep-alive time for threads over core pool size
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+  private final ThreadGroup threadGroup = new ThreadGroup("async data service");
+  private ThreadFactory threadFactory = null;
+  private ThreadPoolExecutor executor = null;
+
+  public AsyncDataService() {
+    threadFactory = new ThreadFactory() {
+      public Thread newThread(Runnable r) {
+        return new Thread(threadGroup, r);
+      }
+    };
+
+    executor = new ThreadPoolExecutor(CORE_THREADS_PER_VOLUME,
+        MAXIMUM_THREADS_PER_VOLUME, THREADS_KEEP_ALIVE_SECONDS,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+    // This can reduce the number of running threads
+    executor.allowCoreThreadTimeOut(true);
+  }
+
+  /**
+   * Execute the task sometime in the future.
+   */
+  synchronized void execute(Runnable task) {
+    if (executor == null) {
+      throw new RuntimeException("AsyncDataService is already shutdown");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Current active thread number: " + executor.getActiveCount()
+          + " queue size:" + executor.getQueue().size()
+          + " scheduled task number:" + executor.getTaskCount());
+    }
+    executor.execute(task);
+  }
+
+  /**
+   * Gracefully shut down the ThreadPool. Will wait for all data tasks to
+   * finish.
+   */
+  synchronized void shutdown() {
+    if (executor == null) {
+      LOG.warn("AsyncDataService has already shut down.");
+    } else {
+      LOG.info("Shutting down all async data service threads...");
+      executor.shutdown();
+
+      // clear the executor so that calling execute again will fail.
+      executor = null;
+      LOG.info("All async data service threads have been shut down");
+    }
+  }
+
+  /**
+   * Write the data to HDFS asynchronously
+   */
+  void writeAsync(OpenFileCtx openFileCtx) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Scheduling write back task for fileId: "
+          + openFileCtx.copyLatestAttr().getFileId());
+    }
+    WriteBackTask wbTask = new WriteBackTask(openFileCtx);
+    execute(wbTask);
+  }
+
+  /**
+   * A task for write data back to HDFS for a file. Since only one thread can
+   * write for a file, any time there should be only one task(in queue or
+   * executing) for one file existing, and this should be guaranteed by the
+   * caller.
+   */
+  static class WriteBackTask implements Runnable {
+
+    OpenFileCtx openFileCtx;
+
+    WriteBackTask(OpenFileCtx openFileCtx) {
+      this.openFileCtx = openFileCtx;
+    }
+
+    OpenFileCtx getOpenFileCtx() {
+      return openFileCtx;
+    }
+
+    @Override
+    public String toString() {
+      // Called in AsyncDataService.execute for displaying error messages.
+      return "write back data for fileId"
+          + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+          + openFileCtx.getNextOffset();
+    }
+
+    public void run() {
+      try {
+        openFileCtx.executeWriteBack();
+      } catch (Throwable t) {
+        LOG.error("Asyn data service got error:"
+            + ExceptionUtils.getFullStackTrace(t));
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A cache saves DFSClient objects for different users
+ */
+public class DFSClientCache {
+  static final Log LOG = LogFactory.getLog(DFSClientCache.class);
+  private final LruCache<String, DFSClient> lruTable;
+  private final Configuration config;
+
+  public DFSClientCache(Configuration config) {
+    // By default, keep 256 DFSClient instance for 256 active users
+    this(config, 256);
+  }
+
+  public DFSClientCache(Configuration config, int size) {
+    lruTable = new LruCache<String, DFSClient>(size);
+    this.config = config;
+  }
+
+  public void put(String uname, DFSClient client) {
+    lruTable.put(uname, client);
+  }
+
+  synchronized public DFSClient get(String uname) {
+    DFSClient client = lruTable.get(uname);
+    if (client != null) {
+      return client;
+    }
+
+    // Not in table, create one.
+    try {
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
+      client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
+        public DFSClient run() throws IOException {
+          return new DFSClient(NameNode.getAddress(config), config);
+        }
+      });
+    } catch (IOException e) {
+      LOG.error("Create DFSClient failed for user:" + uname);
+      e.printStackTrace();
+
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    // Add new entry
+    lruTable.put(uname, client);
+    return client;
+  }
+
+  public int usedSize() {
+    return lruTable.usedSize();
+  }
+
+  public boolean containsKey(String key) {
+    return lruTable.containsKey(key);
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A thread-safe LRU table.
+ */
+public class LruCache<K, V> {
+  private final int maxSize;
+  private final LinkedHashMap<K, V> map;
+  private static final float hashTableLoadFactor = 0.75f;
+
+  public LruCache(int maxSize) {
+    this.maxSize = maxSize;
+    int hashTableCapacity = (int) Math.ceil(maxSize / hashTableLoadFactor) + 1;
+    map = new LinkedHashMap<K, V>(hashTableCapacity, hashTableLoadFactor, true) {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+        return size() > LruCache.this.maxSize;
+      }
+    };
+  }
+
+  // The found entry becomes the most recently used.
+  public synchronized V get(K key) {
+    return map.get(key);
+  }
+
+  public synchronized void put(K key, V value) {
+    map.put(key, value);
+  }
+
+  public synchronized int usedSize() {
+    return map.size();
+  }
+
+  public synchronized boolean containsKey(K key) {
+    return map.containsKey(key);
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.nfs.mount.Mountd;
+import org.apache.hadoop.nfs.nfs3.Nfs3Base;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
+ * Currently Mountd program is also started inside this class.
+ * Only TCP server is supported and UDP is not supported.
+ */
+public class Nfs3 extends Nfs3Base {
+  public Nfs3(List<String> exports) throws IOException {
+    super(new Mountd(exports), new RpcProgramNfs3(exports));
+  }
+
+  public Nfs3(List<String> exports, Configuration config) throws IOException {
+    super(new Mountd(exports, config), new RpcProgramNfs3(exports, config));
+  }
+
+  public static void main(String[] args) throws IOException {
+    StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
+    List<String> exports = new ArrayList<String>();
+    exports.add("/");
+    final Nfs3 nfsServer = new Nfs3(exports);
+    nfsServer.start(true);
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.nfs.NfsFileType;
+import org.apache.hadoop.nfs.NfsTime;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.response.WccAttr;
+import org.apache.hadoop.nfs.nfs3.response.WccData;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Utility/helper methods related to NFS
+ */
+public class Nfs3Utils {
+  public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/";
+
+  public static String getFileIdPath(FileHandle handle) {
+    return getFileIdPath(handle.getFileId());
+  }
+
+  public static String getFileIdPath(long fileId) {
+    return INODEID_PATH_PREFIX + fileId;
+  }
+
+  public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
+      throws IOException {
+    return client.getFileInfo(fileIdPath);
+  }
+
+  public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
+      HdfsFileStatus fs, IdUserGroup iug) {
+    /**
+     * Some 32bit Linux client has problem with 64bit fileId: it seems the 32bit
+     * client takes only the lower 32bit of the fileId and treats it as signed
+     * int. When the 32th bit is 1, the client considers it invalid.
+     */
+    return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs
+        .getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
+        iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
+        fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());
+  }
+
+  public static Nfs3FileAttributes getFileAttr(DFSClient client,
+      String fileIdPath, IdUserGroup iug) throws IOException {
+    HdfsFileStatus fs = getFileStatus(client, fileIdPath);
+    return fs == null ? null : getNfs3FileAttrFromFileStatus(fs, iug);
+  }
+
+  public static WccAttr getWccAttr(DFSClient client, String fileIdPath)
+      throws IOException {
+    HdfsFileStatus fstat = getFileStatus(client, fileIdPath);
+    if (fstat == null) {
+      return null;
+    }
+
+    long size = fstat.isDir() ? Nfs3FileAttributes.getDirSize(fstat
+        .getChildrenNum()) : fstat.getLen();
+    return new WccAttr(size, new NfsTime(fstat.getModificationTime()),
+        new NfsTime(fstat.getModificationTime()));
+  }
+
+  public static WccAttr getWccAttr(Nfs3FileAttributes attr) {
+    return new WccAttr(attr.getSize(), attr.getMtime(), attr.getCtime());
+  }
+
+  public static WccData createWccData(final WccAttr preOpAttr,
+      DFSClient dfsClient, final String fileIdPath, final IdUserGroup iug)
+      throws IOException {
+    Nfs3FileAttributes postOpDirAttr = getFileAttr(dfsClient, fileIdPath, iug);
+    return new WccData(preOpAttr, postOpDirAttr);
+  }
+
+  /**
+   * Send a write response to the netty network socket channel
+   */
+  public static void writeChannel(Channel channel, XDR out) {
+    ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
+    channel.write(outBuf);
+  }
+
+  private static boolean isSet(int access, int bits) {
+    return (access & bits) == bits;
+  }
+
+  public static int getAccessRights(int mode, int type) {
+    int rtn = 0;
+    if (isSet(mode, Nfs3Constant.ACCESS_MODE_READ)) {
+      rtn |= Nfs3Constant.ACCESS3_READ;
+      // LOOKUP is only meaningful for dir
+      if (type == NfsFileType.NFSDIR.toValue()) {
+        rtn |= Nfs3Constant.ACCESS3_LOOKUP;
+      }
+    }
+    if (isSet(mode, Nfs3Constant.ACCESS_MODE_WRITE)) {
+      rtn |= Nfs3Constant.ACCESS3_MODIFY;
+      rtn |= Nfs3Constant.ACCESS3_EXTEND;
+      // Set delete bit, UNIX may ignore it for regular file since it's up to
+      // parent dir op permission
+      rtn |= Nfs3Constant.ACCESS3_DELETE;
+    }
+    if (isSet(mode, Nfs3Constant.ACCESS_MODE_EXECUTE)) {
+      if (type == NfsFileType.NFSREG.toValue()) {
+        rtn |= Nfs3Constant.ACCESS3_EXECUTE;
+      }
+    }
+    return rtn;
+  }
+
+  public static int getAccessRightsForUserGroup(int uid, int gid,
+      Nfs3FileAttributes attr) {
+    int mode = attr.getMode();
+    if (uid == attr.getUid()) {
+      return getAccessRights(mode >> 6, attr.getType());
+    }
+    if (gid == attr.getGid()) {
+      return getAccessRights(mode >> 3, attr.getType());
+    }
+    return getAccessRights(mode, attr.getType());
+  }
+  
+  public static long bytesToLong(byte[] data) {
+    long n = 0xffL & data[0];
+    for (int i = 1; i < 8; i++) {
+      n = (n << 8) | (0xffL & data[i]);
+    }
+    return n;
+  }
+
+  public static byte[] longToByte(long v) {
+    byte[] data = new byte[8];
+    data[0] = (byte) (v >>> 56);
+    data[1] = (byte) (v >>> 48);
+    data[2] = (byte) (v >>> 40);
+    data[3] = (byte) (v >>> 32);
+    data[4] = (byte) (v >>> 24);
+    data[5] = (byte) (v >>> 16);
+    data[6] = (byte) (v >>> 8);
+    data[7] = (byte) (v >>> 0);
+    return data;
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+/**
+ * OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
+ * is not a valid range.
+ */
+public class OffsetRange implements Comparable<OffsetRange> {
+  private final long min;
+  private final long max;
+
+  OffsetRange(long min, long max) {
+    if ((min >= max) || (min < 0) || (max < 0)) {
+      throw new IllegalArgumentException("Wrong offset range: (" + min + ","
+          + max + ")");
+    }
+    this.min = min;
+    this.max = max;
+  }
+
+  long getMin() {
+    return min;
+  }
+
+  long getMax() {
+    return max;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) (min ^ max);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    assert (o instanceof OffsetRange);
+    OffsetRange range = (OffsetRange) o;
+    return (min == range.getMin()) && (max == range.getMax());
+  }
+
+  private static int compareTo(long left, long right) {
+    if (left < right) {
+      return -1;
+    } else if (left > right) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public int compareTo(OffsetRange other) {
+    final int d = compareTo(min, other.getMin());
+    return d != 0 ? d : compareTo(max, other.getMax());
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Tue Jul  2 17:31:58 2013
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.security.InvalidParameterException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.io.BytesWritable.Comparator;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.Nfs3Status;
+import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
+import org.apache.hadoop.nfs.nfs3.response.WccAttr;
+import org.apache.hadoop.nfs.nfs3.response.WccData;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
+ * synchronized by its member lock.
+ */
+class OpenFileCtx {
+  public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
+  
+  /**
+   * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
+   * any read/write operation to an OpenFileCtx object
+   */
+  private final ReentrantLock ctxLock;
+
+  // The stream status. False means the stream is closed.
+  private boolean activeState;
+  // The stream write-back status. True means one thread is doing write back.
+  private boolean asyncStatus;
+
+  private final FSDataOutputStream fos;
+  private final Nfs3FileAttributes latestAttr;
+  private long nextOffset;
+
+  private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+  
+  // The last write, commit request or write-back event. Updating time to keep
+  // output steam alive.
+  private long lastAccessTime;
+  
+  // Pending writes water mark for dump, 1MB
+  private static int DUMP_WRITE_WATER_MARK = 1024 * 1024; 
+  private FileOutputStream dumpOut;
+  private long nonSequentialWriteInMemory;
+  private boolean enabledDump;
+  private RandomAccessFile raf;
+  private final String dumpFilePath;
+  
+  private void updateLastAccessTime() {
+    lastAccessTime = System.currentTimeMillis();
+  }
+
+  private boolean checkStreamTimeout(long streamTimeout) {
+    return System.currentTimeMillis() - lastAccessTime > streamTimeout;
+  }
+  
+  // Increase or decrease the memory occupation of non-sequential writes
+  private long updateNonSequentialWriteInMemory(long count) {
+    nonSequentialWriteInMemory += count;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
+          + nonSequentialWriteInMemory);
+    }
+
+    if (nonSequentialWriteInMemory < 0) {
+      LOG.error("nonSequentialWriteInMemory is negative after update with count "
+          + count);
+      throw new IllegalArgumentException(
+          "nonSequentialWriteInMemory is negative after update with count "
+              + count);
+    }
+    return nonSequentialWriteInMemory;
+  }
+  
+  OpenFileCtx(FSDataOutputStream fos, Nfs3FileAttributes latestAttr,
+      String dumpFilePath) {
+    this.fos = fos;
+    this.latestAttr = latestAttr;
+    pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+    updateLastAccessTime();
+    activeState = true;
+    asyncStatus = false;
+    dumpOut = null;
+    raf = null;
+    nonSequentialWriteInMemory = 0;
+    this.dumpFilePath = dumpFilePath;  
+    enabledDump = dumpFilePath == null ? false: true;
+    ctxLock = new ReentrantLock(true);
+  }
+
+  private void lockCtx() {
+    if (LOG.isTraceEnabled()) {
+      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
+      StackTraceElement e = stacktrace[2];
+      String methodName = e.getMethodName();
+      LOG.trace("lock ctx, caller:" + methodName);
+    }
+    ctxLock.lock();
+  }
+
+  private void unlockCtx() {
+    ctxLock.unlock();
+    if (LOG.isTraceEnabled()) {
+      StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
+      StackTraceElement e = stacktrace[2];
+      String methodName = e.getMethodName();
+      LOG.info("unlock ctx, caller:" + methodName);
+    }
+  }
+  
+  // Make a copy of the latestAttr
+  public Nfs3FileAttributes copyLatestAttr() {
+    Nfs3FileAttributes ret;
+    lockCtx();
+    try {
+      ret = new Nfs3FileAttributes(latestAttr);
+    } finally {
+      unlockCtx();
+    }
+    return ret;
+  }
+  
+  private long getNextOffsetUnprotected() {
+    assert(ctxLock.isLocked());
+    return nextOffset;
+  }
+
+  public long getNextOffset() {
+    long ret;
+    lockCtx();
+    try {
+      ret = getNextOffsetUnprotected();
+    } finally {
+      unlockCtx();
+    }
+    return ret;
+  }
+  
+  // Get flushed offset. Note that flushed data may not be persisted.
+  private long getFlushedOffset() {
+    return fos.getPos();
+  }
+  
+  // Check if need to dump the new writes
+  private void checkDump(long count) {
+    assert (ctxLock.isLocked());
+
+    // Always update the in memory count
+    updateNonSequentialWriteInMemory(count);
+
+    if (!enabledDump) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Do nothing, dump is disabled.");
+      }
+      return;
+    }
+
+    if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+      return;
+    }
+
+    // Create dump outputstream for the first time
+    if (dumpOut == null) {
+      LOG.info("Create dump file:" + dumpFilePath);
+      File dumpFile = new File(dumpFilePath);
+      try {
+        if (dumpFile.exists()) {
+          LOG.fatal("The dump file should not exist:" + dumpFilePath);
+          throw new RuntimeException("The dump file should not exist:"
+              + dumpFilePath);
+        }
+        dumpOut = new FileOutputStream(dumpFile);
+      } catch (IOException e) {
+        LOG.error("Got failure when creating dump stream " + dumpFilePath
+            + " with error:" + e);
+        enabledDump = false;
+        IOUtils.cleanup(LOG, dumpOut);
+        return;
+      }
+    }
+    // Get raf for the first dump
+    if (raf == null) {
+      try {
+        raf = new RandomAccessFile(dumpFilePath, "r");
+      } catch (FileNotFoundException e) {
+        LOG.error("Can't get random access to file " + dumpFilePath);
+        // Disable dump
+        enabledDump = false;
+        return;
+      }
+    }
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Start dump, current write number:" + pendingWrites.size());
+    }
+    Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+    while (it.hasNext()) {
+      OffsetRange key = it.next();
+      WriteCtx writeCtx = pendingWrites.get(key);
+      try {
+        long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+        if (dumpedDataSize > 0) {
+          updateNonSequentialWriteInMemory(-dumpedDataSize);
+        }
+      } catch (IOException e) {
+        LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
+        // Disable dump
+        enabledDump = false;
+        return;
+      }
+    }
+    if (nonSequentialWriteInMemory != 0) {
+      LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
+          + nonSequentialWriteInMemory);
+      throw new RuntimeException(
+          "After dump, nonSequentialWriteInMemory is not zero: "
+              + nonSequentialWriteInMemory);
+    }
+  }
+  
+  private WriteCtx checkRepeatedWriteRequest(WRITE3Request request,
+      Channel channel, int xid) {
+    OffsetRange range = new OffsetRange(request.getOffset(),
+        request.getOffset() + request.getCount());
+    WriteCtx writeCtx = pendingWrites.get(range);
+    if (writeCtx== null) {
+      return null;
+    } else {
+      if (xid != writeCtx.getXid()) {
+        LOG.warn("Got a repeated request, same range, with a different xid:"
+            + xid + " xid in old request:" + writeCtx.getXid());
+        //TODO: better handling.
+      }
+      return writeCtx;  
+    }
+  }
+  
+  public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
+      Channel channel, int xid, AsyncDataService asyncDataService,
+      IdUserGroup iug) {
+
+    lockCtx();
+    try {
+      if (!activeState) {
+        LOG.info("OpenFileCtx is inactive, fileId:"
+            + request.getHandle().getFileId());
+        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+      } else {
+        // Handle repeated write requests(same xid or not).
+        // If already replied, send reply again. If not replied, drop the
+        // repeated request.
+        WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+            xid);
+        if (existantWriteCtx != null) {
+          if (!existantWriteCtx.getReplied()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Repeated write request which hasn't be served: xid="
+                  + xid + ", drop it.");
+            }
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Repeated write request which is already served: xid="
+                  + xid + ", resend response.");
+            }
+            WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+            WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+                fileWcc, request.getCount(), request.getStableHow(),
+                Nfs3Constant.WRITE_COMMIT_VERF);
+            Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+          }
+          updateLastAccessTime();
+          
+        } else {
+          receivedNewWriteInternal(dfsClient, request, channel, xid,
+              asyncDataService, iug);
+        }
+      }
+
+    } finally {
+      unlockCtx();
+    }
+  }
+
+  private void receivedNewWriteInternal(DFSClient dfsClient,
+      WRITE3Request request, Channel channel, int xid,
+      AsyncDataService asyncDataService, IdUserGroup iug) {
+    long offset = request.getOffset();
+    int count = request.getCount();
+    WriteStableHow stableHow = request.getStableHow();
+
+    // Get file length, fail non-append call
+    WccAttr preOpAttr = latestAttr.getWccAttr();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("requesed offset=" + offset + " and current filesize="
+          + preOpAttr.getSize());
+    }
+
+    long nextOffset = getNextOffsetUnprotected();
+    if (offset == nextOffset) {
+      LOG.info("Add to the list, update nextOffset and notify the writer,"
+          + " nextOffset:" + nextOffset);
+      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+          request.getOffset(), request.getCount(), request.getStableHow(),
+          request.getData().array(), channel, xid, false, WriteCtx.NO_DUMP);
+      addWrite(writeCtx);
+      
+      // Create an async task and change openFileCtx status to indicate async
+      // task pending
+      if (!asyncStatus) {
+        asyncStatus = true;
+        asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+      }
+      
+      // Update the write time first
+      updateLastAccessTime();
+      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
+
+      // Send response immediately for unstable write
+      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
+        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        writeCtx.setReplied(true);
+      }
+
+    } else if (offset > nextOffset) {
+      LOG.info("Add new write to the list but not update nextOffset:"
+          + nextOffset);
+      WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+          request.getOffset(), request.getCount(), request.getStableHow(),
+          request.getData().array(), channel, xid, false, WriteCtx.ALLOW_DUMP);
+      addWrite(writeCtx);
+
+      // Check if need to dump some pending requests to file
+      checkDump(request.getCount());
+      updateLastAccessTime();
+      Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
+      
+      // In test, noticed some Linux client sends a batch (e.g., 1MB)
+      // of reordered writes and won't send more writes until it gets
+      // responses of the previous batch. So here send response immediately for
+      // unstable non-sequential write
+      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
+        WccData fileWcc = new WccData(preOpAttr, postOpAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        writeCtx.setReplied(true);
+      }
+
+    } else {
+      // offset < nextOffset
+      LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+          + nextOffset + ")");
+      WccData wccData = new WccData(preOpAttr, null);
+      WRITE3Response response;
+
+      if (offset + count > nextOffset) {
+        LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
+            + "write requests, so treat it as a real random write, no support.");
+        response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+            WriteStableHow.UNSTABLE, 0);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Process perfectOverWrite");
+        }
+        response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+            request.getData().array(),
+            Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+      }
+      
+      updateLastAccessTime();
+      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+    }
+  }
+  
+  /**
+   * Honor 2 kinds of overwrites: 1). support some application like touch(write
+   * the same content back to change mtime), 2) client somehow sends the same
+   * write again in a different RPC.
+   */
+  private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
+      long offset, int count, WriteStableHow stableHow, byte[] data,
+      String path, WccData wccData, IdUserGroup iug) {
+    assert (ctxLock.isLocked());
+    WRITE3Response response = null;
+
+    // Read the content back
+    byte[] readbuffer = new byte[count];
+
+    int readCount = 0;
+    FSDataInputStream fis = null;
+    try {
+      // Sync file data and length to avoid partial read failure
+      ((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      
+      fis = new FSDataInputStream(dfsClient.open(path));
+      readCount = fis.read(offset, readbuffer, 0, count);
+      if (readCount < count) {
+        LOG.error("Can't read back " + count + " bytes, partial read size:"
+            + readCount);
+        return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
+            stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+      }
+
+    } catch (IOException e) {
+      LOG.info("Read failed when processing possible perfect overwrite, path="
+          + path + " error:" + e);
+      return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
+          stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+    } finally {
+      IOUtils.cleanup(LOG, fis);
+    }
+
+    // Compare with the request
+    Comparator comparator = new Comparator();
+    if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
+      LOG.info("Perfect overwrite has different content");
+      response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+          stableHow, 0);
+    } else {
+      LOG.info("Perfect overwrite has same content,"
+          + " updating the mtime, then return success");
+      Nfs3FileAttributes postOpAttr = null;
+      try {
+        dfsClient.setTimes(path, System.currentTimeMillis(), -1);
+        postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
+      } catch (IOException e) {
+        LOG.info("Got error when processing perfect overwrite, path=" + path
+            + " error:" + e);
+        return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+            0);
+      }
+
+      wccData.setPostOpAttr(postOpAttr);
+      response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
+          stableHow, 0);
+    }
+    return response;
+  }
+  
+  public final static int COMMIT_FINISHED = 0;
+  public final static int COMMIT_WAIT = 1;
+  public final static int COMMIT_INACTIVE_CTX = 2;
+  public final static int COMMIT_ERROR = 3;
+
+  /**
+   * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
+   * COMMIT_INACTIVE_CTX, COMMIT_ERROR
+   */
+  public int checkCommit(long commitOffset) {
+    int ret = COMMIT_WAIT;
+
+    lockCtx();
+    try {
+      if (!activeState) {
+        ret = COMMIT_INACTIVE_CTX;
+      } else {
+        ret = checkCommitInternal(commitOffset);
+      }
+    } finally {
+      unlockCtx();
+    }
+    return ret;
+  }
+  
+  private int checkCommitInternal(long commitOffset) {
+    if (commitOffset == 0) {
+      // Commit whole file
+      commitOffset = getNextOffsetUnprotected();
+    }
+
+    long flushed = getFlushedOffset();
+    LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+    if (flushed < commitOffset) {
+      // Keep stream active
+      updateLastAccessTime();
+      return COMMIT_WAIT;
+    }
+
+    int ret = COMMIT_WAIT;
+    try {
+      // Sync file data and length
+      ((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      // Nothing to do for metadata since attr related change is pass-through
+      ret = COMMIT_FINISHED;
+    } catch (IOException e) {
+      LOG.error("Got stream error during data sync:" + e);
+      // Do nothing. Stream will be closed eventually by StreamMonitor.
+      ret = COMMIT_ERROR;
+    }
+
+    // Keep stream active
+    updateLastAccessTime();
+    return ret;
+  }
+  
+  private void addWrite(WriteCtx writeCtx) {
+    assert (ctxLock.isLocked());
+    long offset = writeCtx.getOffset();
+    int count = writeCtx.getCount();
+    pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
+  }
+  
+  
+  /**
+   * Check stream status to decide if it should be closed
+   * @return true, remove stream; false, keep stream
+   */
+  public boolean streamCleanup(long fileId, long streamTimeout) {
+    if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
+      throw new InvalidParameterException("StreamTimeout" + streamTimeout
+          + "ms is less than MINIMIUM_STREAM_TIMEOUT "
+          + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
+    }
+    
+    boolean flag = false;
+    if (!ctxLock.tryLock()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Another thread is working on it" + ctxLock.toString());
+      }
+      return flag;
+    }
+    
+    try {
+      // Check the stream timeout
+      if (checkStreamTimeout(streamTimeout)) {
+        LOG.info("closing stream for fileId:" + fileId);
+        cleanup();
+        flag = true;
+      }
+    } finally {
+      unlockCtx();
+    }
+    return flag;
+  }
+  
+  // Invoked by AsynDataService to do the write back
+  public void executeWriteBack() {
+    long nextOffset;
+    OffsetRange key;
+    WriteCtx writeCtx;
+
+    try {
+      // Don't lock OpenFileCtx for all writes to reduce the timeout of other
+      // client request to the same file
+      while (true) {
+        lockCtx();
+        if (!asyncStatus) {
+          // This should never happen. There should be only one thread working
+          // on one OpenFileCtx anytime.
+          LOG.fatal("The openFileCtx has false async status");
+          throw new RuntimeException("The openFileCtx has false async status");
+        }
+        // Any single write failure can change activeState to false, so do the
+        // check each loop.
+        if (pendingWrites.isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The asyn write task has no pendding writes, fileId: "
+                + latestAttr.getFileId());
+          }
+          break;
+        }
+        if (!activeState) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The openFileCtx is not active anymore, fileId: "
+                + latestAttr.getFileId());
+          }
+          break;
+        }
+
+        // Get the next sequential write
+        nextOffset = getNextOffsetUnprotected();
+        key = pendingWrites.firstKey();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
+              + nextOffset);
+        }
+
+        if (key.getMin() > nextOffset) {
+          if (LOG.isDebugEnabled()) {
+            LOG.info("The next sequencial write has not arrived yet");
+          }
+          break;
+
+        } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
+          // Can't handle overlapping write. Didn't see it in tests yet.
+          LOG.fatal("Got a overlapping write (" + key.getMin() + ","
+              + key.getMax() + "), nextOffset=" + nextOffset);
+          throw new RuntimeException("Got a overlapping write (" + key.getMin()
+              + "," + key.getMax() + "), nextOffset=" + nextOffset);
+
+        } else {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
+                + ") from the list");
+          }
+          writeCtx = pendingWrites.remove(key);
+          // Do the write
+          doSingleWrite(writeCtx);
+          updateLastAccessTime();
+        }
+        
+        unlockCtx();
+      }
+
+    } finally {
+      // Always reset the async status so another async task can be created
+      // for this file
+      asyncStatus = false;
+      if (ctxLock.isHeldByCurrentThread()) {
+        unlockCtx();
+      }
+    }
+  }
+
+  private void doSingleWrite(final WriteCtx writeCtx) {
+    assert(ctxLock.isLocked());
+    Channel channel = writeCtx.getChannel();
+    int xid = writeCtx.getXid();
+
+    long offset = writeCtx.getOffset();
+    int count = writeCtx.getCount();
+    WriteStableHow stableHow = writeCtx.getStableHow();
+    byte[] data = null;
+    try {
+      data = writeCtx.getData();
+    } catch (IOException e1) {
+      LOG.error("Failed to get request data offset:" + offset + " count:"
+          + count + " error:" + e1);
+      // Cleanup everything
+      cleanup();
+      return;
+    }
+    assert (data.length == count);
+
+    FileHandle handle = writeCtx.getHandle();
+    LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
+        + " length:" + count + " stableHow:" + stableHow.getValue());
+
+    try {
+      fos.write(data, 0, count);
+
+      if (fos.getPos() != (offset + count)) {
+        throw new IOException("output stream is out of sync, pos="
+            + fos.getPos() + " and nextOffset should be" + (offset + count));
+      }
+      nextOffset = fos.getPos();
+
+      // Reduce memory occupation size if request was allowed dumped
+      if (writeCtx.getDataState() == WriteCtx.ALLOW_DUMP) {
+        updateNonSequentialWriteInMemory(-count);
+      }
+      
+      if (!writeCtx.getReplied()) {
+        WccAttr preOpAttr = latestAttr.getWccAttr();
+        WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+      }
+
+    } catch (IOException e) {
+      LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
+          + offset + " and length " + data.length, e);
+      if (!writeCtx.getReplied()) {
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        // Keep stream open. Either client retries or SteamMonitor closes it.
+      }
+
+      LOG.info("Clean up open file context for fileId: "
+          + latestAttr.getFileid());
+      cleanup();
+    }
+  }
+
+  private void cleanup() {
+    assert(ctxLock.isLocked());
+    activeState = false;
+    
+    // Close stream
+    try {
+      if (fos != null) {
+        fos.close();
+      }
+    } catch (IOException e) {
+      LOG.info("Can't close stream for fileId:" + latestAttr.getFileid()
+          + ", error:" + e);
+    }
+    
+    // Reply error for pending writes
+    LOG.info("There are " + pendingWrites.size() + " pending writes.");
+    WccAttr preOpAttr = latestAttr.getWccAttr();
+    while (!pendingWrites.isEmpty()) {
+      OffsetRange key = pendingWrites.firstKey();
+      LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
+          + "), nextOffset=" + getNextOffsetUnprotected());
+      
+      WriteCtx writeCtx = pendingWrites.remove(key);
+      if (!writeCtx.getReplied()) {
+        WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+            fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(writeCtx.getChannel(),
+            response.send(new XDR(), writeCtx.getXid()));
+      }
+    }
+    
+    // Cleanup dump file
+    if (dumpOut!=null){
+      try {
+        dumpOut.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+    if (raf!=null) {
+      try {
+        raf.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+    File dumpFile = new File(dumpFilePath);
+    if (dumpFile.delete()) {
+      LOG.error("Failed to delete dumpfile: "+ dumpFile);
+    }
+  }
+}
\ No newline at end of file