Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/07/02 19:31:59 UTC
svn commit: r1499029 [1/3] - in /hadoop/common/trunk/hadoop-hdfs-project: ./
hadoop-hdfs-nfs/ hadoop-hdfs-nfs/dev-support/ hadoop-hdfs-nfs/src/
hadoop-hdfs-nfs/src/main/ hadoop-hdfs-nfs/src/main/java/
hadoop-hdfs-nfs/src/main/java/org/ hadoop-hdfs-nfs/...
Author: brandonli
Date: Tue Jul 2 17:31:58 2013
New Revision: 1499029
URL: http://svn.apache.org/r1499029
Log:
HDFS-4762 Provide HDFS based NFSv3 and Mountd implementation. Contributed by Brandon Li
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/pom.xml
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/README.txt Tue Jul 2 17:31:58 2013
@@ -0,0 +1,4 @@
+-----------------------------------------------------------------------------
+HDFS-NFS - NFS implementation for Hadoop HDFS
+
+-----------------------------------------------------------------------------
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/dev-support/findbugsExcludeFile.xml Tue Jul 2 17:31:58 2013
@@ -0,0 +1,18 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+</FindBugsFilter>
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml Tue Jul 2 17:31:58 2013
@@ -0,0 +1,268 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-project-dist</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <relativePath>../../hadoop-project-dist</relativePath>
+ </parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-nfs</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <description>Apache Hadoop HDFS-NFS</description>
+ <name>Apache Hadoop HDFS-NFS</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-nfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.6.2.Final</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.2</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-daemon</groupId>
+ <artifactId>commons-daemon</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>xmlenc</groupId>
+ <artifactId>xmlenc</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <startKdc>${startKdc}</startKdc>
+ <kdc.resource.dir>${kdc.resource.dir}</kdc.resource.dir>
+ </systemPropertyVariables>
+ <properties>
+ <property>
+ <name>listener</name>
+ <value>org.apache.hadoop.test.TimedOutTestsListener</value>
+ </property>
+ </properties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-jsp-generated-sources-directory</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <configuration>
+ <skipTests>false</skipTests>
+ </configuration>
+ <executions>
+ <execution>
+ <id>create-jsp-generated-sources-directory</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <mkdir dir="${project.build.directory}/generated-sources/java" />
+ </target>
+ </configuration>
+ </execution>
+ <execution>
+ <phase>pre-site</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <tasks>
+ <copy file="src/main/resources/hdfs-nfs-default.xml" todir="src/site/resources"/>
+ <copy file="src/main/xsl/configuration.xsl" todir="src/site/resources"/>
+ </tasks>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/Mountd.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.mount;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mount.MountdBase;
+
+/**
+ * Main class for starting the mountd daemon. This daemon implements the NFS
+ * mount protocol. When it receives a MOUNT request from an NFS client, it
+ * checks the request against the list of currently exported file systems. If
+ * the client is permitted to mount the file system, rpc.mountd obtains a file
+ * handle for the requested directory and returns it to the client.
+ */
+public class Mountd extends MountdBase {
+ /**
+ * Constructor.
+ * @param exports list of paths exported by this mount daemon
+ * @throws IOException if the underlying RPC program cannot be created
+ */
+ public Mountd(List<String> exports) throws IOException {
+ super(exports, new RpcProgramMountd(exports));
+ }
+
+ public Mountd(List<String> exports, Configuration config) throws IOException {
+ super(exports, new RpcProgramMountd(exports, config));
+ }
+
+ public static void main(String[] args) throws IOException {
+ List<String> exports = new ArrayList<String>();
+ exports.add("/");
+ Mountd mountd = new Mountd(exports);
+ mountd.start(true);
+ }
+}
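
For reference, a minimal sketch of starting the mount daemon above with an
explicit Configuration. The wrapper class name and the extra export path are
illustrative; Mountd, its constructors, and start(boolean) are taken from the
code above.

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.nfs.mount.Mountd;

    public class StartMountd {
      public static void main(String[] args) throws IOException {
        // Illustrative export list; the main() above exports only "/"
        List<String> exports = Arrays.asList("/", "/user");
        Configuration conf = new Configuration(); // loads hdfs-site.xml etc.
        Mountd mountd = new Mountd(exports, conf);
        mountd.start(true); // same flag as in the main() above
      }
    }
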
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.mount;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.mount.MountEntry;
+import org.apache.hadoop.mount.MountInterface;
+import org.apache.hadoop.mount.MountResponse;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.Nfs3Status;
+import org.apache.hadoop.oncrpc.RpcAcceptedReply;
+import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * RPC program corresponding to mountd daemon. See {@link Mountd}.
+ */
+public class RpcProgramMountd extends RpcProgram implements MountInterface {
+ private static final Log LOG = LogFactory.getLog(RpcProgramMountd.class);
+ public static final int PROGRAM = 100005;
+ public static final int VERSION_1 = 1;
+ public static final int VERSION_2 = 2;
+ public static final int VERSION_3 = 3;
+ public static final int PORT = 4242;
+
+ // Need DFSClient for branch-1 to get ExtendedHdfsFileStatus
+ private final DFSClient dfsClient;
+
+ /** Synchronized list of current mount entries */
+ private final List<MountEntry> mounts;
+
+ /** Unmodifiable list of exported paths */
+ private final List<String> exports;
+
+ public RpcProgramMountd() throws IOException {
+ this(new ArrayList<String>(0));
+ }
+
+ public RpcProgramMountd(List<String> exports) throws IOException {
+ this(exports, new Configuration());
+ }
+
+ public RpcProgramMountd(List<String> exports, Configuration config)
+ throws IOException {
+ // Note that RPC cache is not enabled
+ super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
+ this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
+ this.exports = Collections.unmodifiableList(exports);
+ this.dfsClient = new DFSClient(NameNode.getAddress(config), config);
+ }
+
+ public XDR nullOp(XDR out, int xid, InetAddress client) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MOUNT NULLOP : " + " client: " + client);
+ }
+ return RpcAcceptedReply.voidReply(out, xid);
+ }
+
+ public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
+ String path = xdr.readString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MOUNT MNT path: " + path + " client: " + client);
+ }
+
+ String host = client.getHostName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got host: " + host + " path: " + path);
+ }
+ if (!exports.contains(path)) {
+ LOG.info("Path " + path + " is not shared.");
+ MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
+ return out;
+ }
+
+ FileHandle handle = null;
+ try {
+ HdfsFileStatus exFileStatus = dfsClient.getFileInfo(path);
+
+ handle = new FileHandle(exFileStatus.getFileId());
+ } catch (IOException e) {
+ LOG.error("Can't get handle for export:" + path + ", exception:" + e);
+ MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
+ return out;
+ }
+
+ assert (handle != null);
+ LOG.info("Giving handle (fileId:" + handle.getFileId()
+ + ") to client for export " + path);
+ mounts.add(new MountEntry(host, path));
+
+ MountResponse.writeMNTResponse(Nfs3Status.NFS3_OK, out, xid,
+ handle.getContent());
+ return out;
+ }
+
+ public XDR dump(XDR out, int xid, InetAddress client) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MOUNT NULLOP : " + " client: " + client);
+ }
+
+ List<MountEntry> copy = new ArrayList<MountEntry>(mounts);
+ MountResponse.writeMountList(out, xid, copy);
+ return out;
+ }
+
+ public XDR umnt(XDR xdr, XDR out, int xid, InetAddress client) {
+ String path = xdr.readString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MOUNT UMNT path: " + path + " client: " + client);
+ }
+
+ String host = client.getHostName();
+ mounts.remove(new MountEntry(host, path));
+ RpcAcceptedReply.voidReply(out, xid);
+ return out;
+ }
+
+ public XDR umntall(XDR out, int xid, InetAddress client) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MOUNT UMNTALL : " + " client: " + client);
+ }
+ mounts.clear();
+ return RpcAcceptedReply.voidReply(out, xid);
+ }
+
+ @Override
+ public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
+ InetAddress client, Channel channel) {
+ int procedure = rpcCall.getProcedure();
+ int xid = rpcCall.getXid();
+ if (procedure == MNTPROC_NULL) {
+ out = nullOp(out, xid, client);
+ } else if (procedure == MNTPROC_MNT) {
+ out = mnt(xdr, out, xid, client);
+ } else if (procedure == MNTPROC_DUMP) {
+ out = dump(out, xid, client);
+ } else if (procedure == MNTPROC_UMNT) {
+ out = umnt(xdr, out, xid, client);
+ } else if (procedure == MNTPROC_UMNTALL) {
+ umntall(out, xid, client);
+ } else if (procedure == MNTPROC_EXPORT) {
+ out = MountResponse.writeExportList(out, xid, exports);
+ } else {
+ // Invalid procedure
+ RpcAcceptedReply.voidReply(out, xid,
+ RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+ }
+ return out;
+ }
+
+ @Override
+ protected boolean isIdempotent(RpcCall call) {
+ // Not required, because cache is turned off
+ return false;
+ }
+}
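
A side note on the two list wrappers used above: exports is wrapped with
Collections.unmodifiableList (mutation throws), while mounts is wrapped with
Collections.synchronizedList (individual operations are thread-safe). A
self-contained illustration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ListWrappersDemo {
      public static void main(String[] args) {
        List<String> exports = Collections.unmodifiableList(Arrays.asList("/"));
        try {
          exports.add("/user"); // throws: exports cannot change at runtime
        } catch (UnsupportedOperationException expected) {
          System.out.println("exports are read-only");
        }
        List<String> mounts = Collections.synchronizedList(new ArrayList<String>());
        mounts.add("host1 /"); // add/remove are safe across handler threads
      }
    }
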
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class is a thread pool used to schedule async data operations. The
+ * current async data operation is the write-back operation. In the future, it
+ * could also be used for readahead operations.
+ */
+public class AsyncDataService {
+ static final Log LOG = LogFactory.getLog(AsyncDataService.class);
+
+ // ThreadPool core pool size
+ private static final int CORE_THREADS_PER_VOLUME = 1;
+ // ThreadPool maximum pool size
+ private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+ // ThreadPool keep-alive time for threads over core pool size
+ private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+ private final ThreadGroup threadGroup = new ThreadGroup("async data service");
+ private ThreadFactory threadFactory = null;
+ private ThreadPoolExecutor executor = null;
+
+ public AsyncDataService() {
+ threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(threadGroup, r);
+ }
+ };
+
+ executor = new ThreadPoolExecutor(CORE_THREADS_PER_VOLUME,
+ MAXIMUM_THREADS_PER_VOLUME, THREADS_KEEP_ALIVE_SECONDS,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+ // This can reduce the number of running threads
+ executor.allowCoreThreadTimeOut(true);
+ }
+
+ /**
+ * Execute the task sometime in the future.
+ */
+ synchronized void execute(Runnable task) {
+ if (executor == null) {
+ throw new RuntimeException("AsyncDataService is already shutdown");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Current active thread number: " + executor.getActiveCount()
+ + " queue size:" + executor.getQueue().size()
+ + " scheduled task number:" + executor.getTaskCount());
+ }
+ executor.execute(task);
+ }
+
+ /**
+ * Gracefully shut down the ThreadPool. Previously submitted tasks are still
+ * executed, but no new tasks are accepted and the call itself does not block.
+ */
+ synchronized void shutdown() {
+ if (executor == null) {
+ LOG.warn("AsyncDataService has already shut down.");
+ } else {
+ LOG.info("Shutting down all async data service threads...");
+ executor.shutdown();
+
+ // clear the executor so that calling execute again will fail.
+ executor = null;
+ LOG.info("All async data service threads have been shut down");
+ }
+ }
+
+ /**
+ * Write the data to HDFS asynchronously
+ */
+ void writeAsync(OpenFileCtx openFileCtx) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling write back task for fileId: "
+ + openFileCtx.copyLatestAttr().getFileId());
+ }
+ WriteBackTask wbTask = new WriteBackTask(openFileCtx);
+ execute(wbTask);
+ }
+
+ /**
+ * A task that writes data back to HDFS for one file. Since only one thread
+ * can write to a given file, at any time there should be at most one task
+ * (queued or executing) per file, and the caller must guarantee this.
+ */
+ static class WriteBackTask implements Runnable {
+
+ OpenFileCtx openFileCtx;
+
+ WriteBackTask(OpenFileCtx openFileCtx) {
+ this.openFileCtx = openFileCtx;
+ }
+
+ OpenFileCtx getOpenFileCtx() {
+ return openFileCtx;
+ }
+
+ @Override
+ public String toString() {
+ // Called in AsyncDataService.execute for displaying error messages.
+ return "write back data for fileId"
+ + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+ + openFileCtx.getNextOffset();
+ }
+
+ public void run() {
+ try {
+ openFileCtx.executeWriteBack();
+ } catch (Throwable t) {
+ LOG.error("Asyn data service got error:"
+ + ExceptionUtils.getFullStackTrace(t));
+ }
+ }
+ }
+}
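
A self-contained sketch of the executor configuration used above. One subtlety
of ThreadPoolExecutor: with an unbounded LinkedBlockingQueue, extra threads
beyond the core size are never created (the queue never fills), so the maximum
of 4 threads above is effectively unreachable and tasks run on a single thread.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PoolDemo {
      public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 4, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        // Idle core threads die after the keep-alive time, so an idle
        // service holds no threads at all.
        executor.allowCoreThreadTimeOut(true);
        executor.execute(new Runnable() {
          public void run() {
            System.out.println("ran on " + Thread.currentThread().getName());
          }
        });
        executor.shutdown();
      }
    }
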
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A cache that saves DFSClient objects for different users.
+ */
+public class DFSClientCache {
+ static final Log LOG = LogFactory.getLog(DFSClientCache.class);
+ private final LruCache<String, DFSClient> lruTable;
+ private final Configuration config;
+
+ public DFSClientCache(Configuration config) {
+ // By default, keep 256 DFSClient instances for 256 active users
+ this(config, 256);
+ }
+
+ public DFSClientCache(Configuration config, int size) {
+ lruTable = new LruCache<String, DFSClient>(size);
+ this.config = config;
+ }
+
+ public void put(String uname, DFSClient client) {
+ lruTable.put(uname, client);
+ }
+
+ public synchronized DFSClient get(String uname) {
+ DFSClient client = lruTable.get(uname);
+ if (client != null) {
+ return client;
+ }
+
+ // Not in table, create one.
+ try {
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
+ client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
+ public DFSClient run() throws IOException {
+ return new DFSClient(NameNode.getAddress(config), config);
+ }
+ });
+ } catch (IOException e) {
+ LOG.error("Create DFSClient failed for user:" + uname);
+ e.printStackTrace();
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ // Add new entry
+ lruTable.put(uname, client);
+ return client;
+ }
+
+ public int usedSize() {
+ return lruTable.usedSize();
+ }
+
+ public boolean containsKey(String key) {
+ return lruTable.containsKey(key);
+ }
+}
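
A hypothetical usage sketch of the cache above. The user names are
illustrative, and constructing a DFSClient assumes a reachable NameNode in the
Configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.nfs.nfs3.DFSClientCache;

    public class CacheDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        DFSClientCache cache = new DFSClientCache(conf, 2); // at most 2 clients
        DFSClient c1 = cache.get("alice"); // miss: created via ugi.doAs, cached
        DFSClient c2 = cache.get("alice"); // hit: same instance, most recent
        System.out.println(c1 == c2); // true
      }
    }
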
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A thread-safe LRU table.
+ */
+public class LruCache<K, V> {
+ private final int maxSize;
+ private final LinkedHashMap<K, V> map;
+ private static final float hashTableLoadFactor = 0.75f;
+
+ public LruCache(int maxSize) {
+ this.maxSize = maxSize;
+ int hashTableCapacity = (int) Math.ceil(maxSize / hashTableLoadFactor) + 1;
+ map = new LinkedHashMap<K, V>(hashTableCapacity, hashTableLoadFactor, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ return size() > LruCache.this.maxSize;
+ }
+ };
+ }
+
+ // The found entry becomes the most recently used.
+ public synchronized V get(K key) {
+ return map.get(key);
+ }
+
+ public synchronized void put(K key, V value) {
+ map.put(key, value);
+ }
+
+ public synchronized int usedSize() {
+ return map.size();
+ }
+
+ public synchronized boolean containsKey(K key) {
+ return map.containsKey(key);
+ }
+}
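
A quick demonstration of the access-order eviction above (keys and values are
illustrative):

    import org.apache.hadoop.hdfs.nfs.nfs3.LruCache;

    public class LruDemo {
      public static void main(String[] args) {
        LruCache<String, Integer> cache = new LruCache<String, Integer>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // touch "a": it becomes the most recently used
        cache.put("c", 3); // evicts "b", the least recently used entry
        System.out.println(cache.containsKey("b")); // false
        System.out.println(cache.containsKey("a")); // true
      }
    }
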
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.nfs.mount.Mountd;
+import org.apache.hadoop.nfs.nfs3.Nfs3Base;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * NFS server. Supports NFSv3 using {@link RpcProgramNfs3}.
+ * Currently the Mountd program is also started inside this class.
+ * Only a TCP server is supported; UDP is not.
+ */
+public class Nfs3 extends Nfs3Base {
+ public Nfs3(List<String> exports) throws IOException {
+ super(new Mountd(exports), new RpcProgramNfs3(exports));
+ }
+
+ public Nfs3(List<String> exports, Configuration config) throws IOException {
+ super(new Mountd(exports, config), new RpcProgramNfs3(exports, config));
+ }
+
+ public static void main(String[] args) throws IOException {
+ StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
+ List<String> exports = new ArrayList<String>();
+ exports.add("/");
+ final Nfs3 nfsServer = new Nfs3(exports);
+ nfsServer.start(true);
+ }
+}
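
For reference, a minimal embedding sketch for the server above (the wrapper
class name is illustrative; Nfs3 and start(boolean) are from the code above,
and the Configuration is assumed to point at a running NameNode):

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;

    public class StartNfs3 {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Nfs3 server = new Nfs3(Arrays.asList("/"), conf); // export the HDFS root
        server.start(true); // same flag as in the main() above
      }
    }
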
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.nfs.NfsFileType;
+import org.apache.hadoop.nfs.NfsTime;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.response.WccAttr;
+import org.apache.hadoop.nfs.nfs3.response.WccData;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Utility/helper methods related to NFS
+ */
+public class Nfs3Utils {
+ public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/";
+
+ public static String getFileIdPath(FileHandle handle) {
+ return getFileIdPath(handle.getFileId());
+ }
+
+ public static String getFileIdPath(long fileId) {
+ return INODEID_PATH_PREFIX + fileId;
+ }
+
+ public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
+ throws IOException {
+ return client.getFileInfo(fileIdPath);
+ }
+
+ public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
+ HdfsFileStatus fs, IdUserGroup iug) {
+ /**
+ * Some 32-bit Linux clients have a problem with 64-bit fileIds: it seems such
+ * a client takes only the lower 32 bits of the fileId and treats it as a
+ * signed int. When the 32nd bit is 1, the client considers it invalid.
+ */
+ return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs
+ .getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
+ iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
+ fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());
+ }
+
+ public static Nfs3FileAttributes getFileAttr(DFSClient client,
+ String fileIdPath, IdUserGroup iug) throws IOException {
+ HdfsFileStatus fs = getFileStatus(client, fileIdPath);
+ return fs == null ? null : getNfs3FileAttrFromFileStatus(fs, iug);
+ }
+
+ public static WccAttr getWccAttr(DFSClient client, String fileIdPath)
+ throws IOException {
+ HdfsFileStatus fstat = getFileStatus(client, fileIdPath);
+ if (fstat == null) {
+ return null;
+ }
+
+ long size = fstat.isDir() ? Nfs3FileAttributes.getDirSize(fstat
+ .getChildrenNum()) : fstat.getLen();
+ return new WccAttr(size, new NfsTime(fstat.getModificationTime()),
+ new NfsTime(fstat.getModificationTime()));
+ }
+
+ public static WccAttr getWccAttr(Nfs3FileAttributes attr) {
+ return new WccAttr(attr.getSize(), attr.getMtime(), attr.getCtime());
+ }
+
+ public static WccData createWccData(final WccAttr preOpAttr,
+ DFSClient dfsClient, final String fileIdPath, final IdUserGroup iug)
+ throws IOException {
+ Nfs3FileAttributes postOpDirAttr = getFileAttr(dfsClient, fileIdPath, iug);
+ return new WccData(preOpAttr, postOpDirAttr);
+ }
+
+ /**
+ * Send a write response to the netty network socket channel
+ */
+ public static void writeChannel(Channel channel, XDR out) {
+ ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
+ channel.write(outBuf);
+ }
+
+ private static boolean isSet(int access, int bits) {
+ return (access & bits) == bits;
+ }
+
+ public static int getAccessRights(int mode, int type) {
+ int rtn = 0;
+ if (isSet(mode, Nfs3Constant.ACCESS_MODE_READ)) {
+ rtn |= Nfs3Constant.ACCESS3_READ;
+ // LOOKUP is only meaningful for dir
+ if (type == NfsFileType.NFSDIR.toValue()) {
+ rtn |= Nfs3Constant.ACCESS3_LOOKUP;
+ }
+ }
+ if (isSet(mode, Nfs3Constant.ACCESS_MODE_WRITE)) {
+ rtn |= Nfs3Constant.ACCESS3_MODIFY;
+ rtn |= Nfs3Constant.ACCESS3_EXTEND;
+ // Set the delete bit; UNIX may ignore it for a regular file since
+ // deletion is governed by the parent directory's permissions
+ rtn |= Nfs3Constant.ACCESS3_DELETE;
+ }
+ if (isSet(mode, Nfs3Constant.ACCESS_MODE_EXECUTE)) {
+ if (type == NfsFileType.NFSREG.toValue()) {
+ rtn |= Nfs3Constant.ACCESS3_EXECUTE;
+ }
+ }
+ return rtn;
+ }
+
+ public static int getAccessRightsForUserGroup(int uid, int gid,
+ Nfs3FileAttributes attr) {
+ int mode = attr.getMode();
+ if (uid == attr.getUid()) {
+ return getAccessRights(mode >> 6, attr.getType());
+ }
+ if (gid == attr.getGid()) {
+ return getAccessRights(mode >> 3, attr.getType());
+ }
+ return getAccessRights(mode, attr.getType());
+ }
+
+ public static long bytesToLong(byte[] data) {
+ long n = 0xffL & data[0];
+ for (int i = 1; i < 8; i++) {
+ n = (n << 8) | (0xffL & data[i]);
+ }
+ return n;
+ }
+
+ public static byte[] longToByte(long v) {
+ byte[] data = new byte[8];
+ data[0] = (byte) (v >>> 56);
+ data[1] = (byte) (v >>> 48);
+ data[2] = (byte) (v >>> 40);
+ data[3] = (byte) (v >>> 32);
+ data[4] = (byte) (v >>> 24);
+ data[5] = (byte) (v >>> 16);
+ data[6] = (byte) (v >>> 8);
+ data[7] = (byte) (v >>> 0);
+ return data;
+ }
+}
\ No newline at end of file
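
A small round-trip check for the big-endian byte helpers above, together with
the permission shifting used by getAccessRightsForUserGroup (the mode value is
illustrative):

    import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;

    public class Nfs3UtilsDemo {
      public static void main(String[] args) {
        long fileId = 0x0123456789abcdefL;
        byte[] bytes = Nfs3Utils.longToByte(fileId);
        // bytesToLong reads the 8 bytes back in big-endian order
        System.out.println(Nfs3Utils.bytesToLong(bytes) == fileId); // true

        // Mode 0750: owner bits are mode >> 6, group bits are mode >> 3
        int mode = 0750;
        System.out.println((mode >> 6) & 7); // 7 = rwx for the owner
        System.out.println((mode >> 3) & 7); // 5 = r-x for the group
      }
    }
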
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+/**
+ * OffsetRange is the range of a read/write request. A single point (e.g.,
+ * [5,5]) is not a valid range.
+ */
+public class OffsetRange implements Comparable<OffsetRange> {
+ private final long min;
+ private final long max;
+
+ OffsetRange(long min, long max) {
+ if ((min >= max) || (min < 0) || (max < 0)) {
+ throw new IllegalArgumentException("Wrong offset range: (" + min + ","
+ + max + ")");
+ }
+ this.min = min;
+ this.max = max;
+ }
+
+ long getMin() {
+ return min;
+ }
+
+ long getMax() {
+ return max;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (min ^ max);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof OffsetRange)) {
+ return false;
+ }
+ OffsetRange range = (OffsetRange) o;
+ return (min == range.getMin()) && (max == range.getMax());
+ }
+
+ private static int compareTo(long left, long right) {
+ if (left < right) {
+ return -1;
+ } else if (left > right) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public int compareTo(OffsetRange other) {
+ final int d = compareTo(min, other.getMin());
+ return d != 0 ? d : compareTo(max, other.getMax());
+ }
+}
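
A short sketch of the (min, max) ordering, as used by the pendingWrites map in
OpenFileCtx below. OffsetRange and its accessors are package-private, so this
would have to live in org.apache.hadoop.hdfs.nfs.nfs3:

    package org.apache.hadoop.hdfs.nfs.nfs3;

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class OffsetRangeDemo {
      public static void main(String[] args) {
        SortedMap<OffsetRange, String> pending =
            new TreeMap<OffsetRange, String>();
        pending.put(new OffsetRange(8192, 12288), "out-of-order write");
        pending.put(new OffsetRange(0, 4096), "first write");
        pending.put(new OffsetRange(4096, 8192), "gap filler");
        // Iteration follows (min, max): [0,4096), [4096,8192), [8192,12288)
        for (OffsetRange r : pending.keySet()) {
          System.out.println(r.getMin() + ".." + r.getMax() + " -> "
              + pending.get(r));
        }
      }
    }
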
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1499029&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Tue Jul 2 17:31:58 2013
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.security.InvalidParameterException;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.io.BytesWritable.Comparator;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.Nfs3Status;
+import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
+import org.apache.hadoop.nfs.nfs3.response.WccAttr;
+import org.apache.hadoop.nfs.nfs3.response.WccData;
+import org.apache.hadoop.oncrpc.XDR;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
+ * synchronized by its member lock.
+ */
+class OpenFileCtx {
+ public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
+
+ /**
+ * Lock to synchronize OpenFileCtx changes. A thread should acquire this lock
+ * before any read/write operation on an OpenFileCtx object.
+ */
+ private final ReentrantLock ctxLock;
+
+ // The stream status. False means the stream is closed.
+ private boolean activeState;
+ // The stream write-back status. True means one thread is doing write back.
+ private boolean asyncStatus;
+
+ private final FSDataOutputStream fos;
+ private final Nfs3FileAttributes latestAttr;
+ private long nextOffset;
+
+ private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+
+ // Time of the last write, commit request, or write-back event. Updated to
+ // keep the output stream alive.
+ private long lastAccessTime;
+
+ // Pending writes water mark for dump, 1MB
+ private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
+ private FileOutputStream dumpOut;
+ private long nonSequentialWriteInMemory;
+ private boolean enabledDump;
+ private RandomAccessFile raf;
+ private final String dumpFilePath;
+
+ private void updateLastAccessTime() {
+ lastAccessTime = System.currentTimeMillis();
+ }
+
+ private boolean checkStreamTimeout(long streamTimeout) {
+ return System.currentTimeMillis() - lastAccessTime > streamTimeout;
+ }
+
+ // Increase or decrease the memory occupation of non-sequential writes
+ private long updateNonSequentialWriteInMemory(long count) {
+ nonSequentialWriteInMemory += count;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
+ + nonSequentialWriteInMemory);
+ }
+
+ if (nonSequentialWriteInMemory < 0) {
+ LOG.error("nonSequentialWriteInMemory is negative after update with count "
+ + count);
+ throw new IllegalArgumentException(
+ "nonSequentialWriteInMemory is negative after update with count "
+ + count);
+ }
+ return nonSequentialWriteInMemory;
+ }
+
+ OpenFileCtx(FSDataOutputStream fos, Nfs3FileAttributes latestAttr,
+ String dumpFilePath) {
+ this.fos = fos;
+ this.latestAttr = latestAttr;
+ pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+ updateLastAccessTime();
+ activeState = true;
+ asyncStatus = false;
+ dumpOut = null;
+ raf = null;
+ nonSequentialWriteInMemory = 0;
+ this.dumpFilePath = dumpFilePath;
+ enabledDump = (dumpFilePath != null);
+ ctxLock = new ReentrantLock(true);
+ }
+
+ private void lockCtx() {
+ if (LOG.isTraceEnabled()) {
+ StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
+ StackTraceElement e = stacktrace[2];
+ String methodName = e.getMethodName();
+ LOG.trace("lock ctx, caller:" + methodName);
+ }
+ ctxLock.lock();
+ }
+
+ private void unlockCtx() {
+ ctxLock.unlock();
+ if (LOG.isTraceEnabled()) {
+ StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
+ StackTraceElement e = stacktrace[2];
+ String methodName = e.getMethodName();
+ LOG.info("unlock ctx, caller:" + methodName);
+ }
+ }
+
+ // Make a copy of the latestAttr
+ public Nfs3FileAttributes copyLatestAttr() {
+ Nfs3FileAttributes ret;
+ lockCtx();
+ try {
+ ret = new Nfs3FileAttributes(latestAttr);
+ } finally {
+ unlockCtx();
+ }
+ return ret;
+ }
+
+ private long getNextOffsetUnprotected() {
+ assert(ctxLock.isLocked());
+ return nextOffset;
+ }
+
+ public long getNextOffset() {
+ long ret;
+ lockCtx();
+ try {
+ ret = getNextOffsetUnprotected();
+ } finally {
+ unlockCtx();
+ }
+ return ret;
+ }
+
+ // Get flushed offset. Note that flushed data may not be persisted.
+ private long getFlushedOffset() {
+ return fos.getPos();
+ }
+
+ // Check whether the new writes need to be dumped to the dump file
+ private void checkDump(long count) {
+ assert (ctxLock.isLocked());
+
+ // Always update the in memory count
+ updateNonSequentialWriteInMemory(count);
+
+ if (!enabledDump) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Do nothing, dump is disabled.");
+ }
+ return;
+ }
+
+ if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+ return;
+ }
+
+ // Create dump outputstream for the first time
+ if (dumpOut == null) {
+ LOG.info("Create dump file:" + dumpFilePath);
+ File dumpFile = new File(dumpFilePath);
+ try {
+ if (dumpFile.exists()) {
+ LOG.fatal("The dump file should not exist:" + dumpFilePath);
+ throw new RuntimeException("The dump file should not exist:"
+ + dumpFilePath);
+ }
+ dumpOut = new FileOutputStream(dumpFile);
+ } catch (IOException e) {
+ LOG.error("Got failure when creating dump stream " + dumpFilePath
+ + " with error:" + e);
+ enabledDump = false;
+ IOUtils.cleanup(LOG, dumpOut);
+ return;
+ }
+ }
+ // Get raf for the first dump
+ if (raf == null) {
+ try {
+ raf = new RandomAccessFile(dumpFilePath, "r");
+ } catch (FileNotFoundException e) {
+ LOG.error("Can't get random access to file " + dumpFilePath);
+ // Disable dump
+ enabledDump = false;
+ return;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start dump, current write number:" + pendingWrites.size());
+ }
+ Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+ while (it.hasNext()) {
+ OffsetRange key = it.next();
+ WriteCtx writeCtx = pendingWrites.get(key);
+ try {
+ long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+ if (dumpedDataSize > 0) {
+ updateNonSequentialWriteInMemory(-dumpedDataSize);
+ }
+ } catch (IOException e) {
+ LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
+ // Disable dump
+ enabledDump = false;
+ return;
+ }
+ }
+ if (nonSequentialWriteInMemory != 0) {
+ LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
+ + nonSequentialWriteInMemory);
+ throw new RuntimeException(
+ "After dump, nonSequentialWriteInMemory is not zero: "
+ + nonSequentialWriteInMemory);
+ }
+ }
+
+ private WriteCtx checkRepeatedWriteRequest(WRITE3Request request,
+ Channel channel, int xid) {
+ OffsetRange range = new OffsetRange(request.getOffset(),
+ request.getOffset() + request.getCount());
+ WriteCtx writeCtx = pendingWrites.get(range);
+ if (writeCtx == null) {
+ return null;
+ } else {
+ if (xid != writeCtx.getXid()) {
+ LOG.warn("Got a repeated request, same range, with a different xid:"
+ + xid + " xid in old request:" + writeCtx.getXid());
+ //TODO: better handling.
+ }
+ return writeCtx;
+ }
+ }
+
+ public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
+ Channel channel, int xid, AsyncDataService asyncDataService,
+ IdUserGroup iug) {
+
+ lockCtx();
+ try {
+ if (!activeState) {
+ LOG.info("OpenFileCtx is inactive, fileId:"
+ + request.getHandle().getFileId());
+ WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+ fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ } else {
+ // Handle repeated write requests (same xid or not).
+ // If already replied, send reply again. If not replied, drop the
+ // repeated request.
+ WriteCtx existingWriteCtx = checkRepeatedWriteRequest(request, channel,
+ xid);
+ if (existingWriteCtx != null) {
+ if (!existingWriteCtx.getReplied()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Repeated write request which hasn't be served: xid="
+ + xid + ", drop it.");
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Repeated write request which is already served: xid="
+ + xid + ", resend response.");
+ }
+ WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, request.getCount(), request.getStableHow(),
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ }
+ updateLastAccessTime();
+
+ } else {
+ receivedNewWriteInternal(dfsClient, request, channel, xid,
+ asyncDataService, iug);
+ }
+ }
+
+ } finally {
+ unlockCtx();
+ }
+ }
+
+ private void receivedNewWriteInternal(DFSClient dfsClient,
+ WRITE3Request request, Channel channel, int xid,
+ AsyncDataService asyncDataService, IdUserGroup iug) {
+ long offset = request.getOffset();
+ int count = request.getCount();
+ WriteStableHow stableHow = request.getStableHow();
+
+ // Get file length, fail non-append call
+ WccAttr preOpAttr = latestAttr.getWccAttr();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("requesed offset=" + offset + " and current filesize="
+ + preOpAttr.getSize());
+ }
+
+ long nextOffset = getNextOffsetUnprotected();
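+ // Classify the write by its offset: offset == nextOffset is the
+ // expected sequential write; offset > nextOffset is buffered (and
+ // possibly dumped to disk) until the gap is filled; offset < nextOffset
+ // may overwrite data that has already been written.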
+ if (offset == nextOffset) {
+ LOG.info("Add to the list, update nextOffset and notify the writer,"
+ + " nextOffset:" + nextOffset);
+ WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+ request.getOffset(), request.getCount(), request.getStableHow(),
+ request.getData().array(), channel, xid, false, WriteCtx.NO_DUMP);
+ addWrite(writeCtx);
+
+ // Create an async task and change openFileCtx status to indicate async
+ // task pending
+ if (!asyncStatus) {
+ asyncStatus = true;
+ asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+ }
+
+ // Update the write time first
+ updateLastAccessTime();
+ Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
+
+ // Send response immediately for unstable write
+ if (request.getStableHow() == WriteStableHow.UNSTABLE) {
+ WccData fileWcc = new WccData(preOpAttr, postOpAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ writeCtx.setReplied(true);
+ }
+
+ } else if (offset > nextOffset) {
+ LOG.info("Add new write to the list but not update nextOffset:"
+ + nextOffset);
+ WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+ request.getOffset(), request.getCount(), request.getStableHow(),
+ request.getData().array(), channel, xid, false, WriteCtx.ALLOW_DUMP);
+ addWrite(writeCtx);
+
+ // Check if we need to dump some pending requests to file
+ checkDump(request.getCount());
+ updateLastAccessTime();
+ Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
+
+ // In tests, we noticed that some Linux clients send a batch (e.g., 1MB)
+ // of reordered writes and won't send more writes until they get
+ // responses to the previous batch. So send the response immediately for
+ // an unstable non-sequential write.
+ if (request.getStableHow() == WriteStableHow.UNSTABLE) {
+ WccData fileWcc = new WccData(preOpAttr, postOpAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ writeCtx.setReplied(true);
+ }
+
+ } else {
+ // offset < nextOffset
+ LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+ + nextOffset + ")");
+ WccData wccData = new WccData(preOpAttr, null);
+ WRITE3Response response;
+
+ if (offset + count > nextOffset) {
+ LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
+ + "write requests, so treat it as a real random write, no support.");
+ response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+ WriteStableHow.UNSTABLE, 0);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Process perfectOverWrite");
+ }
+ response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+ request.getData().array(),
+ Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+ }
+
+ updateLastAccessTime();
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ }
+ }
+
+ /**
+ * Honor two kinds of overwrites: 1) applications like touch that write the
+ * same content back just to change the mtime, and 2) a client that somehow
+ * sends the same write again in a different RPC.
+ */
+ private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
+ long offset, int count, WriteStableHow stableHow, byte[] data,
+ String path, WccData wccData, IdUserGroup iug) {
+ assert (ctxLock.isHeldByCurrentThread());
+ WRITE3Response response = null;
+
+ // Read the content back
+ byte[] readbuffer = new byte[count];
+
+ int readCount = 0;
+ FSDataInputStream fis = null;
+ try {
+ // Sync file data and length to avoid partial read failure
+ ((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+
+ fis = new FSDataInputStream(dfsClient.open(path));
+ readCount = fis.read(offset, readbuffer, 0, count);
+ if (readCount < count) {
+ LOG.error("Can't read back " + count + " bytes, partial read size:"
+ + readCount);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
+ stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ }
+
+ } catch (IOException e) {
+ LOG.info("Read failed when processing possible perfect overwrite, path="
+ + path + " error:" + e);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
+ stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ } finally {
+ IOUtils.cleanup(LOG, fis);
+ }
+
+ // Compare with the request
+ Comparator comparator = new Comparator();
+ if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
+ LOG.info("Perfect overwrite has different content");
+ response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+ stableHow, 0);
+ } else {
+ LOG.info("Perfect overwrite has same content,"
+ + " updating the mtime, then return success");
+ Nfs3FileAttributes postOpAttr = null;
+ try {
+ dfsClient.setTimes(path, System.currentTimeMillis(), -1);
+ postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
+ } catch (IOException e) {
+ LOG.info("Got error when processing perfect overwrite, path=" + path
+ + " error:" + e);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+ 0);
+ }
+
+ wccData.setPostOpAttr(postOpAttr);
+ response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
+ stableHow, 0);
+ }
+ return response;
+ }
+
+ public final static int COMMIT_FINISHED = 0;
+ public final static int COMMIT_WAIT = 1;
+ public final static int COMMIT_INACTIVE_CTX = 2;
+ public final static int COMMIT_ERROR = 3;
+
+ /**
+ * Return one of the commit statuses: COMMIT_FINISHED, COMMIT_WAIT,
+ * COMMIT_INACTIVE_CTX, or COMMIT_ERROR.
+ */
+ public int checkCommit(long commitOffset) {
+ int ret = COMMIT_WAIT;
+
+ lockCtx();
+ try {
+ if (!activeState) {
+ ret = COMMIT_INACTIVE_CTX;
+ } else {
+ ret = checkCommitInternal(commitOffset);
+ }
+ } finally {
+ unlockCtx();
+ }
+ return ret;
+ }
+
+ private int checkCommitInternal(long commitOffset) {
+ if (commitOffset == 0) {
+ // Commit whole file
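+ // (Per NFSv3/RFC 1813, a COMMIT with offset 0 and count 0 covers
+ // everything written so far.)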
+ commitOffset = getNextOffsetUnprotected();
+ }
+
+ long flushed = getFlushedOffset();
+ LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+ if (flushed < commitOffset) {
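+ // Not all data up to commitOffset has reached the stream yet; ask the
+ // caller to wait while the write-back task catches up.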
+ // Keep stream active
+ updateLastAccessTime();
+ return COMMIT_WAIT;
+ }
+
+ int ret = COMMIT_WAIT;
+ try {
+ // Sync file data and length
+ ((HdfsDataOutputStream) fos).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+ // Nothing to do for metadata since attribute-related changes are pass-through
+ ret = COMMIT_FINISHED;
+ } catch (IOException e) {
+ LOG.error("Got stream error during data sync:" + e);
+ // Do nothing. Stream will be closed eventually by StreamMonitor.
+ ret = COMMIT_ERROR;
+ }
+
+ // Keep stream active
+ updateLastAccessTime();
+ return ret;
+ }
+
+ private void addWrite(WriteCtx writeCtx) {
+ assert (ctxLock.isHeldByCurrentThread());
+ long offset = writeCtx.getOffset();
+ int count = writeCtx.getCount();
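+ // pendingWrites is sorted by OffsetRange, so executeWriteBack() and
+ // cleanup() can use firstKey() to get the lowest-offset pending write.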
+ pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
+ }
+
+
+ /**
+ * Check the stream status to decide whether it should be closed.
+ * @return true if the stream is removed; false if it is kept
+ */
+ public boolean streamCleanup(long fileId, long streamTimeout) {
+ if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
+ throw new InvalidParameterException("streamTimeout " + streamTimeout
+ + "ms is less than MINIMIUM_STREAM_TIMEOUT "
+ + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
+ }
+
+ boolean flag = false;
+ if (!ctxLock.tryLock()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Another thread is working on it" + ctxLock.toString());
+ }
+ return flag;
+ }
+
+ try {
+ // Check the stream timeout
+ if (checkStreamTimeout(streamTimeout)) {
+ LOG.info("closing stream for fileId:" + fileId);
+ cleanup();
+ flag = true;
+ }
+ } finally {
+ unlockCtx();
+ }
+ return flag;
+ }
+
+ // Invoked by AsyncDataService to do the write-back
+ public void executeWriteBack() {
+ long nextOffset;
+ OffsetRange key;
+ WriteCtx writeCtx;
+
+ try {
+ // Don't hold the OpenFileCtx lock across all writes, to avoid timing
+ // out other client requests to the same file
+ while (true) {
+ lockCtx();
+ if (!asyncStatus) {
+ // This should never happen. There should be only one thread working
+ // on one OpenFileCtx at any time.
+ LOG.fatal("The openFileCtx has false async status");
+ throw new RuntimeException("The openFileCtx has false async status");
+ }
+ // Any single write failure can change activeState to false, so check
+ // it on each iteration.
+ if (pendingWrites.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The asyn write task has no pendding writes, fileId: "
+ + latestAttr.getFileId());
+ }
+ break;
+ }
+ if (!activeState) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The openFileCtx is not active anymore, fileId: "
+ + latestAttr.getFileId());
+ }
+ break;
+ }
+
+ // Get the next sequential write
+ nextOffset = getNextOffsetUnprotected();
+ key = pendingWrites.firstKey();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
+ + nextOffset);
+ }
+
+ if (key.getMin() > nextOffset) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("The next sequencial write has not arrived yet");
+ }
+ break;
+
+ } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
+ // Can't handle an overlapping write; we haven't seen one in tests yet.
+ LOG.fatal("Got an overlapping write (" + key.getMin() + ","
+ + key.getMax() + "), nextOffset=" + nextOffset);
+ throw new RuntimeException("Got an overlapping write ("
+ + key.getMin() + "," + key.getMax() + "), nextOffset=" + nextOffset);
+
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
+ + ") from the list");
+ }
+ writeCtx = pendingWrites.remove(key);
+ // Do the write
+ doSingleWrite(writeCtx);
+ updateLastAccessTime();
+ }
+
+ unlockCtx();
+ }
+
+ } finally {
+ // Always reset the async status so another async task can be created
+ // for this file
+ asyncStatus = false;
+ if (ctxLock.isHeldByCurrentThread()) {
+ unlockCtx();
+ }
+ }
+ }
+
+ private void doSingleWrite(final WriteCtx writeCtx) {
+ assert (ctxLock.isHeldByCurrentThread());
+ Channel channel = writeCtx.getChannel();
+ int xid = writeCtx.getXid();
+
+ long offset = writeCtx.getOffset();
+ int count = writeCtx.getCount();
+ WriteStableHow stableHow = writeCtx.getStableHow();
+ byte[] data = null;
+ try {
+ data = writeCtx.getData();
+ } catch (IOException e1) {
+ LOG.error("Failed to get request data offset:" + offset + " count:"
+ + count + " error:" + e1);
+ // Cleanup everything
+ cleanup();
+ return;
+ }
+ assert (data.length == count);
+
+ FileHandle handle = writeCtx.getHandle();
+ LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
+ + " length:" + count + " stableHow:" + stableHow.getValue());
+
+ try {
+ fos.write(data, 0, count);
+
+ if (fos.getPos() != (offset + count)) {
+ throw new IOException("output stream is out of sync, pos="
+ + fos.getPos() + " and nextOffset should be " + (offset + count));
+ }
+ nextOffset = fos.getPos();
+
+ // Reduce the in-memory footprint if the request was allowed to be dumped
+ if (writeCtx.getDataState() == WriteCtx.ALLOW_DUMP) {
+ updateNonSequentialWriteInMemory(-count);
+ }
+
+ if (!writeCtx.getReplied()) {
+ WccAttr preOpAttr = latestAttr.getWccAttr();
+ WccData fileWcc = new WccData(preOpAttr, latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ }
+
+ } catch (IOException e) {
+ LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
+ + offset + " and length " + data.length, e);
+ if (!writeCtx.getReplied()) {
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ // Keep stream open. Either the client retries or StreamMonitor closes it.
+ }
+
+ LOG.info("Clean up open file context for fileId: "
+ + latestAttr.getFileId());
+ cleanup();
+ }
+ }
+
+ private void cleanup() {
+ assert (ctxLock.isHeldByCurrentThread());
+ activeState = false;
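+ // Marking the context inactive makes concurrent receivedNewWrite() and
+ // checkCommit() calls fail fast instead of queueing more work.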
+
+ // Close stream
+ try {
+ if (fos != null) {
+ fos.close();
+ }
+ } catch (IOException e) {
+ LOG.info("Can't close stream for fileId:" + latestAttr.getFileid()
+ + ", error:" + e);
+ }
+
+ // Reply error for pending writes
+ LOG.info("There are " + pendingWrites.size() + " pending writes.");
+ WccAttr preOpAttr = latestAttr.getWccAttr();
+ while (!pendingWrites.isEmpty()) {
+ OffsetRange key = pendingWrites.firstKey();
+ LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
+ + "), nextOffset=" + getNextOffsetUnprotected());
+
+ WriteCtx writeCtx = pendingWrites.remove(key);
+ if (!writeCtx.getReplied()) {
+ WccData fileWcc = new WccData(preOpAttr, latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+ fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(writeCtx.getChannel(),
+ response.send(new XDR(), writeCtx.getXid()));
+ }
+ }
+
+ // Cleanup dump file
+ if (dumpOut != null) {
+ try {
+ dumpOut.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the dump stream " + dumpFilePath, e);
+ }
+ }
+ if (raf != null) {
+ try {
+ raf.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the dump file " + dumpFilePath, e);
+ }
+ }
+ File dumpFile = new File(dumpFilePath);
+ if (dumpFile.exists() && !dumpFile.delete()) {
+ LOG.error("Failed to delete dumpfile: " + dumpFile);
+ }
+ }
+}
\ No newline at end of file