Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/09/24 02:57:45 UTC
svn commit: r1525759 - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
hadoop-hdfs/src/main/java/ hadoo...
Author: arp
Date: Tue Sep 24 00:57:43 2013
New Revision: 1525759
URL: http://svn.apache.org/r1525759
Log:
Merging r1525409 through r1525758 from trunk to branch HDFS-2832
Modified:
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1525409-1525758
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java Tue Sep 24 00:57:43 2013
@@ -97,7 +97,7 @@ public class AsyncDataService {
void writeAsync(OpenFileCtx openFileCtx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling write back task for fileId: "
- + openFileCtx.copyLatestAttr().getFileId());
+ + openFileCtx.getLatestAttr().getFileId());
}
WriteBackTask wbTask = new WriteBackTask(openFileCtx);
execute(wbTask);
@@ -125,7 +125,7 @@ public class AsyncDataService {
public String toString() {
// Called in AsyncDataService.execute for displaying error messages.
return "write back data for fileId"
- + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+ + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
+ openFileCtx.getNextOffset();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java Tue Sep 24 00:57:43 2013
@@ -17,19 +17,34 @@
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
+import java.util.Comparator;
+
+import com.google.common.base.Preconditions;
+
/**
* OffsetRange is the range of a read/write request. A single point (e.g., [5,5])
* is not a valid range.
*/
-public class OffsetRange implements Comparable<OffsetRange> {
+public class OffsetRange {
+
+ public static final Comparator<OffsetRange> ReverseComparatorOnMin =
+ new Comparator<OffsetRange>() {
+ @Override
+ public int compare(OffsetRange o1, OffsetRange o2) {
+ if (o1.getMin() == o2.getMin()) {
+ return o1.getMax() < o2.getMax() ?
+ 1 : (o1.getMax() > o2.getMax() ? -1 : 0);
+ } else {
+ return o1.getMin() < o2.getMin() ? 1 : -1;
+ }
+ }
+ };
+
private final long min;
private final long max;
OffsetRange(long min, long max) {
- if ((min >= max) || (min < 0) || (max < 0)) {
- throw new IllegalArgumentException("Wrong offset range: (" + min + ","
- + max + ")");
- }
+ Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
this.min = min;
this.max = max;
}
@@ -49,24 +64,10 @@ public class OffsetRange implements Comp
@Override
public boolean equals(Object o) {
- assert (o instanceof OffsetRange);
- OffsetRange range = (OffsetRange) o;
- return (min == range.getMin()) && (max == range.getMax());
- }
-
- private static int compareTo(long left, long right) {
- if (left < right) {
- return -1;
- } else if (left > right) {
- return 1;
- } else {
- return 0;
+ if (o instanceof OffsetRange) {
+ OffsetRange range = (OffsetRange) o;
+ return (min == range.getMin()) && (max == range.getMax());
}
- }
-
- @Override
- public int compareTo(OffsetRange other) {
- final int d = compareTo(min, other.getMin());
- return d != 0 ? d : compareTo(max, other.getMax());
+ return false;
}
}
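
To make the new ordering concrete: because ReverseComparatorOnMin sorts ranges with a larger min first, a sorted map keyed on it yields the highest-offset write from firstKey() (the first candidate to dump to disk) and the lowest-offset write from lastEntry() (the next candidate for sequential write-back). A minimal, self-contained sketch of that behavior; SimpleRange is a stand-in for the package-private OffsetRange and is not part of the patch:

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;

public class ReverseOrderDemo {
  static class SimpleRange {
    final long min, max;
    SimpleRange(long min, long max) { this.min = min; this.max = max; }
    @Override
    public String toString() { return "(" + min + "," + max + ")"; }
  }

  // Same ordering as OffsetRange.ReverseComparatorOnMin: larger min sorts first.
  static final Comparator<SimpleRange> REVERSE_ON_MIN = (o1, o2) -> {
    if (o1.min == o2.min) {
      return o1.max < o2.max ? 1 : (o1.max > o2.max ? -1 : 0);
    }
    return o1.min < o2.min ? 1 : -1;
  };

  public static void main(String[] args) {
    ConcurrentSkipListMap<SimpleRange, String> pendingWrites =
        new ConcurrentSkipListMap<SimpleRange, String>(REVERSE_ON_MIN);
    pendingWrites.put(new SimpleRange(0, 10), "write1");
    pendingWrites.put(new SimpleRange(20, 30), "write2");
    pendingWrites.put(new SimpleRange(40, 50), "write3");
    // firstKey() has the largest min: the first candidate to dump to disk.
    System.out.println("dump first: " + pendingWrites.firstKey());           // (40,50)
    // lastEntry() has the smallest min: the next candidate for write-back.
    System.out.println("write back: " + pendingWrites.lastEntry().getKey()); // (0,10)
  }
}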
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Tue Sep 24 00:57:43 2013
@@ -22,12 +22,14 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.channels.ClosedChannelException;
import java.security.InvalidParameterException;
import java.util.EnumSet;
import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,8 +52,11 @@ import org.apache.hadoop.nfs.nfs3.respon
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.apache.hadoop.util.Daemon;
import org.jboss.netty.channel.Channel;
+import com.google.common.base.Preconditions;
+
/**
* OpenFileCtx saves the context of one HDFS file output stream. Access to it is
* synchronized by its member lock.
@@ -59,34 +64,42 @@ import org.jboss.netty.channel.Channel;
class OpenFileCtx {
public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
- /**
- * Lock to synchronize OpenFileCtx changes. Thread should get this lock before
- * any read/write operation to an OpenFileCtx object
- */
- private final ReentrantLock ctxLock;
+ // Pending writes water mark for dump, 1MB
+ private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
+
+ public final static int COMMIT_FINISHED = 0;
+ public final static int COMMIT_WAIT = 1;
+ public final static int COMMIT_INACTIVE_CTX = 2;
+ public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
+ public final static int COMMIT_ERROR = 4;
// The stream status. False means the stream is closed.
- private boolean activeState;
+ private volatile boolean activeState;
// The stream write-back status. True means one thread is doing write back.
- private boolean asyncStatus;
+ private volatile boolean asyncStatus;
+ /**
+ * The current offset of the file in HDFS. All the content before this offset
+ * has been written back to HDFS.
+ */
+ private AtomicLong nextOffset;
private final HdfsDataOutputStream fos;
+
+ // TODO: make it mutable and update it after each write back to HDFS
private final Nfs3FileAttributes latestAttr;
- private long nextOffset;
- private final SortedMap<OffsetRange, WriteCtx> pendingWrites;
+ private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
// The last write, commit request or write-back event. Updating time to keep
// output stream alive.
private long lastAccessTime;
- // Pending writes water mark for dump, 1MB
- private static int DUMP_WRITE_WATER_MARK = 1024 * 1024;
+ private volatile boolean enabledDump;
private FileOutputStream dumpOut;
- private long nonSequentialWriteInMemory;
- private boolean enabledDump;
+ private AtomicLong nonSequentialWriteInMemory;
private RandomAccessFile raf;
private final String dumpFilePath;
+ private Daemon dumpThread;
private void updateLastAccessTime() {
lastAccessTime = System.currentTimeMillis();
@@ -96,89 +109,50 @@ class OpenFileCtx {
return System.currentTimeMillis() - lastAccessTime > streamTimeout;
}
+ public long getNextOffset() {
+ return nextOffset.get();
+ }
+
// Increase or decrease the memory occupation of non-sequential writes
private long updateNonSequentialWriteInMemory(long count) {
- nonSequentialWriteInMemory += count;
+ long newValue = nonSequentialWriteInMemory.addAndGet(count);
if (LOG.isDebugEnabled()) {
LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:"
- + nonSequentialWriteInMemory);
+ + newValue);
}
- if (nonSequentialWriteInMemory < 0) {
- LOG.error("nonSequentialWriteInMemory is negative after update with count "
- + count);
- throw new IllegalArgumentException(
- "nonSequentialWriteInMemory is negative after update with count "
- + count);
- }
- return nonSequentialWriteInMemory;
+ Preconditions.checkState(newValue >= 0,
+ "nonSequentialWriteInMemory is negative after update with count "
+ + count);
+ return newValue;
}
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
String dumpFilePath) {
this.fos = fos;
this.latestAttr = latestAttr;
- pendingWrites = new TreeMap<OffsetRange, WriteCtx>();
+ // We use the ReverseComparatorOnMin as the comparator of the map. In this
+ // way, we first dump the data with larger offsets. Meanwhile, we
+ // retrieve the last element to write back to HDFS.
+ pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
+ OffsetRange.ReverseComparatorOnMin);
updateLastAccessTime();
activeState = true;
asyncStatus = false;
dumpOut = null;
raf = null;
- nonSequentialWriteInMemory = 0;
+ nonSequentialWriteInMemory = new AtomicLong(0);
+
this.dumpFilePath = dumpFilePath;
enabledDump = dumpFilePath == null ? false: true;
- nextOffset = latestAttr.getSize();
- assert(nextOffset == this.fos.getPos());
-
- ctxLock = new ReentrantLock(true);
+ nextOffset = new AtomicLong();
+ nextOffset.set(latestAttr.getSize());
+ assert(nextOffset.get() == this.fos.getPos());
+ dumpThread = null;
}
- private void lockCtx() {
- if (LOG.isTraceEnabled()) {
- StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
- StackTraceElement e = stacktrace[2];
- String methodName = e.getMethodName();
- LOG.trace("lock ctx, caller:" + methodName);
- }
- ctxLock.lock();
- }
-
- private void unlockCtx() {
- ctxLock.unlock();
- if (LOG.isTraceEnabled()) {
- StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
- StackTraceElement e = stacktrace[2];
- String methodName = e.getMethodName();
- LOG.info("unlock ctx, caller:" + methodName);
- }
- }
-
- // Make a copy of the latestAttr
- public Nfs3FileAttributes copyLatestAttr() {
- Nfs3FileAttributes ret;
- lockCtx();
- try {
- ret = new Nfs3FileAttributes(latestAttr);
- } finally {
- unlockCtx();
- }
- return ret;
- }
-
- private long getNextOffsetUnprotected() {
- assert(ctxLock.isLocked());
- return nextOffset;
- }
-
- public long getNextOffset() {
- long ret;
- lockCtx();
- try {
- ret = getNextOffsetUnprotected();
- } finally {
- unlockCtx();
- }
- return ret;
+ public Nfs3FileAttributes getLatestAttr() {
+ return latestAttr;
}
// Get flushed offset. Note that flushed data may not be persisted.
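
The shape of the lock-free counter update above, in isolation (a sketch under the assumption that callers only need the post-update value for a sanity check; the class name is illustrative):

import java.util.concurrent.atomic.AtomicLong;

public class MemoryCounterDemo {
  private final AtomicLong nonSequentialWriteInMemory = new AtomicLong(0);

  // One atomic addAndGet plus a check on the returned value replaces the old
  // ReentrantLock around a plain long.
  long update(long count) {
    long newValue = nonSequentialWriteInMemory.addAndGet(count);
    if (newValue < 0) {
      throw new IllegalStateException(
          "nonSequentialWriteInMemory is negative after update with count " + count);
    }
    return newValue;
  }

  public static void main(String[] args) {
    MemoryCounterDemo demo = new MemoryCounterDemo();
    demo.update(512 * 1024);                      // buffer 512KB of out-of-order writes
    System.out.println(demo.update(-256 * 1024)); // half dumped: prints 262144
  }
}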
@@ -187,12 +161,7 @@ class OpenFileCtx {
}
// Check if need to dump the new writes
- private void checkDump(long count) {
- assert (ctxLock.isLocked());
-
- // Always update the in memory count
- updateNonSequentialWriteInMemory(count);
-
+ private void checkDump() {
if (!enabledDump) {
if (LOG.isDebugEnabled()) {
LOG.debug("Do nothing, dump is disabled.");
@@ -200,66 +169,111 @@ class OpenFileCtx {
return;
}
- if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) {
+ if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
return;
}
- // Create dump outputstream for the first time
- if (dumpOut == null) {
- LOG.info("Create dump file:" + dumpFilePath);
- File dumpFile = new File(dumpFilePath);
- try {
- if (dumpFile.exists()) {
- LOG.fatal("The dump file should not exist:" + dumpFilePath);
- throw new RuntimeException("The dump file should not exist:"
- + dumpFilePath);
+ // wake up the dumper thread to dump the data
+ synchronized (this) {
+ if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Asking dumper to dump...");
+ }
+ if (dumpThread == null) {
+ dumpThread = new Daemon(new Dumper());
+ dumpThread.start();
+ } else {
+ this.notifyAll();
}
- dumpOut = new FileOutputStream(dumpFile);
- } catch (IOException e) {
- LOG.error("Got failure when creating dump stream " + dumpFilePath
- + " with error:" + e);
- enabledDump = false;
- IOUtils.cleanup(LOG, dumpOut);
- return;
}
}
- // Get raf for the first dump
- if (raf == null) {
- try {
- raf = new RandomAccessFile(dumpFilePath, "r");
- } catch (FileNotFoundException e) {
- LOG.error("Can't get random access to file " + dumpFilePath);
- // Disable dump
- enabledDump = false;
- return;
+ }
+
+ class Dumper implements Runnable {
+ /** Dump data into a file */
+ private void dump() {
+ // Create dump outputstream for the first time
+ if (dumpOut == null) {
+ LOG.info("Create dump file:" + dumpFilePath);
+ File dumpFile = new File(dumpFilePath);
+ try {
+ synchronized (this) {
+ // check if alive again
+ Preconditions.checkState(dumpFile.createNewFile(),
+ "The dump file should not exist: %s", dumpFilePath);
+ dumpOut = new FileOutputStream(dumpFile);
+ }
+ } catch (IOException e) {
+ LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+ enabledDump = false;
+ if (dumpOut != null) {
+ try {
+ dumpOut.close();
+ } catch (IOException e1) {
+ LOG.error("Can't close dump stream " + dumpFilePath, e);
+ }
+ }
+ return;
+ }
}
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start dump, current write number:" + pendingWrites.size());
- }
- Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
- while (it.hasNext()) {
- OffsetRange key = it.next();
- WriteCtx writeCtx = pendingWrites.get(key);
- try {
- long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
- if (dumpedDataSize > 0) {
- updateNonSequentialWriteInMemory(-dumpedDataSize);
+
+ // Get raf for the first dump
+ if (raf == null) {
+ try {
+ raf = new RandomAccessFile(dumpFilePath, "r");
+ } catch (FileNotFoundException e) {
+ LOG.error("Can't get random access to file " + dumpFilePath);
+ // Disable dump
+ enabledDump = false;
+ return;
}
- } catch (IOException e) {
- LOG.error("Dump data failed:" + writeCtx + " with error:" + e);
- // Disable dump
- enabledDump = false;
- return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
+ + nonSequentialWriteInMemory.get());
+ }
+
+ Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
+ while (activeState && it.hasNext()
+ && nonSequentialWriteInMemory.get() > 0) {
+ OffsetRange key = it.next();
+ WriteCtx writeCtx = pendingWrites.get(key);
+ try {
+ long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
+ if (dumpedDataSize > 0) {
+ updateNonSequentialWriteInMemory(-dumpedDataSize);
+ }
+ } catch (IOException e) {
+ LOG.error("Dump data failed:" + writeCtx + " with error:" + e
+ + " OpenFileCtx state:" + activeState);
+ // Disable dump
+ enabledDump = false;
+ return;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After dump, nonSequentialWriteInMemory == "
+ + nonSequentialWriteInMemory.get());
}
}
- if (nonSequentialWriteInMemory != 0) {
- LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: "
- + nonSequentialWriteInMemory);
- throw new RuntimeException(
- "After dump, nonSequentialWriteInMemory is not zero: "
- + nonSequentialWriteInMemory);
+
+ @Override
+ public void run() {
+ while (activeState && enabledDump) {
+ if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+ dump();
+ }
+ synchronized (OpenFileCtx.this) {
+ if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+ try {
+ OpenFileCtx.this.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
}
}
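
The hand-off between the write path and the new dumper daemon follows a standard watermark-plus-wait/notify pattern. A stripped-down, runnable sketch of just that pattern (class and method names are illustrative, not from the patch):

import java.util.concurrent.atomic.AtomicLong;

public class WatermarkDaemonDemo {
  static final long DUMP_WRITE_WATER_MARK = 1024 * 1024; // 1MB, as in the patch
  final AtomicLong pendingBytes = new AtomicLong(0);
  volatile boolean active = true;

  // Write path: bump the counter and wake the dumper once over the watermark.
  void onNewWrite(long count) {
    if (pendingBytes.addAndGet(count) >= DUMP_WRITE_WATER_MARK) {
      synchronized (this) {
        this.notifyAll();
      }
    }
  }

  Thread startDumper() {
    Thread t = new Thread(() -> {
      while (active) {
        if (pendingBytes.get() >= DUMP_WRITE_WATER_MARK) {
          pendingBytes.set(0); // stand-in for dump(): spill buffers to a local file
        }
        synchronized (this) {
          // Re-check under the monitor so a wakeup between check and wait()
          // is not lost.
          if (pendingBytes.get() < DUMP_WRITE_WATER_MARK) {
            try {
              this.wait();
            } catch (InterruptedException e) {
              return; // interrupted by cleanup; exit
            }
          }
        }
      }
    });
    t.setDaemon(true);
    t.start();
    return t;
  }

  public static void main(String[] args) throws InterruptedException {
    WatermarkDaemonDemo demo = new WatermarkDaemonDemo();
    Thread dumper = demo.startDumper();
    demo.onNewWrite(2 * DUMP_WRITE_WATER_MARK); // crosses watermark, wakes dumper
    Thread.sleep(100);
    demo.active = false;
    dumper.interrupt();
  }
}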
@@ -283,148 +297,196 @@ class OpenFileCtx {
public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
Channel channel, int xid, AsyncDataService asyncDataService,
IdUserGroup iug) {
-
- lockCtx();
- try {
- if (!activeState) {
- LOG.info("OpenFileCtx is inactive, fileId:"
- + request.getHandle().getFileId());
- WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
- fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
- new XDR(), xid, new VerifierNone()), xid);
- } else {
- // Handle repeated write requests(same xid or not).
- // If already replied, send reply again. If not replied, drop the
- // repeated request.
- WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
- xid);
- if (existantWriteCtx != null) {
- if (!existantWriteCtx.getReplied()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Repeated write request which hasn't be served: xid="
- + xid + ", drop it.");
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Repeated write request which is already served: xid="
- + xid + ", resend response.");
- }
- WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
- fileWcc, request.getCount(), request.getStableHow(),
- Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
- new XDR(), xid, new VerifierNone()), xid);
+
+ if (!activeState) {
+ LOG.info("OpenFileCtx is inactive, fileId:"
+ + request.getHandle().getFileId());
+ WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+ fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel,
+ response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+ xid);
+ } else {
+ // Update the write time first
+ updateLastAccessTime();
+
+ // Handle repeated write requests (same xid or not).
+ // If already replied, send reply again. If not replied, drop the
+ // repeated request.
+ WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
+ xid);
+ if (existantWriteCtx != null) {
+ if (!existantWriteCtx.getReplied()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Repeated write request which hasn't be served: xid="
+ + xid + ", drop it.");
}
- updateLastAccessTime();
-
} else {
- receivedNewWriteInternal(dfsClient, request, channel, xid,
- asyncDataService, iug);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Repeated write request which is already served: xid="
+ + xid + ", resend response.");
+ }
+ WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, request.getCount(), request.getStableHow(),
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
+ new XDR(), xid, new VerifierNone()), xid);
}
+ } else {
+ // not a repeated write request
+ receivedNewWriteInternal(dfsClient, request, channel, xid,
+ asyncDataService, iug);
}
-
- } finally {
- unlockCtx();
}
}
- private void receivedNewWriteInternal(DFSClient dfsClient,
- WRITE3Request request, Channel channel, int xid,
- AsyncDataService asyncDataService, IdUserGroup iug) {
+ /**
+ * Creates and adds a WriteCtx into the pendingWrites map. This is a
+ * synchronized method to handle concurrent writes.
+ *
+ * @return A non-null {@link WriteCtx} instance if the incoming write
+ * request's offset >= nextOffset. Otherwise null.
+ */
+ private synchronized WriteCtx addWritesToCache(WRITE3Request request,
+ Channel channel, int xid) {
long offset = request.getOffset();
int count = request.getCount();
- WriteStableHow stableHow = request.getStableHow();
+ long cachedOffset = nextOffset.get();
- // Get file length, fail non-append call
- WccAttr preOpAttr = latestAttr.getWccAttr();
if (LOG.isDebugEnabled()) {
- LOG.debug("requesed offset=" + offset + " and current filesize="
- + preOpAttr.getSize());
+ LOG.debug("requesed offset=" + offset + " and current offset="
+ + cachedOffset);
}
- long nextOffset = getNextOffsetUnprotected();
- if (offset == nextOffset) {
- LOG.info("Add to the list, update nextOffset and notify the writer,"
- + " nextOffset:" + nextOffset);
+ // Fail non-append call
+ if (offset < cachedOffset) {
+ LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
+ + nextOffset + ")");
+ return null;
+ } else {
+ DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
+ : WriteCtx.DataState.ALLOW_DUMP;
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
request.getOffset(), request.getCount(), request.getStableHow(),
- request.getData().array(), channel, xid, false, DataState.NO_DUMP);
- addWrite(writeCtx);
-
- // Create an async task and change openFileCtx status to indicate async
- // task pending
+ request.getData().array(), channel, xid, false, dataState);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add new write to the list with nextOffset " + cachedOffset
+ + " and requesed offset=" + offset);
+ }
+ if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+ // update the memory size
+ updateNonSequentialWriteInMemory(count);
+ }
+ // check if there is a WriteCtx with the same range in pendingWrites
+ WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
+ if (oldWriteCtx == null) {
+ addWrite(writeCtx);
+ } else {
+ LOG.warn("Got a repeated request, same range, with xid:"
+ + writeCtx.getXid());
+ }
+ return writeCtx;
+ }
+ }
+
+ /** Process an overwrite request */
+ private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
+ Channel channel, int xid, IdUserGroup iug) {
+ WccData wccData = new WccData(latestAttr.getWccAttr(), null);
+ long offset = request.getOffset();
+ int count = request.getCount();
+ WriteStableHow stableHow = request.getStableHow();
+ WRITE3Response response;
+ long cachedOffset = nextOffset.get();
+ if (offset + count > cachedOffset) {
+ LOG.warn("Haven't noticed any partial overwrite for a sequential file"
+ + " write requests. Treat it as a real random write, no support.");
+ response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
+ WriteStableHow.UNSTABLE, 0);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Process perfectOverWrite");
+ }
+ // TODO: let executor handle perfect overwrite
+ response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
+ request.getData().array(),
+ Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
+ }
+ updateLastAccessTime();
+ Nfs3Utils.writeChannel(channel,
+ response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+ xid);
+ }
+
+ /**
+ * Check if we can start the write (back to HDFS) now. If there is no hole for
+ * writing, and no other thread is writing (i.e., asyncStatus is
+ * false), start the writing and set asyncStatus to true.
+ *
+ * @return True if the new write is sequential and we can start writing
+ * (including the case that there is already a thread writing).
+ */
+ private synchronized boolean checkAndStartWrite(
+ AsyncDataService asyncDataService, WriteCtx writeCtx) {
+
+ if (writeCtx.getOffset() == nextOffset.get()) {
if (!asyncStatus) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trigger the write back task. Current nextOffset: "
+ + nextOffset.get());
+ }
asyncStatus = true;
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The write back thread is working.");
+ }
}
-
- // Update the write time first
- updateLastAccessTime();
- Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
- // Send response immediately for unstable write
- if (request.getStableHow() == WriteStableHow.UNSTABLE) {
- WccData fileWcc = new WccData(preOpAttr, postOpAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
- fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
- new XDR(), xid, new VerifierNone()), xid);
- writeCtx.setReplied(true);
- }
-
- } else if (offset > nextOffset) {
- LOG.info("Add new write to the list but not update nextOffset:"
- + nextOffset);
- WriteCtx writeCtx = new WriteCtx(request.getHandle(),
- request.getOffset(), request.getCount(), request.getStableHow(),
- request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP);
- addWrite(writeCtx);
+ return true;
+ } else {
+ return false;
+ }
+ }
- // Check if need to dump some pending requests to file
- checkDump(request.getCount());
- updateLastAccessTime();
- Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr);
-
- // In test, noticed some Linux client sends a batch (e.g., 1MB)
- // of reordered writes and won't send more writes until it gets
- // responses of the previous batch. So here send response immediately for
- // unstable non-sequential write
- if (request.getStableHow() == WriteStableHow.UNSTABLE) {
- WccData fileWcc = new WccData(preOpAttr, postOpAttr);
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
- fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
- Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
- new XDR(), xid, new VerifierNone()), xid);
- writeCtx.setReplied(true);
- }
+ private void receivedNewWriteInternal(DFSClient dfsClient,
+ WRITE3Request request, Channel channel, int xid,
+ AsyncDataService asyncDataService, IdUserGroup iug) {
+ WriteStableHow stableHow = request.getStableHow();
+ WccAttr preOpAttr = latestAttr.getWccAttr();
+ int count = request.getCount();
- } else {
+ WriteCtx writeCtx = addWritesToCache(request, channel, xid);
+ if (writeCtx == null) {
// offset < nextOffset
- LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
- + nextOffset + ")");
- WccData wccData = new WccData(preOpAttr, null);
- WRITE3Response response;
-
- if (offset + count > nextOffset) {
- LOG.warn("Haven't noticed any partial overwrite out of a sequential file"
- + "write requests, so treat it as a real random write, no support.");
- response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
- WriteStableHow.UNSTABLE, 0);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Process perfectOverWrite");
+ processOverWrite(dfsClient, request, channel, xid, iug);
+ } else {
+ // The writes is added to pendingWrites.
+ // Check and start writing back if necessary
+ boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
+ if (!startWriting) {
+ // offset > nextOffset. check if we need to dump data
+ checkDump();
+
+ // In testing, we noticed that some Linux clients send a batch (e.g., 1MB)
+ // of reordered writes and won't send more writes until they get
+ // responses for the previous batch. So send the response immediately
+ // for an unstable non-sequential write
+ if (request.getStableHow() == WriteStableHow.UNSTABLE) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("UNSTABLE write request, send response for offset: "
+ + writeCtx.getOffset());
+ }
+ WccData fileWcc = new WccData(preOpAttr, latestAttr);
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+ fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ Nfs3Utils
+ .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+ xid, new VerifierNone()), xid);
+ writeCtx.setReplied(true);
}
- response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
- request.getData().array(),
- Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
}
-
- updateLastAccessTime();
- Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
- new XDR(), xid, new VerifierNone()), xid);
}
}
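
Since checkAndStartWrite is the heart of the new lock-free flow, here is its decision reduced to a runnable sketch (plain thread creation stands in for AsyncDataService, and the nextOffset advancement that really happens in offerNextToWrite is elided; names are illustrative):

import java.util.concurrent.atomic.AtomicLong;

public class WriteGateDemo {
  private final AtomicLong nextOffset = new AtomicLong(0);
  private volatile boolean asyncStatus = false;

  // Only a write landing exactly at nextOffset may trigger (or join) the
  // single write-back task per file.
  synchronized boolean checkAndStartWrite(long offset, Runnable writeBackTask) {
    if (offset != nextOffset.get()) {
      return false; // out-of-order write: cache it, possibly dump it later
    }
    if (!asyncStatus) {
      asyncStatus = true;
      new Thread(writeBackTask).start();
    }
    return true;
  }

  public static void main(String[] args) {
    WriteGateDemo gate = new WriteGateDemo();
    System.out.println(gate.checkAndStartWrite(0, () -> {}));    // true: sequential
    System.out.println(gate.checkAndStartWrite(4096, () -> {})); // false: a hole
  }
}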
@@ -436,7 +498,6 @@ class OpenFileCtx {
private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
long offset, int count, WriteStableHow stableHow, byte[] data,
String path, WccData wccData, IdUserGroup iug) {
- assert (ctxLock.isLocked());
WRITE3Response response = null;
// Read the content back
@@ -447,21 +508,30 @@ class OpenFileCtx {
try {
// Sync file data and length to avoid partial read failure
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-
+ } catch (ClosedChannelException closedException) {
+ LOG.info("The FSDataOutputStream has been closed. " +
+ "Continue processing the perfect overwrite.");
+ } catch (IOException e) {
+ LOG.info("hsync failed when processing possible perfect overwrite, path="
+ + path + " error:" + e);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+ Nfs3Constant.WRITE_COMMIT_VERF);
+ }
+
+ try {
fis = new FSDataInputStream(dfsClient.open(path));
readCount = fis.read(offset, readbuffer, 0, count);
if (readCount < count) {
LOG.error("Can't read back " + count + " bytes, partial read size:"
+ readCount);
- return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
- stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+ Nfs3Constant.WRITE_COMMIT_VERF);
}
-
} catch (IOException e) {
LOG.info("Read failed when processing possible perfect overwrite, path="
+ path + " error:" + e);
- return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0,
- stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+ return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
+ Nfs3Constant.WRITE_COMMIT_VERF);
} finally {
IOUtils.cleanup(LOG, fis);
}
@@ -492,40 +562,26 @@ class OpenFileCtx {
}
return response;
}
-
- public final static int COMMIT_FINISHED = 0;
- public final static int COMMIT_WAIT = 1;
- public final static int COMMIT_INACTIVE_CTX = 2;
- public final static int COMMIT_ERROR = 3;
/**
* return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
* COMMIT_INACTIVE_CTX, COMMIT_ERROR
*/
public int checkCommit(long commitOffset) {
- int ret = COMMIT_WAIT;
-
- lockCtx();
- try {
- if (!activeState) {
- ret = COMMIT_INACTIVE_CTX;
- } else {
- ret = checkCommitInternal(commitOffset);
- }
- } finally {
- unlockCtx();
- }
- return ret;
+ return activeState ? checkCommitInternal(commitOffset)
+ : COMMIT_INACTIVE_CTX;
}
private int checkCommitInternal(long commitOffset) {
if (commitOffset == 0) {
// Commit whole file
- commitOffset = getNextOffsetUnprotected();
+ commitOffset = nextOffset.get();
}
long flushed = getFlushedOffset();
- LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+ }
if (flushed < commitOffset) {
// Keep stream active
updateLastAccessTime();
@@ -538,6 +594,13 @@ class OpenFileCtx {
fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
// Nothing to do for metadata since attr related change is pass-through
ret = COMMIT_FINISHED;
+ } catch (ClosedChannelException cce) {
+ if (pendingWrites.isEmpty()) {
+ ret = COMMIT_INACTIVE_CTX;
+ } else {
+ ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
+ }
} catch (IOException e) {
LOG.error("Got stream error during data sync:" + e);
// Do nothing. Stream will be closed eventually by StreamMonitor.
@@ -550,18 +613,16 @@ class OpenFileCtx {
}
private void addWrite(WriteCtx writeCtx) {
- assert (ctxLock.isLocked());
long offset = writeCtx.getOffset();
int count = writeCtx.getCount();
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
}
-
/**
* Check stream status to decide if it should be closed
* @return true, remove stream; false, keep stream
*/
- public boolean streamCleanup(long fileId, long streamTimeout) {
+ public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
throw new InvalidParameterException("StreamTimeout" + streamTimeout
+ "ms is less than MINIMIUM_STREAM_TIMEOUT "
@@ -569,107 +630,97 @@ class OpenFileCtx {
}
boolean flag = false;
- if (!ctxLock.tryLock()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Another thread is working on it" + ctxLock.toString());
- }
- return flag;
- }
-
- try {
- // Check the stream timeout
- if (checkStreamTimeout(streamTimeout)) {
- LOG.info("closing stream for fileId:" + fileId);
- cleanup();
- flag = true;
+ // Check the stream timeout
+ if (checkStreamTimeout(streamTimeout)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing stream for fileId:" + fileId);
}
- } finally {
- unlockCtx();
+ cleanup();
+ flag = true;
}
return flag;
}
- // Invoked by AsynDataService to do the write back
- public void executeWriteBack() {
- long nextOffset;
- OffsetRange key;
- WriteCtx writeCtx;
-
- try {
- // Don't lock OpenFileCtx for all writes to reduce the timeout of other
- // client request to the same file
- while (true) {
- lockCtx();
- if (!asyncStatus) {
- // This should never happen. There should be only one thread working
- // on one OpenFileCtx anytime.
- LOG.fatal("The openFileCtx has false async status");
- throw new RuntimeException("The openFileCtx has false async status");
- }
- // Any single write failure can change activeState to false, so do the
- // check each loop.
- if (pendingWrites.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The asyn write task has no pendding writes, fileId: "
- + latestAttr.getFileId());
- }
- break;
+ /**
+ * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
+ *
+ * @return Null if {@link #pendingWrites} is empty, or the next WriteCtx's
+ * offset is larger than nextOffset.
+ */
+ private synchronized WriteCtx offerNextToWrite() {
+ if (pendingWrites.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The asyn write task has no pending writes, fileId: "
+ + latestAttr.getFileId());
+ }
+ this.asyncStatus = false;
+ } else {
+ Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+ OffsetRange range = lastEntry.getKey();
+ WriteCtx toWrite = lastEntry.getValue();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+ + nextOffset);
+ }
+
+ long offset = nextOffset.get();
+ if (range.getMin() > offset) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The next sequencial write has not arrived yet");
}
- if (!activeState) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The openFileCtx is not active anymore, fileId: "
- + latestAttr.getFileId());
- }
- break;
+ this.asyncStatus = false;
+ } else if (range.getMin() < offset && range.getMax() > offset) {
+ // shouldn't happen since we do sync for overlapped concurrent writers
+ LOG.warn("Got a overlapping write (" + range.getMin() + ","
+ + range.getMax() + "), nextOffset=" + offset
+ + ". Silently drop it now");
+ pendingWrites.remove(range);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
+ + ") from the list");
}
-
- // Get the next sequential write
- nextOffset = getNextOffsetUnprotected();
- key = pendingWrites.firstKey();
- if (LOG.isTraceEnabled()) {
- LOG.trace("key.getMin()=" + key.getMin() + " nextOffset="
- + nextOffset);
+ // after writing, remove the WriteCtx from cache
+ pendingWrites.remove(range);
+ // update nextOffset
+ nextOffset.addAndGet(toWrite.getCount());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Change nextOffset to " + nextOffset.get());
}
-
- if (key.getMin() > nextOffset) {
- if (LOG.isDebugEnabled()) {
- LOG.info("The next sequencial write has not arrived yet");
- }
- break;
-
- } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) {
- // Can't handle overlapping write. Didn't see it in tests yet.
- LOG.fatal("Got a overlapping write (" + key.getMin() + ","
- + key.getMax() + "), nextOffset=" + nextOffset);
- throw new RuntimeException("Got a overlapping write (" + key.getMin()
- + "," + key.getMax() + "), nextOffset=" + nextOffset);
-
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax()
- + ") from the list");
- }
- writeCtx = pendingWrites.remove(key);
+ return toWrite;
+ }
+ }
+ return null;
+ }
+
+ /** Invoked by AsyncDataService to write back to HDFS */
+ void executeWriteBack() {
+ Preconditions.checkState(asyncStatus,
+ "The openFileCtx has false async status");
+ try {
+ while (activeState) {
+ WriteCtx toWrite = offerNextToWrite();
+ if (toWrite != null) {
// Do the write
- doSingleWrite(writeCtx);
+ doSingleWrite(toWrite);
updateLastAccessTime();
+ } else {
+ break;
}
-
- unlockCtx();
}
-
+
+ if (!activeState && LOG.isDebugEnabled()) {
+ LOG.debug("The openFileCtx is not active anymore, fileId: "
+ + latestAttr.getFileId());
+ }
} finally {
- // Always reset the async status so another async task can be created
- // for this file
+ // make sure we reset asyncStatus to false
asyncStatus = false;
- if (ctxLock.isHeldByCurrentThread()) {
- unlockCtx();
- }
}
}
private void doSingleWrite(final WriteCtx writeCtx) {
- assert(ctxLock.isLocked());
Channel channel = writeCtx.getChannel();
int xid = writeCtx.getXid();
@@ -679,20 +730,25 @@ class OpenFileCtx {
byte[] data = null;
try {
data = writeCtx.getData();
- } catch (IOException e1) {
+ } catch (Exception e1) {
LOG.error("Failed to get request data offset:" + offset + " count:"
+ count + " error:" + e1);
// Cleanup everything
cleanup();
return;
}
- assert (data.length == count);
+
+ Preconditions.checkState(data.length == count);
FileHandle handle = writeCtx.getHandle();
- LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset
- + " length:" + count + " stableHow:" + stableHow.getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
+ + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+ }
try {
+ // The write is not protected by lock. asyncState is used to make sure
+ // there is one thread doing write back at any time
fos.write(data, 0, count);
long flushedOffset = getFlushedOffset();
@@ -701,11 +757,20 @@ class OpenFileCtx {
+ flushedOffset + " and nextOffset should be"
+ (offset + count));
}
- nextOffset = flushedOffset;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After writing " + handle.getFileId() + " at offset "
+ + offset + ", update the memory count.");
+ }
// Reduce memory occupation size if request was allowed dumped
- if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
- updateNonSequentialWriteInMemory(-count);
+ if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+ synchronized (writeCtx) {
+ if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
+ writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
+ updateNonSequentialWriteInMemory(-count);
+ }
+ }
}
if (!writeCtx.getReplied()) {
@@ -716,7 +781,6 @@ class OpenFileCtx {
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
new XDR(), xid, new VerifierNone()), xid);
}
-
} catch (IOException e) {
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
+ offset + " and length " + data.length, e);
@@ -733,9 +797,21 @@ class OpenFileCtx {
}
}
- private void cleanup() {
- assert(ctxLock.isLocked());
+ private synchronized void cleanup() {
+ if (!activeState) {
+ LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
+ return;
+ }
activeState = false;
+
+ // stop the dump thread
+ if (dumpThread != null) {
+ dumpThread.interrupt();
+ try {
+ dumpThread.join(3000);
+ } catch (InterruptedException e) {
+ }
+ }
// Close stream
try {
@@ -753,7 +829,7 @@ class OpenFileCtx {
while (!pendingWrites.isEmpty()) {
OffsetRange key = pendingWrites.firstKey();
LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax()
- + "), nextOffset=" + getNextOffsetUnprotected());
+ + "), nextOffset=" + nextOffset.get());
WriteCtx writeCtx = pendingWrites.remove(key);
if (!writeCtx.getReplied()) {
@@ -767,23 +843,23 @@ class OpenFileCtx {
}
// Cleanup dump file
- if (dumpOut!=null){
+ if (dumpOut != null) {
try {
dumpOut.close();
} catch (IOException e) {
e.printStackTrace();
}
+ File dumpFile = new File(dumpFilePath);
+ if (dumpFile.exists() && !dumpFile.delete()) {
+ LOG.error("Failed to delete dumpfile: " + dumpFile);
+ }
}
- if (raf!=null) {
+ if (raf != null) {
try {
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}
- File dumpFile = new File(dumpFilePath);
- if (dumpFile.delete()) {
- LOG.error("Failed to delete dumpfile: "+ dumpFile);
- }
}
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java Tue Sep 24 00:57:43 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.FileHa
import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
import org.jboss.netty.channel.Channel;
+import com.google.common.base.Preconditions;
+
/**
* WriteCtx saves the context of one write request, such as request, channel,
* xid and reply status.
@@ -49,13 +51,21 @@ class WriteCtx {
private final long offset;
private final int count;
private final WriteStableHow stableHow;
- private byte[] data;
+ private volatile byte[] data;
private final Channel channel;
private final int xid;
private boolean replied;
- private DataState dataState;
+ /**
+ * Data belonging to the same {@link OpenFileCtx} may be dumped to a file.
+ * After being dumped to the file, the corresponding {@link WriteCtx} records
+ * the dump file and the offset.
+ */
+ private RandomAccessFile raf;
+ private long dumpFileOffset;
+
+ private volatile DataState dataState;
public DataState getDataState() {
return dataState;
@@ -64,12 +74,13 @@ class WriteCtx {
public void setDataState(DataState dataState) {
this.dataState = dataState;
}
-
- private RandomAccessFile raf;
- private long dumpFileOffset;
- // Return the dumped data size
- public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
+ /**
+ * Writing the data into a local file. After the writing, if
+ * {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set
+ * {@link #dataState} to DUMPED.
+ */
+ long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
throws IOException {
if (dataState != DataState.ALLOW_DUMP) {
if (LOG.isTraceEnabled()) {
@@ -84,48 +95,63 @@ class WriteCtx {
if (LOG.isDebugEnabled()) {
LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
}
- data = null;
- dataState = DataState.DUMPED;
- return count;
+ // it is possible that while we dump the data, the data is also being
+ // written back to HDFS. After dump, if the writing back has not finished
+ // yet, we change its flag to DUMPED and set the data to null. Otherwise
+ // this WriteCtx instance should have been removed from the buffer.
+ if (dataState == DataState.ALLOW_DUMP) {
+ synchronized (this) {
+ if (dataState == DataState.ALLOW_DUMP) {
+ data = null;
+ dataState = DataState.DUMPED;
+ return count;
+ }
+ }
+ }
+ return 0;
}
- public FileHandle getHandle() {
+ FileHandle getHandle() {
return handle;
}
- public long getOffset() {
+ long getOffset() {
return offset;
}
- public int getCount() {
+ int getCount() {
return count;
}
- public WriteStableHow getStableHow() {
+ WriteStableHow getStableHow() {
return stableHow;
}
- public byte[] getData() throws IOException {
+ byte[] getData() throws IOException {
if (dataState != DataState.DUMPED) {
- if (data == null) {
- throw new IOException("Data is not dumpted but has null:" + this);
- }
- } else {
- // read back
- if (data != null) {
- throw new IOException("Data is dumpted but not null");
- }
- data = new byte[count];
- raf.seek(dumpFileOffset);
- int size = raf.read(data, 0, count);
- if (size != count) {
- throw new IOException("Data count is " + count + ", but read back "
- + size + "bytes");
+ synchronized (this) {
+ if (dataState != DataState.DUMPED) {
+ Preconditions.checkState(data != null);
+ return data;
+ }
}
}
+ // read back from dumped file
+ this.loadData();
return data;
}
+ private void loadData() throws IOException {
+ Preconditions.checkState(data == null);
+ data = new byte[count];
+ raf.seek(dumpFileOffset);
+ int size = raf.read(data, 0, count);
+ if (size != count) {
+ throw new IOException("Data count is " + count + ", but read back "
+ + size + "bytes");
+ }
+ }
+
Channel getChannel() {
return channel;
}
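
The dump/write-back race in WriteCtx is resolved by double-checking dataState under the object monitor on both sides. A self-contained sketch of that protocol (names are illustrative; the real code reads dumped data back via RandomAccessFile):

public class DumpRaceDemo {
  enum DataState { ALLOW_DUMP, NO_DUMP, DUMPED }

  private volatile byte[] data = new byte[8];
  private volatile DataState dataState = DataState.ALLOW_DUMP;

  // Dumper side: release the in-memory copy only if the write-back thread
  // has not already claimed it.
  long finishDump() {
    if (dataState == DataState.ALLOW_DUMP) {
      synchronized (this) {
        if (dataState == DataState.ALLOW_DUMP) {
          long count = data.length;
          data = null;
          dataState = DataState.DUMPED;
          return count; // memory actually released
        }
      }
    }
    return 0; // writer got there first; keep the in-memory copy
  }

  // Write-back side: pin the in-memory copy unless it was already dumped.
  byte[] claimForWrite() {
    if (dataState != DataState.DUMPED) {
      synchronized (this) {
        if (dataState != DataState.DUMPED) {
          dataState = DataState.NO_DUMP; // blocks any concurrent dump
          return data;
        }
      }
    }
    return null; // already dumped; caller must load from the dump file
  }
}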
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Tue Sep 24 00:57:43 2013
@@ -67,8 +67,8 @@ public class WriteManager {
*/
private long streamTimeout;
- public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
- public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
+ public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
+ public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
openFileMap.put(h, ctx);
@@ -215,6 +215,10 @@ public class WriteManager {
LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
+ " commitOffset=" + commitOffset);
return true;
+ } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
+ LOG.info("Inactive stream with pending writes, fileId="
+ + fileHandle.getFileId() + " commitOffset=" + commitOffset);
+ return false;
}
assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
if (ret == OpenFileCtx.COMMIT_ERROR) {
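
For reference, the commit status space after this change, reduced to a runnable sketch of how a caller like WriteManager maps each status to success or failure (constant values match the patch; the helper method shape is assumed):

public class CommitStatusDemo {
  static final int COMMIT_FINISHED = 0;
  static final int COMMIT_WAIT = 1;
  static final int COMMIT_INACTIVE_CTX = 2;
  static final int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
  static final int COMMIT_ERROR = 4;

  // Only a finished commit, or an inactive stream with nothing left pending,
  // counts as success; the new COMMIT_INACTIVE_WITH_PENDING_WRITE must fail
  // because buffered data would otherwise be silently lost.
  static boolean commitSucceeded(int ret) {
    return ret == COMMIT_FINISHED || ret == COMMIT_INACTIVE_CTX;
  }

  public static void main(String[] args) {
    System.out.println(commitSucceeded(COMMIT_INACTIVE_CTX));                // true
    System.out.println(commitSucceeded(COMMIT_INACTIVE_WITH_PENDING_WRITE)); // false
  }
}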
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java Tue Sep 24 00:57:43 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import java.io.IOException;
@@ -51,8 +52,9 @@ public class TestOffsetRange {
OffsetRange r3 = new OffsetRange(1, 3);
OffsetRange r4 = new OffsetRange(3, 4);
- assertTrue(r2.compareTo(r3) == 0);
- assertTrue(r2.compareTo(r1) == 1);
- assertTrue(r2.compareTo(r4) == -1);
+ assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
+ assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
+ assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
+ assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Sep 24 00:57:43 2013
@@ -286,8 +286,12 @@ Release 2.3.0 - UNRELEASED
and the excludedNodes parameter types respectively to Node and Set.
(Junping Du via szetszwo)
+ HDFS-5240. Separate formatting from logging in the audit logger API (daryn)
+
OPTIMIZATIONS
+ HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
+
BUG FIXES
HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin
Patrick McCabe)
@@ -321,6 +325,8 @@ Release 2.2.0 - UNRELEASED
BUG FIXES
+ HDFS-5139. Remove redundant -R option from setrep.
+
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES
@@ -408,6 +414,9 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5212. Refactor RpcMessage and NFS3Response to support different
types of authentication information. (jing9)
+ HDFS-4971. Move IO operations out of locking in OpenFileCtx. (brandonli and
+ jing9)
+
OPTIMIZATIONS
BUG FIXES
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1525409-1525758
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Sep 24 00:57:43 2013
@@ -443,7 +443,7 @@ public class FSNamesystem implements Nam
private final long accessTimePrecision;
/** Lock to protect FSNamesystem. */
- private ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
+ private ReentrantReadWriteLock fsLock;
/**
* Used when this NN is in standby state to read from the shared edit log.
@@ -618,6 +618,9 @@ public class FSNamesystem implements Nam
*/
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
throws IOException {
+ boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
+ LOG.info("fsLock is fair:" + fair);
+ fsLock = new ReentrantReadWriteLock(fair);
try {
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@@ -6893,10 +6896,13 @@ public class FSNamesystem implements Nam
}
sb.append(trackingId);
}
- auditLog.info(sb);
+ logAuditMessage(sb.toString());
}
}
+ public void logAuditMessage(String message) {
+ auditLog.info(message);
+ }
}
}
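
The new fairness knob (HDFS-5239) boils down to constructing the namesystem lock from configuration before any locking happens. A minimal sketch, using a system property as a stand-in for Hadoop's Configuration:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class FairLockDemo {
  public static void main(String[] args) {
    // conf.getBoolean("dfs.namenode.fslock.fair", true) in the real code
    boolean fair = Boolean.parseBoolean(
        System.getProperty("dfs.namenode.fslock.fair", "true"));
    ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(fair);
    System.out.println("fsLock is fair:" + fsLock.isFair());
  }
}

A fair lock hands itself to the longest-waiting thread, trading raw throughput for bounded waiting; making it configurable lets operators choose non-fair mode when throughput matters more.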
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1525409-1525758
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1525409-1525758
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1525409-1525758
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1525409-1525758
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1525409-1525758
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java Tue Sep 24 00:57:43 2013
@@ -61,6 +61,8 @@ import static org.junit.Assert.*;
public class TestDFSShell {
private static final Log LOG = LogFactory.getLog(TestDFSShell.class);
private static AtomicInteger counter = new AtomicInteger();
+ private final int SUCCESS = 0;
+ private final int ERROR = 1;
static final String TEST_ROOT_DIR = PathUtils.getTestDirName(TestDFSShell.class);
@@ -1619,9 +1621,6 @@ public class TestDFSShell {
// force Copy Option is -f
@Test (timeout = 30000)
public void testCopyCommandsWithForceOption() throws Exception {
- final int SUCCESS = 0;
- final int ERROR = 1;
-
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
@@ -1682,7 +1681,55 @@ public class TestDFSShell {
}
cluster.shutdown();
}
+ }
+
+ // setrep for file and directory.
+ @Test (timeout = 30000)
+ public void testSetrep() throws Exception {
+
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .format(true).build();
+ FsShell shell = null;
+ FileSystem fs = null;
+
+ final String testdir1 = "/tmp/TestDFSShell-testSetrep-" + counter.getAndIncrement();
+ final String testdir2 = testdir1 + "/nestedDir";
+ final Path hdfsFile1 = new Path(testdir1, "testFileForSetrep");
+ final Path hdfsFile2 = new Path(testdir2, "testFileForSetrep");
+ final Short oldRepFactor = new Short((short) 1);
+ final Short newRepFactor = new Short((short) 3);
+ try {
+ String[] argv;
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ assertThat(fs.mkdirs(new Path(testdir2)), is(true));
+ shell = new FsShell(conf);
+
+ fs.create(hdfsFile1, true).close();
+ fs.create(hdfsFile2, true).close();
+
+ // Tests for setrep on a file.
+ argv = new String[] { "-setrep", newRepFactor.toString(), hdfsFile1.toString() };
+ assertThat(shell.run(argv), is(SUCCESS));
+ assertThat(fs.getFileStatus(hdfsFile1).getReplication(), is(newRepFactor));
+ assertThat(fs.getFileStatus(hdfsFile2).getReplication(), is(oldRepFactor));
+
+ // Tests for setrep on a directory and make sure it is applied recursively.
+ argv = new String[] { "-setrep", newRepFactor.toString(), testdir1 };
+ assertThat(shell.run(argv), is(SUCCESS));
+ assertThat(fs.getFileStatus(hdfsFile1).getReplication(), is(newRepFactor));
+ assertThat(fs.getFileStatus(hdfsFile2).getReplication(), is(newRepFactor));
+
+ } finally {
+ if (shell != null) {
+ shell.close();
+ }
+ cluster.shutdown();
+ }
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java Tue Sep 24 00:57:43 2013
@@ -20,8 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
@@ -142,4 +141,21 @@ public class TestFSNamesystem {
assertTrue("Replication queues weren't being populated after entering "
+ "safemode 2nd time", fsn.isPopulatingReplQueues());
}
+
+ @Test
+ public void testFsLockFairness() throws IOException, InterruptedException{
+ Configuration conf = new Configuration();
+
+ FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
+ FSImage fsImage = Mockito.mock(FSImage.class);
+ Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
+
+ conf.setBoolean("dfs.namenode.fslock.fair", true);
+ FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
+ assertTrue(fsNamesystem.getFsLockForTests().isFair());
+
+ conf.setBoolean("dfs.namenode.fslock.fair", false);
+ fsNamesystem = new FSNamesystem(conf, fsImage);
+ assertFalse(fsNamesystem.getFsLockForTests().isFair());
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml?rev=1525759&r1=1525758&r2=1525759&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml Tue Sep 24 00:57:43 2013
@@ -6049,7 +6049,7 @@
<command>-fs NAMENODE -mkdir /dir0</command>
<command>-fs NAMENODE -touchz /dir0/file0</command>
<command>-fs NAMENODE -touchz /dir0/file1</command>
- <command>-fs NAMENODE -setrep -R 2 /dir0</command>
+ <command>-fs NAMENODE -setrep 2 /dir0</command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rm -r /user</command>
@@ -6072,7 +6072,7 @@
<command>-fs NAMENODE -mkdir -p dir0</command>
<command>-fs NAMENODE -touchz dir0/file0</command>
<command>-fs NAMENODE -touchz dir0/file1</command>
- <command>-fs NAMENODE -setrep -R 2 dir0</command>
+ <command>-fs NAMENODE -setrep 2 dir0</command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rm -r /user</command>
@@ -6090,6 +6090,24 @@
</test>
<test> <!-- TESTED -->
+ <description>setrep: -R ignored for existing file</description>
+ <test-commands>
+ <command>-fs NAMENODE -mkdir -p dir0</command>
+ <command>-fs NAMENODE -touchz dir0/file0</command>
+ <command>-fs NAMENODE -setrep -R 2 dir0/file0</command>
+ </test-commands>
+ <cleanup-commands>
+ <command>-fs NAMENODE -rm -r /user</command>
+ </cleanup-commands>
+ <comparators>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^Replication 2 set: dir0/file0</expected-output>
+ </comparator>
+ </comparators>
+ </test>
+
+ <test> <!-- TESTED -->
<description>setrep: non existent file (absolute path)</description>
<test-commands>
<command>-fs NAMENODE -setrep 2 /dir0/file</command>
@@ -6145,7 +6163,7 @@
<command>-fs NAMENODE -mkdir hdfs:///dir0/</command>
<command>-fs NAMENODE -touchz hdfs:///dir0/file0</command>
<command>-fs NAMENODE -touchz hdfs:///dir0/file1</command>
- <command>-fs NAMENODE -setrep -R 2 hdfs:///dir0</command>
+ <command>-fs NAMENODE -setrep 2 hdfs:///dir0</command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rm -r hdfs:///*</command>
@@ -6203,7 +6221,7 @@
<command>-fs NAMENODE -mkdir -p NAMENODE/dir0</command>
<command>-fs NAMENODE -touchz NAMENODE/dir0/file0</command>
<command>-fs NAMENODE -touchz NAMENODE/dir0/file1</command>
- <command>-fs NAMENODE -setrep -R 2 NAMENODE/dir0</command>
+ <command>-fs NAMENODE -setrep 2 NAMENODE/dir0</command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rm -r NAMENODE/*</command>