Posted to common-commits@hadoop.apache.org by yj...@apache.org on 2015/09/29 03:47:15 UTC
hadoop git commit: HDFS-9092. Nfs silently drops overlapping write requests and causes data copying to fail. Contributed by Yongjun Zhang.
Repository: hadoop
Updated Branches:
refs/heads/trunk 5c3b663bf -> 151fca503
HDFS-9092. Nfs silently drops overlapping write requests and causes data copying to fail. Contributed by Yongjun Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/151fca50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/151fca50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/151fca50
Branch: refs/heads/trunk
Commit: 151fca5032719e561226ef278e002739073c23ec
Parents: 5c3b663
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Mon Sep 28 18:45:00 2015 -0700
Committer: Yongjun Zhang <yz...@cloudera.com>
Committed: Mon Sep 28 18:45:00 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/nfs/nfs3/OffsetRange.java | 4 +
.../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java | 141 +++++++++++--------
.../apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java | 82 ++++++++++-
.../apache/hadoop/hdfs/nfs/nfs3/TestWrites.java | 92 +++++++++++-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
5 files changed, 260 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
index f02dcc0..764524a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java
@@ -70,4 +70,8 @@ public class OffsetRange {
}
return false;
}
+
+ public String toString() {
+ return "[" + getMin() + ", " + getMax() + ")";
+ }
}
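The new toString() uses half-open interval notation: [min, max) includes min but excludes max, so a range's length is simply max - min and adjacent ranges share no offsets. A small self-contained illustration (the class name HalfOpenDemo is hypothetical, not part of the patch):

    public class HalfOpenDemo {
      public static void main(String[] args) {
        long min = 5, max = 12;              // the range [5, 12)
        long length = max - min;             // 7 bytes: offsets 5..11
        System.out.println("[" + min + ", " + max + ") length=" + length);
      }
    }

This is also why the log messages in the hunks below switch from inclusive (%d-%d) endpoints to [%d-%d): the printed end offset becomes offset + count rather than offset + count - 1.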
http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index 9610f48..9371a72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -490,11 +490,11 @@ class OpenFileCtx {
int count = request.getCount();
long smallerCount = offset + count - cachedOffset;
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
- + " current offset %d," + " drop the overlapped section (%d-%d)"
- + " and append new data (%d-%d).", offset, (offset + count - 1),
- cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
- + count - 1)));
+ LOG.debug(String.format("Got overwrite with appended data [%d-%d),"
+ + " current offset %d," + " drop the overlapped section [%d-%d)"
+ + " and append new data [%d-%d).", offset, (offset + count),
+ cachedOffset, offset, cachedOffset, cachedOffset, (offset
+ + count)));
}
ByteBuffer data = request.getData();
@@ -508,6 +508,22 @@ class OpenFileCtx {
request.setCount((int) smallerCount);
}
+ @VisibleForTesting
+ private static void trimWriteRequest(WriteCtx writeCtx,
+ long currentOffset) {
+ long offset = writeCtx.getOffset();
+ if (LOG.isDebugEnabled()) {
+ int count = writeCtx.getCount();
+ LOG.debug(String.format("Trim request [%d-%d),"
+ + " current offset %d," + " drop the overlapped section [%d-%d)"
+ + " and write new data [%d-%d)",
+ offset, (offset + count),
+ currentOffset, offset, (currentOffset),
+ currentOffset, (offset + count)));
+ }
+ writeCtx.trimWrite((int)(currentOffset - offset));
+ }
+
/**
* Creates and adds a WriteCtx into the pendingWrites map. This is a
* synchronized method to handle concurrent writes.
@@ -527,23 +543,27 @@ class OpenFileCtx {
+ cachedOffset);
}
- // Handle a special case first
+ // Ignore write request with range below the current offset
+ if (offset + count <= cachedOffset) {
+ LOG.warn(String.format("Got overwrite [%d-%d) smaller than"
+ + " current offset %d," + " drop the request.",
+ offset, (offset + count), cachedOffset));
+ return null;
+ }
+
+ // Handle a special case: trim request whose offset is smaller than
+ // the current offset
if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
// One Linux client behavior: after a file is closed and reopened to
// write, the client sometimes combines previously written data (could still
// be in kernel buffer) with newly appended data in one write. This is
// usually the first write after file reopened. In this
// case, we log the event and drop the overlapped section.
- LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
- + " current offset %d," + " drop the overlapped section (%d-%d)"
- + " and append new data (%d-%d).", offset, (offset + count - 1),
- cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
- + count - 1)));
-
- if (!pendingWrites.isEmpty()) {
- LOG.warn("There are other pending writes, fail this jumbo write");
- return null;
- }
+ LOG.warn(String.format("Got overwrite with appended data [%d-%d),"
+ + " current offset %d," + " drop the overlapped section [%d-%d)"
+ + " and append new data [%d-%d).", offset, (offset + count),
+ cachedOffset, offset, cachedOffset, cachedOffset, (offset
+ + count)));
LOG.warn("Modify this write to write only the appended data");
alterWriteRequest(request, cachedOffset);
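The two added branches classify an incoming write against cachedOffset, the next byte HDFS expects. A condensed sketch of that decision, using a hypothetical classify() helper that is not in the patch:

    public class OverlapClassifier {
      enum Action { DROP, TRIM_FRONT, CACHE }

      // [offset, offset + count) is the incoming write; cachedOffset is the
      // next byte HDFS expects.
      static Action classify(long offset, int count, long cachedOffset) {
        if (offset + count <= cachedOffset) {
          return Action.DROP;       // entirely below current offset: no new data
        } else if (offset < cachedOffset) {
          return Action.TRIM_FRONT; // keep only [cachedOffset, offset + count)
        }
        return Action.CACHE;        // starts at or beyond current offset
      }
    }

DROP corresponds to the early return above, TRIM_FRONT to alterWriteRequest, and CACHE to the normal path that adds the request to pendingWrites.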
@@ -1002,45 +1022,56 @@ class OpenFileCtx {
this.asyncStatus = false;
return null;
}
-
- Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
- OffsetRange range = lastEntry.getKey();
- WriteCtx toWrite = lastEntry.getValue();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
- + nextOffset);
+
+ Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
+ OffsetRange range = lastEntry.getKey();
+ WriteCtx toWrite = lastEntry.getValue();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
+ + nextOffset);
+ }
+
+ long offset = nextOffset.get();
+ if (range.getMin() > offset) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The next sequential write has not arrived yet");
}
-
- long offset = nextOffset.get();
- if (range.getMin() > offset) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The next sequential write has not arrived yet");
- }
- processCommits(nextOffset.get()); // handle race
- this.asyncStatus = false;
- } else if (range.getMin() < offset && range.getMax() > offset) {
- // shouldn't happen since we do sync for overlapped concurrent writers
- LOG.warn("Got an overlapping write (" + range.getMin() + ", "
- + range.getMax() + "), nextOffset=" + offset
- + ". Silently drop it now");
- pendingWrites.remove(range);
- processCommits(nextOffset.get()); // handle race
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
- + ") from the list");
- }
- // after writing, remove the WriteCtx from cache
- pendingWrites.remove(range);
- // update nextOffset
- nextOffset.addAndGet(toWrite.getCount());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Change nextOffset to " + nextOffset.get());
- }
- return toWrite;
+ processCommits(nextOffset.get()); // handle race
+ this.asyncStatus = false;
+ } else if (range.getMax() <= offset) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove write " + range.toString()
+ + " which is already written from the list");
}
-
+ // remove the WriteCtx from cache
+ pendingWrites.remove(range);
+ } else if (range.getMin() < offset && range.getMax() > offset) {
+ LOG.warn("Got an overlapping write " + range.toString()
+ + ", nextOffset=" + offset
+ + ". Remove and trim it");
+ pendingWrites.remove(range);
+ trimWriteRequest(toWrite, offset);
+ // update nextOffset
+ nextOffset.addAndGet(toWrite.getCount());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Change nextOffset (after trim) to " + nextOffset.get());
+ }
+ return toWrite;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove write " + range.toString()
+ + " from the list");
+ }
+ // after writing, remove the WriteCtx from cache
+ pendingWrites.remove(range);
+ // update nextOffset
+ nextOffset.addAndGet(toWrite.getCount());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Change nextOffset to " + nextOffset.get());
+ }
+ return toWrite;
+ }
return null;
}
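After this rewrite, offerNextToWrite distinguishes four cases when it takes the last pending range [min, max) out of the cache and compares it with the current write position. A hypothetical decide() helper summarizing the branches (not in the patch):

    public class DequeueDecision {
      // [min, max) is the pending range; offset is nextOffset.
      static String decide(long min, long max, long offset) {
        if (min > offset) {
          return "wait";            // next sequential write not arrived yet
        } else if (max <= offset) {
          return "discard";         // already fully written; just remove it
        } else if (min < offset) {
          return "trim-and-write";  // overlap: write only [offset, max)
        }
        return "write";             // min == offset: normal sequential case
      }
    }

The trim-and-write branch is the behavioral fix: before this patch an overlapping request on this path was silently dropped, which is what caused data copying to fail.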
@@ -1272,8 +1303,8 @@ class OpenFileCtx {
WccAttr preOpAttr = latestAttr.getWccAttr();
while (!pendingWrites.isEmpty()) {
OffsetRange key = pendingWrites.firstKey();
- LOG.info("Fail pending write: (" + key.getMin() + ", " + key.getMax()
- + "), nextOffset=" + nextOffset.get());
+ LOG.info("Fail pending write: " + key.toString()
+ + ", nextOffset=" + nextOffset.get());
WriteCtx writeCtx = pendingWrites.remove(key);
if (!writeCtx.getReplied()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
index 82c826f..8c2c7ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
@@ -51,8 +51,8 @@ class WriteCtx {
}
private final FileHandle handle;
- private final long offset;
- private final int count;
+ private long offset;
+ private int count;
/**
* Some clients can send a write that includes previously written data along
@@ -61,13 +61,61 @@ class WriteCtx {
* request before it was modified to write only the new data.
* @see OpenFileCtx#addWritesToCache for more details
*/
- private final int originalCount;
+ private int originalCount;
public static final int INVALID_ORIGINAL_COUNT = -1;
+ /**
+ * Overlapping Write Request Handling
+ * A write request can be in three states:
+ * s0. just created, with data != null
+ * s1. dumped as length "count", and data set to null
+ * s2. read back from dumped area as length "count"
+ *
+ * Write requests may have overlapping ranges; we detect this by comparing
+ * the data offset range of the request against the current offset of data
+ * already written to HDFS. There are two categories:
+ *
+ * 1. If the beginning part of a new write request's data is already
+ * written due to an earlier request, we alter the new request by trimming
+ * this portion before the new request enters state s0, and the
+ * originalCount is remembered.
+ *
+ * 2. If the lower end of the write request range is beyond the current
+ * offset of data already written, we put the request into the cache, and
+ * detect the overlap when taking the request out of the cache.
+ *
+ * For category 2, if we find that a write request overlaps with another,
+ * this write request is already in state s0, s1, or s2. We trim the
+ * beginning part of this request by remembering the size of this portion
+ * as trimDelta. So the resulting offset of the write request is
+ * "offset + trimDelta" and the resulting size of the write request is
+ * "count - trimDelta".
+ *
+ * What is important to notice is that if the request is in s1 when we do
+ * the trimming, the dumped data is of size "count", so when we load the
+ * data back from the dumped area, we should set the position of the data
+ * buffer to trimDelta.
+ */
+ private int trimDelta;
+
public int getOriginalCount() {
return originalCount;
}
+ public void trimWrite(int delta) {
+ Preconditions.checkState(delta < count);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trim write request by delta:" + delta + " " + toString());
+ }
+ synchronized(this) {
+ trimDelta = delta;
+ if (originalCount == INVALID_ORIGINAL_COUNT) {
+ originalCount = count;
+ }
+ trimData();
+ }
+ }
+
private final WriteStableHow stableHow;
private volatile ByteBuffer data;
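The comment block above reduces to simple arithmetic: a trimmed request's effective range is [offset + trimDelta, offset + count). A minimal sketch of that accounting, as a hypothetical stripped-down class (field names follow WriteCtx; everything else is illustrative):

    class TrimmedWrite {
      private long offset;    // original start of the request
      private int count;      // original length in bytes
      private int trimDelta;  // front bytes already written to HDFS

      synchronized long getOffset() { return offset + trimDelta; }
      synchronized int getCount()   { return count - trimDelta; }

      synchronized void trimWrite(int delta) {
        // delta must leave at least one byte to write, mirroring the
        // Preconditions.checkState(delta < count) in the real trimWrite()
        if (delta >= count) throw new IllegalStateException();
        trimDelta = delta;
      }
    }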
@@ -139,11 +187,17 @@ class WriteCtx {
}
long getOffset() {
- return offset;
+ synchronized(this) {
+ // See comment "Overlapping Write Request Handling" above
+ return offset + trimDelta;
+ }
}
int getCount() {
- return count;
+ synchronized(this) {
+ // See comment "Overlapping Write Request Handling" above
+ return count - trimDelta;
+ }
}
WriteStableHow getStableHow() {
@@ -174,7 +228,22 @@ class WriteCtx {
throw new IOException("Data count is " + count + ", but read back "
+ size + " bytes");
}
- data = ByteBuffer.wrap(rawData);
+ synchronized(this) {
+ data = ByteBuffer.wrap(rawData);
+ trimData();
+ }
+ }
+
+ private void trimData() {
+ if (data != null && trimDelta > 0) {
+ // make it not dump-able since the data will be used
+ // shortly
+ dataState = DataState.NO_DUMP;
+ data.position(data.position() + trimDelta);
+ offset += trimDelta;
+ count -= trimDelta;
+ trimDelta = 0;
+ }
}
public void writeData(HdfsDataOutputStream fos) throws IOException {
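If a request is trimmed while in state s1 (its data dumped to disk), the dump still holds the full "count" bytes, so trimData() skips the already-written prefix by advancing the buffer position once the data is loaded back. A runnable illustration of that ByteBuffer arithmetic (class name and values are made up):

    import java.nio.ByteBuffer;

    public class TrimDemo {
      public static void main(String[] args) {
        byte[] dumped = new byte[10];          // 10 bytes loaded back from the dump
        ByteBuffer data = ByteBuffer.wrap(dumped);
        int trimDelta = 4;                     // front 4 bytes already in HDFS
        data.position(data.position() + trimDelta);
        System.out.println(data.remaining());  // 6 bytes remain to be written
      }
    }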
@@ -229,6 +298,7 @@ class WriteCtx {
this.offset = offset;
this.count = count;
this.originalCount = originalCount;
+ this.trimDelta = 0;
this.stableHow = stableHow;
this.data = data;
this.channel = channel;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index 3c193ae..9c327c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -640,7 +640,97 @@ public class TestWrites {
}
}
}
-
+
+ @Test
+ public void testOverlappingWrites() throws IOException, InterruptedException {
+ NfsConfiguration config = new NfsConfiguration();
+ MiniDFSCluster cluster = null;
+ RpcProgramNfs3 nfsd;
+ final int bufSize = 32;
+ SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
+ Mockito.when(securityHandler.getUser()).thenReturn(
+ System.getProperty("user.name"));
+ String currentUser = System.getProperty("user.name");
+ config.set(
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserGroupConfKey(currentUser),
+ "*");
+ config.set(
+ DefaultImpersonationProvider.getTestProvider().
+ getProxySuperuserIpConfKey(currentUser),
+ "*");
+ ProxyUsers.refreshSuperUserGroupsConfiguration(config);
+ // Use ephemeral port in case tests are running in parallel
+ config.setInt("nfs3.mountd.port", 0);
+ config.setInt("nfs3.server.port", 0);
+
+ try {
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+ cluster.waitActive();
+
+ Nfs3 nfs3 = new Nfs3(config);
+ nfs3.startServiceInternal(false);
+ nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
+
+ DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config),
+ config);
+ HdfsFileStatus status = dfsClient.getFileInfo("/");
+ FileHandle rootHandle = new FileHandle(status.getFileId());
+
+ CREATE3Request createReq = new CREATE3Request(rootHandle,
+ "overlapping-writes" + System.currentTimeMillis(),
+ Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
+ XDR createXdr = new XDR();
+ createReq.serialize(createXdr);
+ CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
+ securityHandler, new InetSocketAddress("localhost", 1234));
+ FileHandle handle = createRsp.getObjHandle();
+ byte[] buffer = new byte[bufSize];
+ for (int i = 0; i < bufSize; i++) {
+ buffer[i] = (byte) i;
+ }
+ int[][] ranges = new int[][] {
+ {0, 10},
+ {5, 7},
+ {5, 5},
+ {10, 6},
+ {18, 6},
+ {20, 6},
+ {28, 4},
+ {16, 2},
+ {25, 4}
+ };
+ for (int i = 0; i < ranges.length; i++) {
+ int x[] = ranges[i];
+ byte[] tbuffer = new byte[x[1]];
+ for (int j = 0; j < x[1]; j++) {
+ tbuffer[j] = buffer[x[0] + j];
+ }
+ WRITE3Request writeReq = new WRITE3Request(handle, (long)x[0], x[1],
+ WriteStableHow.UNSTABLE, ByteBuffer.wrap(tbuffer));
+ XDR writeXdr = new XDR();
+ writeReq.serialize(writeXdr);
+ nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
+ new InetSocketAddress("localhost", 1234));
+ }
+
+ waitWrite(nfsd, handle, 60000);
+ READ3Request readReq = new READ3Request(handle, 0, bufSize);
+ XDR readXdr = new XDR();
+ readReq.serialize(readXdr);
+ READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
+ securityHandler, new InetSocketAddress("localhost", config.getInt(
+ NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
+ NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT)));
+
+ assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
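The nine ranges above are {start, length} pairs chosen so that consecutive requests overlap ({0,10} and {5,7}), duplicate ({5,5} lies inside {0,10}), arrive out of order ({16,2} after {20,6}), and still jointly cover every offset in [0, 32). A quick way to check that coverage (CoverageCheck is an illustrative class, not part of the test):

    public class CoverageCheck {
      public static void main(String[] args) {
        boolean[] covered = new boolean[32];
        int[][] ranges = { {0, 10}, {5, 7}, {5, 5}, {10, 6}, {18, 6},
                           {20, 6}, {28, 4}, {16, 2}, {25, 4} };
        for (int[] r : ranges) {
          for (int j = 0; j < r[1]; j++) {
            covered[r[0] + j] = true;   // mark each byte this request writes
          }
        }
        boolean full = true;
        for (boolean b : covered) full &= b;
        System.out.println(full);       // prints true: [0, 32) is fully covered
      }
    }

Because each tbuffer is filled from the same source buffer at the same offsets, any correct merge of the overlapping writes must reproduce buffer exactly, which is what the final assertTrue verifies.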
+
@Test
public void testCheckSequential() throws IOException {
DFSClient dfsClient = Mockito.mock(DFSClient.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/151fca50/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3daf8d4..d55beae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1447,6 +1447,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9147. Fix the setting of visibleLength in ExternalBlockReader. (Colin
P. McCabe via Lei (Eddy) Xu)
+ HDFS-9092. Nfs silently drops overlapping write requests and causes data
+ copying to fail. (Yongjun Zhang)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES