You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2013/11/08 02:03:10 UTC
svn commit: r1539891 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
Author: brandonli
Date: Fri Nov 8 01:03:10 2013
New Revision: 1539891
URL: http://svn.apache.org/r1539891
Log:
HDFS-5364. Merging change r1539834 from trunk
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java
- copied unchanged from r1539834, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
- copied unchanged from r1539834, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Fri Nov 8 01:03:10 2013
@@ -23,33 +23,47 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
+import org.apache.hadoop.mount.MountdBase;
import org.apache.hadoop.nfs.nfs3.Nfs3Base;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
* Currently Mountd program is also started inside this class.
* Only TCP server is supported and UDP is not supported.
*/
public class Nfs3 extends Nfs3Base {
+ private Mountd mountd;
+
static {
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
public Nfs3(List<String> exports) throws IOException {
- super(new Mountd(exports), new RpcProgramNfs3());
+ super(new RpcProgramNfs3());
+ mountd = new Mountd(exports);
}
+ @VisibleForTesting
public Nfs3(List<String> exports, Configuration config) throws IOException {
- super(new Mountd(exports, config), new RpcProgramNfs3(config), config);
+ super(new RpcProgramNfs3(config), config);
+ mountd = new Mountd(exports, config);
}
+ public Mountd getMountd() {
+ return mountd;
+ }
+
public static void main(String[] args) throws IOException {
StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
List<String> exports = new ArrayList<String>();
exports.add("/");
+
final Nfs3 nfsServer = new Nfs3(exports);
+ nfsServer.mountd.start(true); // Start mountd
nfsServer.start(true);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Fri Nov 8 01:03:10 2013
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
-import java.security.InvalidParameterException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -96,7 +95,7 @@ class OpenFileCtx {
// It's updated after each sync to HDFS
private Nfs3FileAttributes latestAttr;
-
+
private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
@@ -165,10 +164,22 @@ class OpenFileCtx {
return System.currentTimeMillis() - lastAccessTime > streamTimeout;
}
+ long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
public long getNextOffset() {
return nextOffset.get();
}
+ boolean getActiveState() {
+ return this.activeState;
+ }
+
+ boolean hasPendingWork() {
+ return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
+ }
+
// Increase or decrease the memory occupation of non-sequential writes
private long updateNonSequentialWriteInMemory(long count) {
long newValue = nonSequentialWriteInMemory.addAndGet(count);
@@ -800,19 +811,18 @@ class OpenFileCtx {
* @return true, remove stream; false, keep stream
*/
public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
- if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
- throw new InvalidParameterException("StreamTimeout" + streamTimeout
- + "ms is less than MINIMIUM_STREAM_TIMEOUT "
- + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
+ Preconditions
+ .checkState(streamTimeout >= Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+ if (!activeState) {
+ return true;
}
boolean flag = false;
// Check the stream timeout
if (checkStreamTimeout(streamTimeout)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("closing stream for fileId:" + fileId);
+ LOG.debug("stream can be closed for fileId:" + fileId);
}
- cleanup();
flag = true;
}
return flag;
@@ -985,7 +995,7 @@ class OpenFileCtx {
FileHandle handle = writeCtx.getHandle();
if (LOG.isDebugEnabled()) {
LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
- + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+ + offset + " length:" + count + " stableHow:" + stableHow.name());
}
try {
@@ -1066,7 +1076,7 @@ class OpenFileCtx {
}
}
- private synchronized void cleanup() {
+ synchronized void cleanup() {
if (!activeState) {
LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
return;
@@ -1074,7 +1084,7 @@ class OpenFileCtx {
activeState = false;
// stop the dump thread
- if (dumpThread != null) {
+ if (dumpThread != null && dumpThread.isAlive()) {
dumpThread.interrupt();
try {
dumpThread.join(3000);
@@ -1156,4 +1166,10 @@ class OpenFileCtx {
void setActiveStatusForTest(boolean activeState) {
this.activeState = activeState;
}
+
+ @Override
+ public String toString() {
+ return String.format("activeState: %b asyncStatus: %b nextOffset: %d",
+ activeState, asyncStatus, nextOffset.get());
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Fri Nov 8 01:03:10 2013
@@ -214,6 +214,11 @@ public class RpcProgramNfs3 extends RpcP
}
}
+ @Override
+ public void startDaemons() {
+ writeManager.startAsyncDataSerivce();
+ }
+
/******************************************************
* RPC call handlers
******************************************************/
@@ -778,7 +783,8 @@ public class RpcProgramNfs3 extends RpcP
int createMode = request.getMode();
if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE)
- && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) {
+ && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)
+ && request.getObjAttr().getSize() != 0) {
LOG.error("Setting file size is not supported when creating file: "
+ fileName + " dir fileId:" + dirHandle.getFileId());
return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL);
@@ -831,6 +837,23 @@ public class RpcProgramNfs3 extends RpcP
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
dfsClient, dirFileIdPath, iug);
+
+ // Add open stream
+ OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
+ writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
+ fileHandle = new FileHandle(postOpObjAttr.getFileId());
+ if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
+ LOG.warn("Can't add more stream, close it."
+ + " Future write will become append");
+ fos.close();
+ fos = null;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opened stream for file:" + fileName + ", fileId:"
+ + fileHandle.getFileId());
+ }
+ }
+
} catch (IOException e) {
LOG.error("Exception", e);
if (fos != null) {
@@ -859,16 +882,6 @@ public class RpcProgramNfs3 extends RpcP
}
}
- // Add open stream
- OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
- + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
- fileHandle = new FileHandle(postOpObjAttr.getFileId());
- writeManager.addOpenFileStream(fileHandle, openFileCtx);
- if (LOG.isDebugEnabled()) {
- LOG.debug("open stream for file:" + fileName + ", fileId:"
- + fileHandle.getFileId());
- }
-
return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
dirWcc);
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Fri Nov 8 01:03:10 2013
@@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@@ -29,11 +27,12 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.nfs.NfsFileType;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
-import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@@ -56,69 +55,70 @@ public class WriteManager {
private final Configuration config;
private final IdUserGroup iug;
- private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
- .newConcurrentMap();
-
+
private AsyncDataService asyncDataService;
private boolean asyncDataServiceStarted = false;
- private final StreamMonitor streamMonitor;
-
+ private final int maxStreams;
+
/**
* The time limit to wait for accumulate reordered sequential writes to the
* same file before the write is considered done.
*/
private long streamTimeout;
-
- public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
- public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
-
- void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
- openFileMap.put(h, ctx);
- if (LOG.isDebugEnabled()) {
- LOG.debug("After add the new stream " + h.getFileId()
- + ", the stream number:" + openFileMap.size());
+
+ private final OpenFileCtxCache fileContextCache;
+
+ static public class MultipleCachedStreamException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public MultipleCachedStreamException(String msg) {
+ super(msg);
}
}
+ boolean addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
+ return fileContextCache.put(h, ctx);
+ }
+
WriteManager(IdUserGroup iug, final Configuration config) {
this.iug = iug;
this.config = config;
-
- streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
- DEFAULT_STREAM_TIMEOUT);
+ streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
+ Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
LOG.info("Stream timeout is " + streamTimeout + "ms.");
- if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) {
+ if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
LOG.info("Reset stream timeout to minimum value "
- + MINIMIUM_STREAM_TIMEOUT + "ms.");
- streamTimeout = MINIMIUM_STREAM_TIMEOUT;
+ + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
+ streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
}
-
- this.streamMonitor = new StreamMonitor();
+ maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
+ Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
+ LOG.info("Maximum open streams is "+ maxStreams);
+ this.fileContextCache = new OpenFileCtxCache(config, streamTimeout);
}
- private void startAsyncDataSerivce() {
- streamMonitor.start();
+ void startAsyncDataSerivce() {
+ if (asyncDataServiceStarted) {
+ return;
+ }
+ fileContextCache.start();
this.asyncDataService = new AsyncDataService();
asyncDataServiceStarted = true;
}
- private void shutdownAsyncDataService() {
- asyncDataService.shutdown();
+ void shutdownAsyncDataService() {
+ if (!asyncDataServiceStarted) {
+ return;
+ }
asyncDataServiceStarted = false;
- streamMonitor.interrupt();
+ asyncDataService.shutdown();
+ fileContextCache.shutdown();
}
void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
int xid, Nfs3FileAttributes preOpAttr) throws IOException {
- // First write request starts the async data service
- if (!asyncDataServiceStarted) {
- startAsyncDataSerivce();
- }
-
- long offset = request.getOffset();
int count = request.getCount();
- WriteStableHow stableHow = request.getStableHow();
byte[] data = request.getData().array();
if (data.length < count) {
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
@@ -129,13 +129,12 @@ public class WriteManager {
FileHandle handle = request.getHandle();
if (LOG.isDebugEnabled()) {
- LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: "
- + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+ LOG.debug("handleWrite " + request);
}
// Check if there is a stream to write
FileHandle fileHandle = request.getHandle();
- OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+ OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
@@ -150,6 +149,15 @@ public class WriteManager {
fos = dfsClient.append(fileIdPath, bufferSize, null, null);
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+ } catch (RemoteException e) {
+ IOException io = e.unwrapRemoteException();
+ if (io instanceof AlreadyBeingCreatedException) {
+ LOG.warn("Can't append file:" + fileIdPath
+ + ". Possibly the file is being closed. Drop the request:"
+ + request + ", wait for the client to retry...");
+ return;
+ }
+ throw e;
} catch (IOException e) {
LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
if (fos != null) {
@@ -170,9 +178,26 @@ public class WriteManager {
Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
+ fileHandle.getFileId(), dfsClient, iug);
- addOpenFileStream(fileHandle, openFileCtx);
+
+ if (!addOpenFileStream(fileHandle, openFileCtx)) {
+ LOG.info("Can't add new stream. Close it. Tell client to retry.");
+ try {
+ fos.close();
+ } catch (IOException e) {
+ LOG.error("Can't close stream for fileId:" + handle.getFileId());
+ }
+ // Notify client to retry
+ WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX,
+ fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel,
+ response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+ xid);
+ return;
+ }
+
if (LOG.isDebugEnabled()) {
- LOG.debug("opened stream for file:" + fileHandle.getFileId());
+ LOG.debug("Opened stream for appending file:" + fileHandle.getFileId());
}
}
@@ -185,7 +210,7 @@ public class WriteManager {
void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
int status;
- OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+ OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
if (openFileCtx == null) {
LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
@@ -238,7 +263,7 @@ public class WriteManager {
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
if (attr != null) {
- OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+ OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
if (openFileCtx != null) {
attr.setSize(openFileCtx.getNextOffset());
attr.setUsed(openFileCtx.getNextOffset());
@@ -253,8 +278,8 @@ public class WriteManager {
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
- OpenFileCtx openFileCtx = openFileMap
- .get(new FileHandle(attr.getFileId()));
+ OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr
+ .getFileId()));
if (openFileCtx != null) {
attr.setSize(openFileCtx.getNextOffset());
@@ -263,56 +288,9 @@ public class WriteManager {
}
return attr;
}
-
- @VisibleForTesting
- ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
- return this.openFileMap;
- }
-
- /**
- * StreamMonitor wakes up periodically to find and closes idle streams.
- */
- class StreamMonitor extends Daemon {
- private int rotation = 5 * 1000; // 5 seconds
- private long lastWakeupTime = 0;
-
- @Override
- public void run() {
- while (true) {
- Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
- .iterator();
- if (LOG.isTraceEnabled()) {
- LOG.trace("openFileMap size:" + openFileMap.size());
- }
- while (it.hasNext()) {
- Entry<FileHandle, OpenFileCtx> pairs = it.next();
- OpenFileCtx ctx = pairs.getValue();
- if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
- it.remove();
- if (LOG.isDebugEnabled()) {
- LOG.debug("After remove stream " + pairs.getKey().getFileId()
- + ", the stream number:" + openFileMap.size());
- }
- }
- }
-
- // Check if it can sleep
- try {
- long workedTime = System.currentTimeMillis() - lastWakeupTime;
- if (workedTime < rotation) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("StreamMonitor can still have a sleep:"
- + ((rotation - workedTime) / 1000));
- }
- Thread.sleep(rotation - workedTime);
- }
- lastWakeupTime = System.currentTimeMillis();
- } catch (InterruptedException e) {
- LOG.info("StreamMonitor got interrupted");
- return;
- }
- }
- }
+ @VisibleForTesting
+ OpenFileCtxCache getOpenFileCtxCache() {
+ return this.fileContextCache;
}
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java Fri Nov 8 01:03:10 2013
@@ -51,7 +51,7 @@ public class TestMountd {
Nfs3 nfs3 = new Nfs3(exports, config);
nfs3.start(false);
- RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase()
+ RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
.getRpcProgram();
mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Fri Nov 8 01:03:10 2013
@@ -135,6 +135,7 @@ public class TestOutOfOrderWrite {
@Override
protected ChannelPipelineFactory setPipelineFactory() {
this.pipelineFactory = new ChannelPipelineFactory() {
+ @Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(
RpcUtil.constructRpcFrameDecoder(),
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Fri Nov 8 01:03:10 2013
@@ -186,9 +186,8 @@ public class TestWrites {
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
throws InterruptedException {
int waitedTime = 0;
- ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
- .getOpenFileMap();
- OpenFileCtx ctx = openFileMap.get(handle);
+ OpenFileCtx ctx = nfsd.getWriteManager()
+ .getOpenFileCtxCache().get(handle);
assertTrue(ctx != null);
do {
Thread.sleep(3000);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1539891&r1=1539890&r2=1539891&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Nov 8 01:03:10 2013
@@ -235,6 +235,8 @@ Release 2.2.1 - UNRELEASED
HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
+ HDFS-5364. Add OpenFileCtx cache. (brandonli)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES