Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/08/28 19:23:55 UTC

svn commit: r1518292 - in /hadoop/common/trunk/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/

Author: brandonli
Date: Wed Aug 28 17:23:55 2013
New Revision: 1518292

URL: http://svn.apache.org/r1518292
Log:
HDFS-5078 Support file append in NFSv3 gateway to enable data streaming to HDFS. Contributed by Brandon Li

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1518292&r1=1518291&r2=1518292&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Wed Aug 28 17:23:55 2013
@@ -126,6 +126,9 @@ class OpenFileCtx {
     nonSequentialWriteInMemory = 0;
     this.dumpFilePath = dumpFilePath;  
     enabledDump = dumpFilePath == null ? false: true;
+    nextOffset = latestAttr.getSize();
+    assert(nextOffset == this.fos.getPos());
+
     ctxLock = new ReentrantLock(true);
   }
 
@@ -685,12 +688,14 @@ class OpenFileCtx {
 
     try {
       fos.write(data, 0, count);
-
-      if (fos.getPos() != (offset + count)) {
+      
+      long flushedOffset = getFlushedOffset();
+      if (flushedOffset != (offset + count)) {
         throw new IOException("output stream is out of sync, pos="
-            + fos.getPos() + " and nextOffset should be" + (offset + count));
+            + flushedOffset + " and nextOffset should be "
+            + (offset + count));
       }
-      nextOffset = fos.getPos();
+      nextOffset = flushedOffset;
 
       // Reduce memory occupation size if request was allowed dumped
       if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {

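The two hunks above adjust the offset bookkeeping so that it also holds when the
output stream was opened for append: the constructor seeds nextOffset from the
file's current length instead of assuming zero, and after each write the expected
flushed position is offset + count. A minimal, self-contained sketch of that
invariant (plain Java, not gateway code; CountingStream is a toy stand-in for the
HDFS output stream, and its getPos() models what getFlushedOffset() reports in the
patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    // Toy stand-in for HdfsDataOutputStream: reports an absolute file position.
    class CountingStream extends ByteArrayOutputStream {
      private final long initialLength;       // bytes already in the file at open time
      CountingStream(long initialLength) { this.initialLength = initialLength; }
      long getPos() { return initialLength + size(); }
    }

    public class AppendOffsetSketch {
      public static void main(String[] args) throws IOException {
        long fileSize = 1024;                  // existing file length when append starts
        CountingStream fos = new CountingStream(fileSize);

        long nextOffset = fileSize;            // constructor invariant: start at EOF, not 0
        assert nextOffset == fos.getPos();

        byte[] data = new byte[512];
        long offset = nextOffset;              // sequential WRITE lands at EOF
        fos.write(data, 0, data.length);

        long flushedOffset = fos.getPos();     // models getFlushedOffset() in the patch
        if (flushedOffset != offset + data.length) {
          throw new IOException("output stream is out of sync, pos=" + flushedOffset
              + " and nextOffset should be " + (offset + data.length));
        }
        nextOffset = flushedOffset;            // 1536: the next WRITE must start here
      }
    }
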
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1518292&r1=1518291&r2=1518292&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Wed Aug 28 17:23:55 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Options;
@@ -123,7 +124,7 @@ public class RpcProgramNfs3 extends RpcP
 
   private final Configuration config = new Configuration();
   private final WriteManager writeManager;
-  private final IdUserGroup iug;// = new IdUserGroup();
+  private final IdUserGroup iug;
   private final DFSClientCache clientCache;
 
   private final NfsExports exports;
@@ -161,10 +162,14 @@ public class RpcProgramNfs3 extends RpcP
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
     blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
-    bufferSize = config.getInt("io.file.buffer.size", 4096);
-    
-    writeDumpDir = config.get("dfs.nfs3.dump.dir", "/tmp/.hdfs-nfs");    
-    boolean enableDump = config.getBoolean("dfs.nfs3.enableDump", true);
+    bufferSize = config.getInt(
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+    
+    writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
+        Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+    boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY,
+        Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT);
     if (!enableDump) {
       writeDumpDir = null;
     } else {
@@ -1112,6 +1117,7 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
 
+  @Override
   public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
       InetAddress client) {
     return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);

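The configuration hunk above replaces hard-coded property strings with the named
constants in CommonConfigurationKeysPublic and Nfs3Constant. The removed lines show
the underlying property names ("io.file.buffer.size", "dfs.nfs3.dump.dir",
"dfs.nfs3.enableDump"), which the new constants presumably resolve to. A deployment
that wants to relocate or disable the write dump directory would set something like
the following (the values are the old in-code defaults, shown only for illustration):

    import org.apache.hadoop.conf.Configuration;

    public class Nfs3GatewayConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // io.file.buffer.size -> CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY
        conf.setInt("io.file.buffer.size", 4096);
        // dfs.nfs3.dump.dir -> Nfs3Constant.FILE_DUMP_DIR_KEY
        conf.set("dfs.nfs3.dump.dir", "/tmp/.hdfs-nfs");
        // dfs.nfs3.enableDump -> Nfs3Constant.ENABLE_FILE_DUMP_KEY
        conf.setBoolean("dfs.nfs3.enableDump", true);
      }
    }
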
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1518292&r1=1518291&r2=1518292&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Wed Aug 28 17:23:55 2013
@@ -25,7 +25,9 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -48,6 +50,7 @@ import com.google.common.collect.Maps;
 public class WriteManager {
   public static final Log LOG = LogFactory.getLog(WriteManager.class);
 
+  private final Configuration config;
   private final IdUserGroup iug;
   private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
       .newConcurrentMap();
@@ -76,6 +79,7 @@ public class WriteManager {
 
   WriteManager(IdUserGroup iug, final Configuration config) {
     this.iug = iug;
+    this.config = config;
     
     streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
         DEFAULT_STREAM_TIMEOUT);
@@ -129,12 +133,41 @@ public class WriteManager {
     OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
-      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
-      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
-          fileWcc, count, request.getStableHow(),
-          Nfs3Constant.WRITE_COMMIT_VERF);
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-      return;
+
+      String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
+      HdfsDataOutputStream fos = null;
+      Nfs3FileAttributes latestAttr = null;
+      try {
+        int bufferSize = config.getInt(
+            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+        
+        fos = dfsClient.append(fileIdPath, bufferSize, null, null);
+
+        latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+      } catch (IOException e) {
+        LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
+        if (fos != null) {
+          fos.close();
+        }
+        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
+            preOpAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+            fileWcc, count, request.getStableHow(),
+            Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        return;
+      }
+
+      // Add open stream
+      String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
+          Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+      openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
+          + fileHandle.getFileId());
+      addOpenFileStream(fileHandle, openFileCtx);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("opened stream for file:" + fileHandle.getFileId());
+      }
     }
 
     // Add write into the async job queue

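The hunk above changes what happens when a WRITE request arrives for a file that has
no open stream: instead of replying NFS3ERR_IO unconditionally, the gateway now
reopens the HDFS file for append and registers a new OpenFileCtx, so clients can keep
streaming data into an existing file. Condensed from the added lines (error handling
and the WRITE3Response failure path trimmed; all names are the ones used in the patch):

    OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
    if (openFileCtx == null) {
      // No stream is open for this file handle yet: reopen the file for append
      // so the write can be applied to the existing data instead of failing.
      String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
      int bufferSize = config.getInt(
          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
          CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
      HdfsDataOutputStream fos = dfsClient.append(fileIdPath, bufferSize, null, null);
      Nfs3FileAttributes latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);

      String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
          Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
      openFileCtx = new OpenFileCtx(fos, latestAttr,
          writeDumpDir + "/" + fileHandle.getFileId());
      addOpenFileStream(fileHandle, openFileCtx);
    }
    // The write is then queued into the async job queue exactly as before.
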
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1518292&r1=1518291&r2=1518292&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 28 17:23:55 2013
@@ -307,6 +307,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-4947 Add NFS server export table to control export by hostname or
     IP range (Jing Zhao via brandonli)
 
+    HDFS-5078 Support file append in NFSv3 gateway to enable data streaming
+    to HDFS (brandonli)
+
   IMPROVEMENTS
 
     HDFS-4513. Clarify in the WebHDFS REST API that all JSON responses may