You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/10/17 07:33:01 UTC
svn commit: r1532967 [1/2] - in
/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project:
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/
hadoop-hdfs-nfs/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/src/m...
Author: wang
Date: Thu Oct 17 05:32:42 2013
New Revision: 1532967
URL: http://svn.apache.org/r1532967
Log:
merge the rest of trunk to branch HDFS-4949
Added:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
- copied unchanged from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
- copied unchanged from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/
- copied from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/core-site.xml
- copied unchanged from r1532945, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/resources/core-site.xml
Removed:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/LruCache.java
Modified:
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestUdpServer.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/Server.java Thu Oct 17 05:32:42 2013
@@ -418,7 +418,11 @@ public class Server {
Properties props = new Properties();
try {
InputStream is = getResource(DEFAULT_LOG4J_PROPERTIES);
- props.load(is);
+ try {
+ props.load(is);
+ } finally {
+ is.close();
+ }
} catch (IOException ex) {
throw new ServerException(ServerException.ERROR.S03, DEFAULT_LOG4J_PROPERTIES, ex.getMessage(), ex);
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml Thu Oct 17 05:32:42 2013
@@ -49,7 +49,6 @@ http://maven.apache.org/xsd/maven-4.0.0.
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
- <version>3.6.2.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java Thu Oct 17 05:32:42 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.mount
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -38,9 +39,15 @@ import org.apache.hadoop.nfs.nfs3.FileHa
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.oncrpc.XDR;
-import org.jboss.netty.channel.Channel;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
/**
* RPC program corresponding to mountd daemon. See {@link Mountd}.
@@ -75,7 +82,8 @@ public class RpcProgramMountd extends Rp
public RpcProgramMountd(List<String> exports, Configuration config)
throws IOException {
// Note that RPC cache is not enabled
- super("mountd", "localhost", PORT, PROGRAM, VERSION_1, VERSION_3, 0);
+ super("mountd", "localhost", config.getInt("nfs3.mountd.port", PORT),
+ PROGRAM, VERSION_1, VERSION_3);
this.hostsMatcher = NfsExports.getInstance(config);
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
@@ -88,7 +96,8 @@ public class RpcProgramMountd extends Rp
if (LOG.isDebugEnabled()) {
LOG.debug("MOUNT NULLOP : " + " client: " + client);
}
- return RpcAcceptedReply.voidReply(out, xid);
+ return RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(
+ out);
}
@Override
@@ -155,7 +164,7 @@ public class RpcProgramMountd extends Rp
String host = client.getHostName();
mounts.remove(new MountEntry(host, path));
- RpcAcceptedReply.voidReply(out, xid);
+ RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(out);
return out;
}
@@ -165,14 +174,21 @@ public class RpcProgramMountd extends Rp
LOG.debug("MOUNT UMNTALL : " + " client: " + client);
}
mounts.clear();
- return RpcAcceptedReply.voidReply(out, xid);
+ return RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(
+ out);
}
@Override
- public XDR handleInternal(RpcCall rpcCall, XDR xdr, XDR out,
- InetAddress client, Channel channel) {
+ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+ RpcCall rpcCall = (RpcCall) info.header();
final MNTPROC mntproc = MNTPROC.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
+ byte[] data = new byte[info.data().readableBytes()];
+ info.data().readBytes(data);
+ XDR xdr = new XDR(data);
+ XDR out = new XDR();
+ InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
+
if (mntproc == MNTPROC.NULL) {
out = nullOp(out, xid, client);
} else if (mntproc == MNTPROC.MNT) {
@@ -190,10 +206,13 @@ public class RpcProgramMountd extends Rp
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
} else {
// Invalid procedure
- RpcAcceptedReply.voidReply(out, xid,
- RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+ RpcAcceptedReply.getInstance(xid,
+ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
+ out);
}
- return out;
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+ RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+ RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java Thu Oct 17 05:32:42 2013
@@ -97,7 +97,7 @@ public class AsyncDataService {
void writeAsync(OpenFileCtx openFileCtx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling write back task for fileId: "
- + openFileCtx.copyLatestAttr().getFileId());
+ + openFileCtx.getLatestAttr().getFileId());
}
WriteBackTask wbTask = new WriteBackTask(openFileCtx);
execute(wbTask);
@@ -125,7 +125,7 @@ public class AsyncDataService {
public String toString() {
// Called in AsyncDataService.execute for displaying error messages.
return "write back data for fileId"
- + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+ + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
+ openFileCtx.getNextOffset();
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java Thu Oct 17 05:32:42 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,59 +28,81 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
/**
* A cache saves DFSClient objects for different users
*/
-public class DFSClientCache {
- static final Log LOG = LogFactory.getLog(DFSClientCache.class);
- private final LruCache<String, DFSClient> lruTable;
+class DFSClientCache {
+ private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
+ /**
+ * Cache that maps User id to corresponding DFSClient.
+ */
+ @VisibleForTesting
+ final LoadingCache<String, DFSClient> clientCache;
+
+ final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
+
private final Configuration config;
- public DFSClientCache(Configuration config) {
- // By default, keep 256 DFSClient instance for 256 active users
- this(config, 256);
+ DFSClientCache(Configuration config) {
+ this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
}
- public DFSClientCache(Configuration config, int size) {
- lruTable = new LruCache<String, DFSClient>(size);
+ DFSClientCache(Configuration config, int clientCache) {
this.config = config;
+ this.clientCache = CacheBuilder.newBuilder()
+ .maximumSize(clientCache)
+ .removalListener(clientRemovealListener())
+ .build(clientLoader());
+ }
+
+ private CacheLoader<String, DFSClient> clientLoader() {
+ return new CacheLoader<String, DFSClient>() {
+ @Override
+ public DFSClient load(String userName) throws Exception {
+ UserGroupInformation ugi = UserGroupInformation
+ .createRemoteUser(userName);
+
+ // Guava requires CacheLoader never returns null.
+ return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
+ public DFSClient run() throws IOException {
+ return new DFSClient(NameNode.getAddress(config), config);
+ }
+ });
+ }
+ };
+ }
+
+ private RemovalListener<String, DFSClient> clientRemovealListener() {
+ return new RemovalListener<String, DFSClient>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, DFSClient> notification) {
+ DFSClient client = notification.getValue();
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOG.warn(String.format(
+ "IOException when closing the DFSClient(%s), cause: %s", client,
+ e));
+ }
+ }
+ };
}
- public void put(String uname, DFSClient client) {
- lruTable.put(uname, client);
- }
-
- synchronized public DFSClient get(String uname) {
- DFSClient client = lruTable.get(uname);
- if (client != null) {
- return client;
- }
-
- // Not in table, create one.
+ DFSClient get(String userName) {
+ DFSClient client = null;
try {
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(uname);
- client = ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
- public DFSClient run() throws IOException {
- return new DFSClient(NameNode.getAddress(config), config);
- }
- });
- } catch (IOException e) {
- LOG.error("Create DFSClient failed for user:" + uname);
- e.printStackTrace();
-
- } catch (InterruptedException e) {
- e.printStackTrace();
+ client = clientCache.get(userName);
+ } catch (ExecutionException e) {
+ LOG.error("Failed to create DFSClient for user:" + userName + " Cause:"
+ + e);
}
- // Add new entry
- lruTable.put(uname, client);
return client;
}
-
- public int usedSize() {
- return lruTable.usedSize();
- }
-
- public boolean containsKey(String key) {
- return lruTable.containsKey(key);
- }
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Thu Oct 17 05:32:42 2013
@@ -42,7 +42,7 @@ public class Nfs3 extends Nfs3Base {
}
public Nfs3(List<String> exports, Configuration config) throws IOException {
- super(new Mountd(exports, config), new RpcProgramNfs3(config));
+ super(new Mountd(exports, config), new RpcProgramNfs3(config), config);
}
public static void main(String[] args) throws IOException {
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java Thu Oct 17 05:32:42 2013
@@ -39,6 +39,12 @@ import org.jboss.netty.channel.Channel;
public class Nfs3Utils {
public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/";
+
+ public final static String READ_RPC_START = "READ_RPC_CALL_START____";
+ public final static String READ_RPC_END = "READ_RPC_CALL_END______";
+ public final static String WRITE_RPC_START = "WRITE_RPC_CALL_START____";
+ public final static String WRITE_RPC_END = "WRITE_RPC_CALL_END______";
+
public static String getFileIdPath(FileHandle handle) {
return getFileIdPath(handle.getFileId());
}
@@ -49,7 +55,7 @@ public class Nfs3Utils {
public static HdfsFileStatus getFileStatus(DFSClient client, String fileIdPath)
throws IOException {
- return client.getFileInfo(fileIdPath);
+ return client.getFileLinkInfo(fileIdPath);
}
public static Nfs3FileAttributes getNfs3FileAttrFromFileStatus(
@@ -59,7 +65,10 @@ public class Nfs3Utils {
* client takes only the lower 32bit of the fileId and treats it as signed
* int. When the 32th bit is 1, the client considers it invalid.
*/
- return new Nfs3FileAttributes(fs.isDir(), fs.getChildrenNum(), fs
+ NfsFileType fileType = fs.isDir() ? NfsFileType.NFSDIR : NfsFileType.NFSREG;
+ fileType = fs.isSymlink() ? NfsFileType.NFSLNK : fileType;
+
+ return new Nfs3FileAttributes(fileType, fs.getChildrenNum(), fs
.getPermission().toShort(), iug.getUidAllowingUnknown(fs.getOwner()),
iug.getGidAllowingUnknown(fs.getGroup()), fs.getLen(), 0 /* fsid */,
fs.getFileId(), fs.getModificationTime(), fs.getAccessTime());
@@ -99,7 +108,18 @@ public class Nfs3Utils {
/**
* Send a write response to the netty network socket channel
*/
- public static void writeChannel(Channel channel, XDR out) {
+ public static void writeChannel(Channel channel, XDR out, int xid) {
+ if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+ RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
+ }
+ ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
+ channel.write(outBuf);
+ }
+
+ public static void writeChannelCommit(Channel channel, XDR out, int xid) {
+ if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+ RpcProgramNfs3.LOG.debug("Commit done:" + xid);
+ }
ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
channel.write(outBuf);
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java Thu Oct 17 05:32:42 2013
@@ -17,19 +17,34 @@
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
+import java.util.Comparator;
+
+import com.google.common.base.Preconditions;
+
/**
* OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
* is not a valid range.
*/
-public class OffsetRange implements Comparable<OffsetRange> {
+public class OffsetRange {
+
+ public static final Comparator<OffsetRange> ReverseComparatorOnMin =
+ new Comparator<OffsetRange>() {
+ @Override
+ public int compare(OffsetRange o1, OffsetRange o2) {
+ if (o1.getMin() == o2.getMin()) {
+ return o1.getMax() < o2.getMax() ?
+ 1 : (o1.getMax() > o2.getMax() ? -1 : 0);
+ } else {
+ return o1.getMin() < o2.getMin() ? 1 : -1;
+ }
+ }
+ };
+
private final long min;
private final long max;
OffsetRange(long min, long max) {
- if ((min >= max) || (min < 0) || (max < 0)) {
- throw new IllegalArgumentException("Wrong offset range: (" + min + ","
- + max + ")");
- }
+ Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
this.min = min;
this.max = max;
}
@@ -49,24 +64,10 @@ public class OffsetRange implements Comp
@Override
public boolean equals(Object o) {
- assert (o instanceof OffsetRange);
- OffsetRange range = (OffsetRange) o;
- return (min == range.getMin()) && (max == range.getMax());
- }
-
- private static int compareTo(long left, long right) {
- if (left < right) {
- return -1;
- } else if (left > right) {
- return 1;
- } else {
- return 0;
+ if (o instanceof OffsetRange) {
+ OffsetRange range = (OffsetRange) o;
+ return (min == range.getMin()) && (max == range.getMax());
}
- }
-
- @Override
- public int compareTo(OffsetRange other) {
- final int d = compareTo(min, other.getMin());
- return d != 0 ? d : compareTo(max, other.getMax());
+ return false;
}
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1532967&r1=1532966&r2=1532967&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Thu Oct 17 05:32:42 2013
@@ -22,12 +22,15 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.security.InvalidParameterException;
import java.util.EnumSet;
import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,12 +48,18 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Co
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* OpenFileCtx saves the context of one HDFS file output stream. Access to it is
* synchronized by its member lock.
@@ -58,34 +67,95 @@ import org.jboss.netty.channel.Channel;
class OpenFileCtx {
public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
- /**
- * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
- * any read/write operation to an OpenFileCtx object
- */
- private final ReentrantLock ctxLock;
+ // Pending writes water mark for dump, 1MB
+ private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
+
+ static enum COMMIT_STATUS {
+ COMMIT_FINISHED,
+ COMMIT_WAIT,
+ COMMIT_INACTIVE_CTX,
+ COMMIT_INACTIVE_WITH_PENDING_WRITE,
+ COMMIT_ERROR,
+ COMMIT_DO_SYNC;
+ }
+ private final DFSClient client;
+ private final IdUserGroup iug;
+
// The stream status. False means the stream is closed.
- private boolean activeState;
+ private volatile boolean activeState;
// The stream write-back status. True means one thread is doing write back.
- private boolean asyncStatus;
+ private volatile boolean asyncStatus;
+ /**
+ * The current offset of the file in HDFS. All the content before this offset
+ * has been written back to HDFS.
+ */
+ private AtomicLong nextOffset;
private final HdfsDataOutputStream fos;
- private final Nfs3FileAttributes latestAttr;
- private long nextOffset;
+
+ // It's updated after each sync to HDFS
+ private Nfs3FileAttributes latestAttr;
- private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+ private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
+
+ private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
+
+ static class CommitCtx {
+ private final long offset;
+ private final Channel channel;
+ private final int xid;
+ private final Nfs3FileAttributes preOpAttr;
+
+ // Remember time for debug purpose
+ private final long startTime;
+
+ long getOffset() {
+ return offset;
+ }
+
+ Channel getChannel() {
+ return channel;
+ }
+
+ int getXid() {
+ return xid;
+ }
+
+ Nfs3FileAttributes getPreOpAttr() {
+ return preOpAttr;
+ }
+
+ long getStartTime() {
+ return startTime;
+ }
+
+ CommitCtx(long offset, Channel channel, int xid,
+ Nfs3FileAttributes preOpAttr) {
+ this.offset = offset;
+ this.channel = channel;
+ this.xid = xid;
+ this.preOpAttr = preOpAttr;
+ this.startTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("offset: %d xid: %d startTime: %d", offset, xid,
+ startTime);
+ }
+ }
// The last write, commit request or write-back event. Updating time to keep
// output steam alive.
private long lastAccessTime;
- // Pending writes water mark for dump, 1MB
- private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
+ private volatile boolean enabledDump;
private FileOutputStream dumpOut;
- private long nonSequentialWriteInMemory;
- private boolean enabledDump;
+ private AtomicLong nonSequentialWriteInMemory;
private RandomAccessFile raf;
private final String dumpFilePath;
+ private Daemon dumpThread;
private void updateLastAccessTime() {
lastAccessTime = System.currentTimeMillis();
@@ -95,89 +165,55 @@ class OpenFileCtx {
return System.currentTimeMillis() - lastAccessTime > streamTimeout;
}
+ public long getNextOffset() {
+ return nextOffset.get();
+ }
+
// Increase or decrease the memory occupation of non-sequential writes
private long updateNonSequentialWriteInMemory(long count) {
- nonSequentialWriteInMemory += count;
+ long newValue = nonSequentialWriteInMemory.addAndGet(count);
if (LOG.isDebugEnabled()) {
LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
- + nonSequentialWriteInMemory);
+ + newValue);
}
- if (nonSequentialWriteInMemory < 0) {
- LOG.error("nonSequentialWriteInMemory is negative after update with count "
- + count);
- throw new IllegalArgumentException(
- "nonSequentialWriteInMemory is negative after update with count "
- + count);
- }
- return nonSequentialWriteInMemory;
+ Preconditions.checkState(newValue >= 0,
+ "nonSequentialWriteInMemory is negative after update with count "
+ + count);
+ return newValue;
}
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
- String dumpFilePath) {
+ String dumpFilePath, DFSClient client, IdUserGroup iug) {
this.fos = fos;
this.latestAttr = latestAttr;
- pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+ // We use the ReverseComparatorOnMin as the comparator of the map. In this
+ // way, we first dump the data with larger offset. In the meanwhile, we
+ // retrieve the last element to write back to HDFS.
+ pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
+ OffsetRange.ReverseComparatorOnMin);
+
+ pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
+
updateLastAccessTime();
activeState = true;
asyncStatus = false;
dumpOut = null;
raf = null;
- nonSequentialWriteInMemory = 0;
+ nonSequentialWriteInMemory = new AtomicLong(0);
+
this.dumpFilePath = dumpFilePath;
enabledDump = dumpFilePath == null ? false: true;
- nextOffset = latestAttr.getSize();
- assert(nextOffset == this.fos.getPos());
-
- ctxLock = new ReentrantLock(true);
- }
-
- private void lockCtx() {
- if (LOG.isTraceEnabled()) {
- StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
- StackTraceElement e = stacktrace[2];
- String methodName = e.getMethodName();
- LOG.trace("lock ctx, caller:" + methodName);
- }
- ctxLock.lock();
+ nextOffset = new AtomicLong();
+ nextOffset.set(latestAttr.getSize());
+ assert(nextOffset.get() == this.fos.getPos());
+ dumpThread = null;
+ this.client = client;
+ this.iug = iug;
}
- private void unlockCtx() {
- ctxLock.unlock();
- if (LOG.isTraceEnabled()) {
- StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
- StackTraceElement e = stacktrace[2];
- String methodName = e.getMethodName();
- LOG.info("unlock ctx, caller:" + methodName);
- }
- }
-
- // Make a copy of the latestAttr
- public Nfs3FileAttributes copyLatestAttr() {
- Nfs3FileAttributes ret;
- lockCtx();
- try {
- ret = new Nfs3FileAttributes(latestAttr);
- } finally {
- unlockCtx();
- }
- return ret;
- }
-
- private long getNextOffsetUnprotected() {
- assert(ctxLock.isLocked());
- return nextOffset;
- }
-
- public long getNextOffset() {
- long ret;
- lockCtx();
- try {
- ret = getNextOffsetUnprotected();
- } finally {
- unlockCtx();
- }
- return ret;
+ public Nfs3FileAttributes getLatestAttr() {
+ return latestAttr;
}
// Get flushed offset. Note that flushed data may not be persisted.
@@ -186,12 +222,7 @@ class OpenFileCtx {
}
// Check if need to dump the new writes
- private void checkDump(long count) {
- assert (ctxLock.isLocked());
-
- // Always update the in memory count
- updateNonSequentialWriteInMemory(count);
-
+ private void checkDump() {
if (!enabledDump) {
if (LOG.isDebugEnabled()) {
LOG.debug("Do nothing, dump is disabled.");
@@ -199,66 +230,129 @@ class OpenFileCtx {
return;
}
- if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+ if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
return;
}
- // Create dump outputstream for the first time
- if (dumpOut == null) {
- LOG.info("Create dump file:" + dumpFilePath);
- File dumpFile = new File(dumpFilePath);
- try {
- if (dumpFile.exists()) {
- LOG.fatal("The dump file should not exist:" + dumpFilePath);
- throw new RuntimeException("The dump file should not exist:"
- + dumpFilePath);
+ // wake up the dumper thread to dump the data
+ synchronized (this) {
+ if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Asking dumper to dump...");
+ }
+ if (dumpThread == null) {
+ dumpThread = new Daemon(new Dumper());
+ dumpThread.start();
+ } else {
+ this.notifyAll();
}
- dumpOut = new FileOutputStream(dumpFile);
- } catch (IOException e) {
- LOG.error("Got failure when creating dump stream " + dumpFilePath
- + " with error:" + e);
- enabledDump = false;
- IOUtils.cleanup(LOG, dumpOut);
- return;
}
}
- // Get raf for the first dump
- if (raf == null) {
- try {
- raf = new RandomAccessFile(dumpFilePath, "r");
- } catch (FileNotFoundException e) {
- LOG.error("Can't get random access to file " + dumpFilePath);
- // Disable dump
- enabledDump = false;
- return;
+ }
+
+ class Dumper implements Runnable {
+ /** Dump data into a file */
+ private void dump() {
+ // Create dump outputstream for the first time
+ if (dumpOut == null) {
+ LOG.info("Create dump file:" + dumpFilePath);
+ File dumpFile = new File(dumpFilePath);
+ try {
+ synchronized (this) {
+ // check if alive again
+ Preconditions.checkState(dumpFile.createNewFile(),
+ "The dump file should not exist: %s", dumpFilePath);
+ dumpOut = new FileOutputStream(dumpFile);
+ }
+ } catch (IOException e) {
+ LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+ enabledDump = false;
+ if (dumpOut != null) {
+ try {
+ dumpOut.close();
+ } catch (IOException e1) {
+ LOG.error("Can't close dump stream " + dumpFilePath, e);
+ }
+ }
+ return;
+ }
}
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start dump, current write number:" + pendingWrites.size());
- }
- Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
- while (it.hasNext()) {
- OffsetRange key = it.next();
- WriteCtx writeCtx = pendingWrites.get(key);
- try {
- long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
- if (dumpedDataSize > 0) {
- updateNonSequentialWriteInMemory(-dumpedDataSize);
+
+ // Get raf for the first dump
+ if (raf == null) {
+ try {
+ raf = new RandomAccessFile(dumpFilePath, "r");
+ } catch (FileNotFoundException e) {
+ LOG.error("Can't get random access to file " + dumpFilePath);
+ // Disable dump
+ enabledDump = false;
+ return;
}
- } catch (IOException e) {
- LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
- // Disable dump
- enabledDump = false;
- return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+ + nonSequentialWriteInMemory.get());
+ }
+
+ Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+ while (activeState && it.hasNext()
+ && nonSequentialWriteInMemory.get() > 0) {
+ OffsetRange key = it.next();
+ WriteCtx writeCtx = pendingWrites.get(key);
+ if (writeCtx == null) {
+ // This write was just deleted
+ continue;
+ }
+ try {
+ long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+ if (dumpedDataSize > 0) {
+ updateNonSequentialWriteInMemory(-dumpedDataSize);
+ }
+ } catch (IOException e) {
+ LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+ + " OpenFileCtx state:" + activeState);
+ // Disable dump
+ enabledDump = false;
+ return;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After dump, nonSequentialWriteInMemory == "
+ + nonSequentialWriteInMemory.get());
}
}
- if (nonSequentialWriteInMemory != 0) {
- LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
- + nonSequentialWriteInMemory);
- throw new RuntimeException(
- "After dump, nonSequentialWriteInMemory is not zero: "
- + nonSequentialWriteInMemory);
+
+ @Override
+ public void run() {
+ while (activeState && enabledDump) {
+ try {
+ if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+ dump();
+ }
+ synchronized (OpenFileCtx.this) {
+ if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+ try {
+ OpenFileCtx.this.wait();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dumper woke up");
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Dumper is interrupted, dumpFilePath= "
+ + OpenFileCtx.this.dumpFilePath);
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
+ + " enabledDump: " + enabledDump);
+ }
+ } catch (Throwable t) {
+ LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
+ + OpenFileCtx.this.dumpFilePath);
+ }
+ }
}
}
@@ -282,143 +376,252 @@ class OpenFileCtx {
public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
Channel channel, int xid, AsyncDataService asyncDataService,
IdUserGroup iug) {
-
- lockCtx();
- try {
- if (!activeState) {
- LOG.info("OpenFileCtx is inactive, fileId:"
- + request.getHandle().getFileId());
- WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
- fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
- } else {
- // Handle repeated write requests(same xid or not).
- // If already replied, send reply again. If not replied, drop the
- // repeated request.
- WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
- xid);
- if (existantWriteCtx != null) {
- if (!existantWriteCtx.getReplied()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Repeated write request which hasn't be served: xid="
- + xid + ", drop it.");
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Repeated write request which is already served: xid="
- + xid + ", resend response.");
- }
- WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
- fileWcc, request.getCount(), request.getStableHow(),
- Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+
+ if (!activeState) {
+ LOG.info("OpenFileCtx is inactive, fileId:"
+ + request.getHandle().getFileId());
+ WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+ fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel,
+ response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+ xid);
+ } else {
+ // Update the write time first
+ updateLastAccessTime();
+
+ // Handle repeated write requests (same xid or not).
+ // If already replied, send reply again. If not replied, drop the
+ // repeated request.
+ WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+ xid);
+ if (existantWriteCtx != null) {
+ if (!existantWriteCtx.getReplied()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Repeated write request which hasn't be served: xid="
+ + xid + ", drop it.");
}
- updateLastAccessTime();
-
} else {
- receivedNewWriteInternal(dfsClient, request, channel, xid,
- asyncDataService, iug);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Repeated write request which is already served: xid="
+ + xid + ", resend response.");
+ }
+ WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, request.getCount(), request.getStableHow(),
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+ new XDR(), xid, new VerifierNone()), xid);
}
+ } else {
+ // not a repeated write request
+ receivedNewWriteInternal(dfsClient, request, channel, xid,
+ asyncDataService, iug);
}
-
- } finally {
- unlockCtx();
}
}
- private void receivedNewWriteInternal(DFSClient dfsClient,
- WRITE3Request request, Channel channel, int xid,
- AsyncDataService asyncDataService, IdUserGroup iug) {
+ @VisibleForTesting
+ public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
long offset = request.getOffset();
int count = request.getCount();
- WriteStableHow stableHow = request.getStableHow();
-
- // Get file length, fail non-append call
- WccAttr preOpAttr = latestAttr.getWccAttr();
+ long smallerCount = offset + count - cachedOffset;
if (LOG.isDebugEnabled()) {
- LOG.debug("requesed offset=" + offset + " and current filesize="
- + preOpAttr.getSize());
+ LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+ + " current offset %d," + " drop the overlapped section (%d-%d)"
+ + " and append new data (%d-%d).", offset, (offset + count - 1),
+ cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ + count - 1)));
+ }
+
+ ByteBuffer data = request.getData();
+ Preconditions.checkState(data.position() == 0,
+ "The write request data has non-zero position");
+ data.position((int) (cachedOffset - offset));
+ Preconditions.checkState(data.limit() - data.position() == smallerCount,
+ "The write request buffer has wrong limit/position regarding count");
+
+ request.setOffset(cachedOffset);
+ request.setCount((int) smallerCount);
+ }
+
+ /**
+ * Creates and adds a WriteCtx into the pendingWrites map. This is a
+ * synchronized method to handle concurrent writes.
+ *
+ * @return A non-null {@link WriteCtx} instance if the incoming write
+ * request's offset >= nextOffset. Otherwise null.
+ */
+ private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+ Channel channel, int xid) {
+ long offset = request.getOffset();
+ int count = request.getCount();
+ long cachedOffset = nextOffset.get();
+ int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("requesed offset=" + offset + " and current offset="
+ + cachedOffset);
}
- long nextOffset = getNextOffsetUnprotected();
- if (offset == nextOffset) {
- LOG.info("Add to the list, update nextOffset and notify the writer,"
- + " nextOffset:" + nextOffset);
- WriteCtx writeCtx = new WriteCtx(request.getHandle(),
- request.getOffset(), request.getCount(), request.getStableHow(),
- request.getData().array(), channel, xid, false, DataState.NO_DUMP);
- addWrite(writeCtx);
+ // Handle a special case first
+ if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+ // One Linux client behavior: after a file is closed and reopened to
+ // write, the client sometimes combines previous written data(could still
+ // be in kernel buffer) with newly appended data in one write. This is
+ // usually the first write after file reopened. In this
+ // case, we log the event and drop the overlapped section.
+ LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+ + " current offset %d," + " drop the overlapped section (%d-%d)"
+ + " and append new data (%d-%d).", offset, (offset + count - 1),
+ cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
+ + count - 1)));
+
+ if (!pendingWrites.isEmpty()) {
+ LOG.warn("There are other pending writes, fail this jumbo write");
+ return null;
+ }
- // Create an async task and change openFileCtx status to indicate async
- // task pending
+ LOG.warn("Modify this write to write only the appended data");
+ alterWriteRequest(request, cachedOffset);
+
+ // Update local variable
+ originalCount = count;
+ offset = request.getOffset();
+ count = request.getCount();
+ }
+
+ // Fail non-append call
+ if (offset < cachedOffset) {
+ LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+ + nextOffset + ")");
+ return null;
+ } else {
+ DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
+ : WriteCtx.DataState.ALLOW_DUMP;
+ WriteCtx writeCtx = new WriteCtx(request.getHandle(),
+ request.getOffset(), request.getCount(), originalCount,
+ request.getStableHow(), request.getData(), channel, xid, false,
+ dataState);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+ + " and requesed offset=" + offset);
+ }
+ if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+ // update the memory size
+ updateNonSequentialWriteInMemory(count);
+ }
+ // check if there is a WriteCtx with the same range in pendingWrites
+ WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
+ if (oldWriteCtx == null) {
+ addWrite(writeCtx);
+ } else {
+ LOG.warn("Got a repeated request, same range, with xid:"
+ + writeCtx.getXid());
+ }
+ return writeCtx;
+ }
+ }
+
+ /** Process an overwrite write request */
+ private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
+ Channel channel, int xid, IdUserGroup iug) {
+ WccData wccData = new WccData(latestAttr.getWccAttr(), null);
+ long offset = request.getOffset();
+ int count = request.getCount();
+ WriteStableHow stableHow = request.getStableHow();
+ WRITE3Response response;
+ long cachedOffset = nextOffset.get();
+ if (offset + count > cachedOffset) {
+ LOG.warn("Treat this jumbo write as a real random write, no support.");
+ response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+ WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Process perfectOverWrite");
+ }
+ // TODO: let executor handle perfect overwrite
+ response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+ request.getData().array(),
+ Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+ }
+ updateLastAccessTime();
+ Nfs3Utils.writeChannel(channel,
+ response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+ xid);
+ }
+
+ /**
+ * Check if we can start the write (back to HDFS) now. If there is no hole for
+ * writing, and there is no other threads writing (i.e., asyncStatus is
+ * false), start the writing and set asyncStatus to true.
+ *
+ * @return True if the new write is sequencial and we can start writing
+ * (including the case that there is already a thread writing).
+ */
+ private synchronized boolean checkAndStartWrite(
+ AsyncDataService asyncDataService, WriteCtx writeCtx) {
+
+ if (writeCtx.getOffset() == nextOffset.get()) {
if (!asyncStatus) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trigger the write back task. Current nextOffset: "
+ + nextOffset.get());
+ }
asyncStatus = true;
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The write back thread is working.");
+ }
}
-
- // Update the write time first
- updateLastAccessTime();
- Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
- // Send response immediately for unstable write
- if (request.getStableHow() == WriteStableHow.UNSTABLE) {
- WccData fileWcc = new WccData(preOpAttr, postOpAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
- fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
- writeCtx.setReplied(true);
- }
-
- } else if (offset > nextOffset) {
- LOG.info("Add new write to the list but not update nextOffset:"
- + nextOffset);
- WriteCtx writeCtx = new WriteCtx(request.getHandle(),
- request.getOffset(), request.getCount(), request.getStableHow(),
- request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP);
- addWrite(writeCtx);
+ return true;
+ } else {
+ return false;
+ }
+ }
- // Check if need to dump some pending requests to file
- checkDump(request.getCount());
- updateLastAccessTime();
- Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
- // In test, noticed some Linux client sends a batch (e.g., 1MB)
- // of reordered writes and won't send more writes until it gets
- // responses of the previous batch. So here send response immediately for
- // unstable non-sequential write
- if (request.getStableHow() == WriteStableHow.UNSTABLE) {
- WccData fileWcc = new WccData(preOpAttr, postOpAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
- fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
- writeCtx.setReplied(true);
- }
+ private void receivedNewWriteInternal(DFSClient dfsClient,
+ WRITE3Request request, Channel channel, int xid,
+ AsyncDataService asyncDataService, IdUserGroup iug) {
+ WriteStableHow stableHow = request.getStableHow();
+ WccAttr preOpAttr = latestAttr.getWccAttr();
+ int count = request.getCount();
- } else {
+ WriteCtx writeCtx = addWritesToCache(request, channel, xid);
+ if (writeCtx == null) {
// offset < nextOffset
- LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
- + nextOffset + ")");
- WccData wccData = new WccData(preOpAttr, null);
- WRITE3Response response;
+ processOverWrite(dfsClient, request, channel, xid, iug);
+ } else {
+ // The writes is added to pendingWrites.
+ // Check and start writing back if necessary
+ boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
+ if (!startWriting) {
+ // offset > nextOffset. check if we need to dump data
+ checkDump();
+
+ // In test, noticed some Linux client sends a batch (e.g., 1MB)
+ // of reordered writes and won't send more writes until it gets
+ // responses of the previous batch. So here send response immediately
+ // for unstable non-sequential write
+ if (stableHow != WriteStableHow.UNSTABLE) {
+ LOG.info("Have to change stable write to unstable write:"
+ + request.getStableHow());
+ stableHow = WriteStableHow.UNSTABLE;
+ }
- if (offset + count > nextOffset) {
- LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
- + "write requests, so treat it as a real random write, no support.");
- response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
- WriteStableHow.UNSTABLE, 0);
- } else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Process perfectOverWrite");
+ LOG.debug("UNSTABLE write request, send response for offset: "
+ + writeCtx.getOffset());
}
- response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
- request.getData().array(),
- Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+ WccData fileWcc = new WccData(preOpAttr, latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils
+ .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+ xid, new VerifierNone()), xid);
+ writeCtx.setReplied(true);
}
-
- updateLastAccessTime();
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
}
}
@@ -430,7 +633,6 @@ class OpenFileCtx {
private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
long offset, int count, WriteStableHow stableHow, byte[] data,
String path, WccData wccData, IdUserGroup iug) {
- assert (ctxLock.isLocked());
WRITE3Response response = null;
// Read the content back
@@ -441,21 +643,30 @@ class OpenFileCtx {
try {
// Sync file data and length to avoid partial read failure
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-
+ } catch (ClosedChannelException closedException) {
+ LOG.info("The FSDataOutputStream has been closed. " +
+ "Continue processing the perfect overwrite.");
+ } catch (IOException e) {
+ LOG.info("hsync failed when processing possible perfect overwrite, path="
+ + path + " error:" + e);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ }
+
+ try {
fis = new FSDataInputStream(dfsClient.open(path));
readCount = fis.read(offset, readbuffer, 0, count);
if (readCount < count) {
LOG.error("Can't read back " + count + " bytes, partial read size:"
+ readCount);
- return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
- stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+ Nfs3Constant.WRITE_COMMIT_VERF);
}
-
} catch (IOException e) {
LOG.info("Read failed when processing possible perfect overwrite, path="
+ path + " error:" + e);
- return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
- stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+ Nfs3Constant.WRITE_COMMIT_VERF);
} finally {
IOUtils.cleanup(LOG, fis);
}
@@ -465,7 +676,7 @@ class OpenFileCtx {
if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
LOG.info("Perfect overwrite has different content");
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
- stableHow, 0);
+ stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
} else {
LOG.info("Perfect overwrite has same content,"
+ " updating the mtime, then return success");
@@ -477,85 +688,110 @@ class OpenFileCtx {
LOG.info("Got error when processing perfect overwrite, path=" + path
+ " error:" + e);
return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
- 0);
+ Nfs3Constant.WRITE_COMMIT_VERF);
}
wccData.setPostOpAttr(postOpAttr);
response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
- stableHow, 0);
+ stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
}
return response;
}
-
- public final static int COMMIT_FINISHED = 0;
- public final static int COMMIT_WAIT = 1;
- public final static int COMMIT_INACTIVE_CTX = 2;
- public final static int COMMIT_ERROR = 3;
- /**
- * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
- * COMMIT_INACTIVE_CTX, COMMIT_ERROR
- */
- public int checkCommit(long commitOffset) {
- int ret = COMMIT_WAIT;
+ public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
+ Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+ // Keep stream active
+ updateLastAccessTime();
+ Preconditions.checkState(commitOffset >= 0);
- lockCtx();
- try {
- if (!activeState) {
- ret = COMMIT_INACTIVE_CTX;
- } else {
- ret = checkCommitInternal(commitOffset);
+ COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
+ preOpAttr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got commit status: " + ret.name());
+ }
+ // Do the sync outside the lock
+ if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
+ || ret == COMMIT_STATUS.COMMIT_FINISHED) {
+ try {
+ // Sync file data and length
+ fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+ // Nothing to do for metadata since attr related change is pass-through
+ } catch (ClosedChannelException cce) {
+ if (pendingWrites.isEmpty()) {
+ ret = COMMIT_STATUS.COMMIT_FINISHED;
+ } else {
+ ret = COMMIT_STATUS.COMMIT_ERROR;
+ }
+ } catch (IOException e) {
+ LOG.error("Got stream error during data sync:" + e);
+ // Do nothing. Stream will be closed eventually by StreamMonitor.
+ // status = Nfs3Status.NFS3ERR_IO;
+ ret = COMMIT_STATUS.COMMIT_ERROR;
}
- } finally {
- unlockCtx();
}
return ret;
}
-
- private int checkCommitInternal(long commitOffset) {
- if (commitOffset == 0) {
- // Commit whole file
- commitOffset = getNextOffsetUnprotected();
+
+ /**
+ * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
+ * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
+ */
+ private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
+ Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+ if (!activeState) {
+ if (pendingWrites.isEmpty()) {
+ return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
+ } else {
+ // TODO: return success if already committed
+ return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
+ }
}
long flushed = getFlushedOffset();
- LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
- if (flushed < commitOffset) {
- // Keep stream active
- updateLastAccessTime();
- return COMMIT_WAIT;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
}
- int ret = COMMIT_WAIT;
- try {
- // Sync file data and length
- fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
- // Nothing to do for metadata since attr related change is pass-through
- ret = COMMIT_FINISHED;
- } catch (IOException e) {
- LOG.error("Got stream error during data sync:" + e);
- // Do nothing. Stream will be closed eventually by StreamMonitor.
- ret = COMMIT_ERROR;
+ if (commitOffset > 0) {
+ if (commitOffset > flushed) {
+ CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+ preOpAttr);
+ pendingCommits.put(commitOffset, commitCtx);
+ return COMMIT_STATUS.COMMIT_WAIT;
+ } else {
+ return COMMIT_STATUS.COMMIT_DO_SYNC;
+ }
}
- // Keep stream active
- updateLastAccessTime();
- return ret;
+ Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
+
+ // Commit whole file, commitOffset == 0
+ if (pendingWrites.isEmpty()) {
+ // Note that, there is no guarantee data is synced. TODO: We could still
+ // do a sync here though the output stream might be closed.
+ return COMMIT_STATUS.COMMIT_FINISHED;
+ } else {
+ // Insert commit
+ long maxOffset = key.getKey().getMax() - 1;
+ Preconditions.checkState(maxOffset > 0);
+ CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+ pendingCommits.put(maxOffset, commitCtx);
+ return COMMIT_STATUS.COMMIT_WAIT;
+ }
}
private void addWrite(WriteCtx writeCtx) {
- assert (ctxLock.isLocked());
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
+ // For the offset range (min, max), min is inclusive, and max is exclusive
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
}
-
/**
* Check stream status to decide if it should be closed
* @return true, remove stream; false, keep stream
*/
- public boolean streamCleanup(long fileId, long streamTimeout) {
+ public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
throw new InvalidParameterException("StreamTimeout" + streamTimeout
+ "ms is less than MINIMIUM_STREAM_TIMEOUT "
@@ -563,131 +799,189 @@ class OpenFileCtx {
}
boolean flag = false;
- if (!ctxLock.tryLock()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Another thread is working on it" + ctxLock.toString());
- }
- return flag;
- }
-
- try {
- // Check the stream timeout
- if (checkStreamTimeout(streamTimeout)) {
- LOG.info("closing stream for fileId:" + fileId);
- cleanup();
- flag = true;
+ // Check the stream timeout
+ if (checkStreamTimeout(streamTimeout)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing stream for fileId:" + fileId);
}
- } finally {
- unlockCtx();
+ cleanup();
+ flag = true;
}
return flag;
}
- // Invoked by AsynDataService to do the write back
- public void executeWriteBack() {
- long nextOffset;
- OffsetRange key;
- WriteCtx writeCtx;
-
- try {
- // Don't lock OpenFileCtx for all writes to reduce the timeout of other
- // client request to the same file
- while (true) {
- lockCtx();
- if (!asyncStatus) {
- // This should never happen. There should be only one thread working
- // on one OpenFileCtx anytime.
- LOG.fatal("The openFileCtx has false async status");
- throw new RuntimeException("The openFileCtx has false async status");
- }
- // Any single write failure can change activeState to false, so do the
- // check each loop.
- if (pendingWrites.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The asyn write task has no pendding writes, fileId: "
- + latestAttr.getFileId());
- }
- break;
+ /**
+ * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
+ *
+ * @return Null if {@link #pendingWrites} is null, or the next WriteCtx's
+ * offset is larger than nextOffSet.
+ */
+ private synchronized WriteCtx offerNextToWrite() {
+ if (pendingWrites.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The asyn write task has no pending writes, fileId: "
+ + latestAttr.getFileId());
+ }
+ // process pending commit again to handle this race: a commit is added
+ // to pendingCommits map just after the last doSingleWrite returns.
+ // There is no pending write and the commit should be handled by the
+ // last doSingleWrite. Due to the race, the commit is left along and
+ // can't be processed until cleanup. Therefore, we should do another
+ // processCommits to fix the race issue.
+ processCommits(nextOffset.get()); // nextOffset has same value as
+ // flushedOffset
+ this.asyncStatus = false;
+ return null;
+ }
+
+ Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+ OffsetRange range = lastEntry.getKey();
+ WriteCtx toWrite = lastEntry.getValue();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+ + nextOffset);
+ }
+
+ long offset = nextOffset.get();
+ if (range.getMin() > offset) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The next sequencial write has not arrived yet");
}
- if (!activeState) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The openFileCtx is not active anymore, fileId: "
- + latestAttr.getFileId());
- }
- break;
+ processCommits(nextOffset.get()); // handle race
+ this.asyncStatus = false;
+ } else if (range.getMin() < offset && range.getMax() > offset) {
+ // shouldn't happen since we do sync for overlapped concurrent writers
+ LOG.warn("Got a overlapping write (" + range.getMin() + ","
+ + range.getMax() + "), nextOffset=" + offset
+ + ". Silently drop it now");
+ pendingWrites.remove(range);
+ processCommits(nextOffset.get()); // handle race
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+ + ") from the list");
}
-
- // Get the next sequential write
- nextOffset = getNextOffsetUnprotected();
- key = pendingWrites.firstKey();
- if (LOG.isTraceEnabled()) {
- LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
- + nextOffset);
+ // after writing, remove the WriteCtx from cache
+ pendingWrites.remove(range);
+ // update nextOffset
+ nextOffset.addAndGet(toWrite.getCount());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Change nextOffset to " + nextOffset.get());
}
-
- if (key.getMin() > nextOffset) {
- if (LOG.isDebugEnabled()) {
- LOG.info("The next sequencial write has not arrived yet");
- }
- break;
-
- } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
- // Can't handle overlapping write. Didn't see it in tests yet.
- LOG.fatal("Got a overlapping write (" + key.getMin() + ","
- + key.getMax() + "), nextOffset=" + nextOffset);
- throw new RuntimeException("Got a overlapping write (" + key.getMin()
- + "," + key.getMax() + "), nextOffset=" + nextOffset);
-
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
- + ") from the list");
- }
- writeCtx = pendingWrites.remove(key);
+ return toWrite;
+ }
+
+ return null;
+ }
+
+ /** Invoked by AsynDataService to write back to HDFS */
+ void executeWriteBack() {
+ Preconditions.checkState(asyncStatus,
+ "The openFileCtx has false async status");
+ try {
+ while (activeState) {
+ WriteCtx toWrite = offerNextToWrite();
+ if (toWrite != null) {
// Do the write
- doSingleWrite(writeCtx);
+ doSingleWrite(toWrite);
updateLastAccessTime();
+ } else {
+ break;
}
-
- unlockCtx();
}
-
+
+ if (!activeState && LOG.isDebugEnabled()) {
+ LOG.debug("The openFileCtx is not active anymore, fileId: "
+ + latestAttr.getFileId());
+ }
} finally {
- // Always reset the async status so another async task can be created
- // for this file
+ // make sure we reset asyncStatus to false
asyncStatus = false;
- if (ctxLock.isHeldByCurrentThread()) {
- unlockCtx();
- }
}
}
+ private void processCommits(long offset) {
+ Preconditions.checkState(offset > 0);
+ long flushedOffset = getFlushedOffset();
+ Entry<Long, CommitCtx> entry = pendingCommits.firstEntry();
+
+ if (entry == null || entry.getValue().offset > flushedOffset) {
+ return;
+ }
+
+ // Now do sync for the ready commits
+ int status = Nfs3Status.NFS3ERR_IO;
+ try {
+ // Sync file data and length
+ fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+ status = Nfs3Status.NFS3_OK;
+ } catch (ClosedChannelException cce) {
+ if (!pendingWrites.isEmpty()) {
+ LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
+ + ". Channel closed with writes pending");
+ }
+ status = Nfs3Status.NFS3ERR_IO;
+ } catch (IOException e) {
+ LOG.error("Got stream error during data sync:" + e);
+ // Do nothing. Stream will be closed eventually by StreamMonitor.
+ status = Nfs3Status.NFS3ERR_IO;
+ }
+
+ // Update latestAttr
+ try {
+ latestAttr = Nfs3Utils.getFileAttr(client,
+ Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
+ } catch (IOException e) {
+ LOG.error("Can't get new file attr for fileId: " + latestAttr.getFileId());
+ status = Nfs3Status.NFS3ERR_IO;
+ }
+
+ if (latestAttr.getSize() != offset) {
+ LOG.error("After sync, the expect file size: " + offset
+ + ", however actual file size is: " + latestAttr.getSize());
+ status = Nfs3Status.NFS3ERR_IO;
+ }
+ WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
+
+ // Send response for the ready commits
+ while (entry != null && entry.getValue().offset <= flushedOffset) {
+ pendingCommits.remove(entry.getKey());
+ CommitCtx commit = entry.getValue();
+
+ COMMIT3Response response = new COMMIT3Response(status, wccData,
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannelCommit(commit.getChannel(), response
+ .writeHeaderAndResponse(new XDR(), commit.getXid(),
+ new VerifierNone()), commit.getXid());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("FileId: " + latestAttr.getFileid() + " Service time:"
+ + (System.currentTimeMillis() - commit.getStartTime())
+ + "ms. Sent response for commit:" + commit);
+ }
+ entry = pendingCommits.firstEntry();
+ }
+ }
+
private void doSingleWrite(final WriteCtx writeCtx) {
- assert(ctxLock.isLocked());
Channel channel = writeCtx.getChannel();
int xid = writeCtx.getXid();
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
WriteStableHow stableHow = writeCtx.getStableHow();
- byte[] data = null;
- try {
- data = writeCtx.getData();
- } catch (IOException e1) {
- LOG.error("Failed to get request data offset:" + offset + " count:"
- + count + " error:" + e1);
- // Cleanup everything
- cleanup();
- return;
- }
- assert (data.length == count);
-
+
FileHandle handle = writeCtx.getHandle();
- LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
- + " length:" + count + " stableHow:" + stableHow.getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+ + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+ }
try {
- fos.write(data, 0, count);
+ // The write is not protected by lock. asyncStatus is used to make sure
+ // there is one thread doing write back at any time
+ writeCtx.writeData(fos);
long flushedOffset = getFlushedOffset();
if (flushedOffset != (offset + count)) {
@@ -695,27 +989,47 @@ class OpenFileCtx {
+ flushedOffset + " and nextOffset should be"
+ (offset + count));
}
- nextOffset = flushedOffset;
+
// Reduce memory occupation size if request was allowed dumped
- if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
- updateNonSequentialWriteInMemory(-count);
+ if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+ synchronized (writeCtx) {
+ if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+ writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
+ updateNonSequentialWriteInMemory(-count);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After writing " + handle.getFileId() + " at offset "
+ + offset + ", updated the memory count, new value:"
+ + nonSequentialWriteInMemory.get());
+ }
+ }
+ }
}
if (!writeCtx.getReplied()) {
WccAttr preOpAttr = latestAttr.getWccAttr();
WccData fileWcc = new WccData(preOpAttr, latestAttr);
+ if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+ LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+ + " instead of real data count:" + count);
+ count = writeCtx.getOriginalCount();
+ }
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+ new XDR(), xid, new VerifierNone()), xid);
}
-
+
+ // Handle the waiting commits without holding any lock
+ processCommits(writeCtx.getOffset() + writeCtx.getCount());
+
} catch (IOException e) {
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
- + offset + " and length " + data.length, e);
+ + offset + " and length " + count, e);
if (!writeCtx.getReplied()) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+ Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+ new XDR(), xid, new VerifierNone()), xid);
// Keep stream open. Either client retries or SteamMonitor closes it.
}
@@ -725,9 +1039,21 @@ class OpenFileCtx {
}
}
- private void cleanup() {
- assert(ctxLock.isLocked());
+ private synchronized void cleanup() {
+ if (!activeState) {
+ LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
+ return;
+ }
activeState = false;
+
+ // stop the dump thread
+ if (dumpThread != null) {
+ dumpThread.interrupt();
+ try {
+ dumpThread.join(3000);
+ } catch (InterruptedException e) {
+ }
+ }
// Close stream
try {
@@ -745,36 +1071,62 @@ class OpenFileCtx {
while (!pendingWrites.isEmpty()) {
OffsetRange key = pendingWrites.firstKey();
LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
- + "), nextOffset=" + getNextOffsetUnprotected());
+ + "), nextOffset=" + nextOffset.get());
WriteCtx writeCtx = pendingWrites.remove(key);
if (!writeCtx.getReplied()) {
WccData fileWcc = new WccData(preOpAttr, latestAttr);
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(writeCtx.getChannel(),
- response.send(new XDR(), writeCtx.getXid()));
+ Nfs3Utils.writeChannel(writeCtx.getChannel(), response
+ .writeHeaderAndResponse(new XDR(), writeCtx.getXid(),
+ new VerifierNone()), writeCtx.getXid());
}
}
// Cleanup dump file
- if (dumpOut!=null){
+ if (dumpOut != null) {
try {
dumpOut.close();
} catch (IOException e) {
e.printStackTrace();
}
+ File dumpFile = new File(dumpFilePath);
+ if (dumpFile.exists() && !dumpFile.delete()) {
+ LOG.error("Failed to delete dumpfile: " + dumpFile);
+ }
}
- if (raf!=null) {
+ if (raf != null) {
try {
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}
- File dumpFile = new File(dumpFilePath);
- if (dumpFile.delete()) {
- LOG.error("Failed to delete dumpfile: "+ dumpFile);
- }
+ }
+
+ @VisibleForTesting
+ ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){
+ return pendingWrites;
+ }
+
+ @VisibleForTesting
+ ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){
+ return pendingCommits;
+ }
+
+ @VisibleForTesting
+ long getNextOffsetForTest() {
+ return nextOffset.get();
+ }
+
+ @VisibleForTesting
+ void setNextOffsetForTest(long newValue) {
+ nextOffset.set(newValue);
+ }
+
+ @VisibleForTesting
+ void setActiveStatusForTest(boolean activeState) {
+ this.activeState = activeState;
}
}
\ No newline at end of file