You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/08/29 22:33:25 UTC
svn commit: r1518813 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
Author: brandonli
Date: Thu Aug 29 20:33:25 2013
New Revision: 1518813
URL: http://svn.apache.org/r1518813
Log:
HDFS-5078. Merging change r1518292 from trunk
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1518813&r1=1518812&r2=1518813&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Thu Aug 29 20:33:25 2013
@@ -126,6 +126,11 @@ class OpenFileCtx {
nonSequentialWriteInMemory = 0;
this.dumpFilePath = dumpFilePath;
enabledDump = dumpFilePath == null ? false: true;
+ nextOffset = latestAttr.getSize();
+ try {
+ assert(nextOffset == this.fos.getPos());
+ } catch (IOException e) {}
+
ctxLock = new ReentrantLock(true);
}
@@ -691,12 +696,14 @@ class OpenFileCtx {
try {
fos.write(data, 0, count);
-
- if (fos.getPos() != (offset + count)) {
+
+ long flushedOffset = getFlushedOffset();
+ if (flushedOffset != (offset + count)) {
throw new IOException("output stream is out of sync, pos="
- + fos.getPos() + " and nextOffset should be" + (offset + count));
+ + flushedOffset + " and nextOffset should be"
+ + (offset + count));
}
- nextOffset = fos.getPos();
+ nextOffset = flushedOffset;
// Reduce memory occupation size if request was allowed dumped
if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
@@ -778,4 +785,4 @@ class OpenFileCtx {
LOG.error("Failed to delete dumpfile: "+ dumpFile);
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1518813&r1=1518812&r2=1518813&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Aug 29 20:33:25 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Options;
@@ -123,7 +124,7 @@ public class RpcProgramNfs3 extends RpcP
private final Configuration config = new Configuration();
private final WriteManager writeManager;
- private final IdUserGroup iug;// = new IdUserGroup();
+ private final IdUserGroup iug;
private final DFSClientCache clientCache;
private final NfsExports exports;
@@ -161,10 +162,14 @@ public class RpcProgramNfs3 extends RpcP
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
- bufferSize = config.getInt("io.file.buffer.size", 4096);
-
- writeDumpDir = config.get("dfs.nfs3.dump.dir", "/tmp/.hdfs-nfs");
- boolean enableDump = config.getBoolean("dfs.nfs3.enableDump", true);
+ bufferSize = config.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+
+ writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
+ Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+ boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY,
+ Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT);
if (!enableDump) {
writeDumpDir = null;
} else {
@@ -1112,6 +1117,7 @@ public class RpcProgramNfs3 extends RpcP
}
}
+ @Override
public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
InetAddress client) {
return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1518813&r1=1518812&r2=1518813&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Aug 29 20:33:25 2013
@@ -25,7 +25,9 @@ import java.util.concurrent.ConcurrentMa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -48,6 +50,7 @@ import com.google.common.collect.Maps;
public class WriteManager {
public static final Log LOG = LogFactory.getLog(WriteManager.class);
+ private final Configuration config;
private final IdUserGroup iug;
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
.newConcurrentMap();
@@ -76,6 +79,7 @@ public class WriteManager {
WriteManager(IdUserGroup iug, final Configuration config) {
this.iug = iug;
+ this.config = config;
streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
DEFAULT_STREAM_TIMEOUT);
@@ -129,12 +133,41 @@ public class WriteManager {
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
- WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
- fileWcc, count, request.getStableHow(),
- Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
- return;
+
+ String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
+ HdfsDataOutputStream fos = null;
+ Nfs3FileAttributes latestAttr = null;
+ try {
+ int bufferSize = config.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+
+ fos = dfsClient.append(fileIdPath, bufferSize, null, null);
+
+ latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+ } catch (IOException e) {
+ LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
+ if (fos != null) {
+ fos.close();
+ }
+ WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
+ preOpAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+ fileWcc, count, request.getStableHow(),
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ return;
+ }
+
+ // Add open stream
+ String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
+ Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+ openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
+ + fileHandle.getFileId());
+ addOpenFileStream(fileHandle, openFileCtx);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("opened stream for file:" + fileHandle.getFileId());
+ }
}
// Add write into the async job queue
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1518813&r1=1518812&r2=1518813&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 29 20:33:25 2013
@@ -81,6 +81,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-4947 Add NFS server export table to control export by hostname or
IP range (Jing Zhao via brandonli)
+ HDFS-5078 Support file append in NFSv3 gateway to enable data streaming
+ to HDFS (brandonli)
+
IMPROVEMENTS
HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may