You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2014/11/11 22:03:46 UTC
hadoop git commit: HDFS-7387. NFS may only do partial commit due to a
race between COMMIT and write. Contributed by Brandon Li
Repository: hadoop
Updated Branches:
refs/heads/trunk 571e9c623 -> 99d9d0c2d
HDFS-7387. NFS may only do partial commit due to a race between COMMIT and write. Contributed by Brandon Li
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/99d9d0c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/99d9d0c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/99d9d0c2
Branch: refs/heads/trunk
Commit: 99d9d0c2d19b9f161b765947f3fb64619ea58090
Parents: 571e9c6
Author: Brandon Li <br...@apache.org>
Authored: Tue Nov 11 13:03:31 2014 -0800
Committer: Brandon Li <br...@apache.org>
Committed: Tue Nov 11 13:03:31 2014 -0800
----------------------------------------------------------------------
.../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java | 88 +++++++++++----
.../apache/hadoop/hdfs/nfs/nfs3/TestWrites.java | 107 +++++++++++++------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
3 files changed, 146 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99d9d0c2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index b04c21c..b31baf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -816,6 +816,42 @@ class OpenFileCtx {
return ret;
}
+ // Check if the to-commit range is sequential
+ @VisibleForTesting
+ synchronized boolean checkSequential(final long commitOffset,
+ final long nextOffset) {
+ Preconditions.checkState(commitOffset >= nextOffset, "commitOffset "
+ + commitOffset + " less than nextOffset " + nextOffset);
+ long offset = nextOffset;
+ Iterator<OffsetRange> it = pendingWrites.descendingKeySet().iterator();
+ while (it.hasNext()) {
+ OffsetRange range = it.next();
+ if (range.getMin() != offset) {
+ // got a hole
+ return false;
+ }
+ offset = range.getMax();
+ if (offset > commitOffset) {
+ return true;
+ }
+ }
+ // there is gap between the last pending write and commitOffset
+ return false;
+ }
+
+ private COMMIT_STATUS handleSpecialWait(boolean fromRead, long commitOffset,
+ Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+ if (!fromRead) {
+ // let client retry the same request, add pending commit to sync later
+ CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr);
+ pendingCommits.put(commitOffset, commitCtx);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("return COMMIT_SPECIAL_WAIT");
+ }
+ return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
+ }
+
@VisibleForTesting
synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
@@ -827,17 +863,34 @@ class OpenFileCtx {
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
}
}
- if (pendingWrites.isEmpty()) {
- // Note that, there is no guarantee data is synced. Caller should still
- // do a sync here though the output stream might be closed.
- return COMMIT_STATUS.COMMIT_FINISHED;
- }
long flushed = getFlushedOffset();
if (LOG.isDebugEnabled()) {
- LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
+ LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset
+ + "nextOffset=" + nextOffset.get());
}
-
+
+ if (pendingWrites.isEmpty()) {
+ if (aixCompatMode) {
+ // Note that, there is no guarantee data is synced. Caller should still
+ // do a sync here though the output stream might be closed.
+ return COMMIT_STATUS.COMMIT_FINISHED;
+ } else {
+ if (flushed < nextOffset.get()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get commit while still writing to the requested offset,"
+ + " with empty queue");
+ }
+ return handleSpecialWait(fromRead, nextOffset.get(), channel, xid,
+ preOpAttr);
+ } else {
+ return COMMIT_STATUS.COMMIT_FINISHED;
+ }
+ }
+ }
+
+ Preconditions.checkState(flushed <= nextOffset.get(), "flushed " + flushed
+ + " is larger than nextOffset " + nextOffset.get());
// Handle large file upload
if (uploadLargeFile && !aixCompatMode) {
long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
@@ -846,21 +899,20 @@ class OpenFileCtx {
if (co <= flushed) {
return COMMIT_STATUS.COMMIT_DO_SYNC;
} else if (co < nextOffset.get()) {
- if (!fromRead) {
- // let client retry the same request, add pending commit to sync later
- CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
- preOpAttr);
- pendingCommits.put(commitOffset, commitCtx);
- }
if (LOG.isDebugEnabled()) {
- LOG.debug("return COMMIT_SPECIAL_WAIT");
+ LOG.debug("get commit while still writing to the requested offset");
}
- return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
+ return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("return COMMIT_SPECIAL_SUCCESS");
+ // co >= nextOffset
+ if (checkSequential(co, nextOffset.get())) {
+ return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("return COMMIT_SPECIAL_SUCCESS");
+ }
+ return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
}
- return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99d9d0c2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index 96e6393..56603b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -217,14 +217,14 @@ public class TestWrites {
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
- ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+ ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
- Mockito.when(fos.getPos()).thenReturn((long) 10);
+ Mockito.when(fos.getPos()).thenReturn((long) 8);
ctx.setNextOffsetForTest(10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
@@ -232,35 +232,40 @@ public class TestWrites {
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+ // Test commit sequential writes
status = ctx.checkCommitInternal(10, ch, 1, attr, false);
- Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
+ Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
- Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+ Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
+ // Test commit non-sequential writes
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
.getPendingCommitsForTest();
- Assert.assertTrue(commits.size() == 0);
- ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
+ Assert.assertTrue(commits.size() == 1);
+ ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
- Assert.assertTrue(commits.size() == 0);
+ Assert.assertTrue(commits.size() == 1);
// Test request with zero commit offset
- commits.remove(new Long(11));
- // There is one pending write [5,10]
+ commits.remove(new Long(10));
+ // There is one pending write [10,15]
ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
- Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
+ Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
- Mockito.when(fos.getPos()).thenReturn((long) 6);
- ret = ctx.checkCommitInternal(8, ch, 1, attr, false);
+ ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
- Assert.assertTrue(commits.size() == 1);
- long key = commits.firstKey();
- Assert.assertTrue(key == 8);
+ Assert.assertTrue(commits.size() == 2);
+ // Empty pending writes. nextOffset=10, flushed pos=8
+ ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
+ ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
+ Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
+
// Empty pending writes
- ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+ ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+
}
@Test
@@ -286,6 +291,7 @@ public class TestWrites {
ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
Mockito.when(fos.getPos()).thenReturn((long) 10);
+ ctx.setNextOffsetForTest((long)10);
status = ctx.checkCommitInternal(5, null, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
}
@@ -317,7 +323,7 @@ public class TestWrites {
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
- ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+ ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
@@ -326,6 +332,7 @@ public class TestWrites {
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
+ ctx.setNextOffsetForTest((long)10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
// Do_SYNC state will be updated to FINISHED after data sync
@@ -355,7 +362,7 @@ public class TestWrites {
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
// Empty pending writes
- ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+ ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
@@ -386,7 +393,7 @@ public class TestWrites {
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
- ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+ ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
@@ -394,7 +401,8 @@ public class TestWrites {
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
- Mockito.when(fos.getPos()).thenReturn((long) 10);
+ Mockito.when(fos.getPos()).thenReturn((long) 6);
+ ctx.setNextOffsetForTest((long)10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
// Do_SYNC state will be updated to FINISHED after data sync
@@ -402,32 +410,34 @@ public class TestWrites {
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
- status = ctx.checkCommitInternal(10, ch, 1, attr, true);
- assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
- ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
- assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
- assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
-
+ // Test request with sequential writes
+ status = ctx.checkCommitInternal(9, ch, 1, attr, true);
+ assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
+ ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
+ assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
+ assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));
+
+ // Test request with non-sequential writes
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
.getPendingCommitsForTest();
assertTrue(commits.size() == 0);
- ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
+ ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
assertEquals(0, commits.size()); // commit triggered by read doesn't wait
- assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11));
+ assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));
// Test request with zero commit offset
- // There is one pending write [5,10]
+ // There is one pending write [10,15]
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
- assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+ assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
assertEquals(0, commits.size());
- assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
+ assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
// Empty pending writes
- ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+ ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
- assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
- assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
+ assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
+ assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
@@ -629,4 +639,33 @@ public class TestWrites {
}
}
}
+
+ @Test
+ public void testCheckSequential() throws IOException {
+ DFSClient dfsClient = Mockito.mock(DFSClient.class);
+ Nfs3FileAttributes attr = new Nfs3FileAttributes();
+ HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+ Mockito.when(fos.getPos()).thenReturn((long) 0);
+ NfsConfiguration config = new NfsConfiguration();
+
+ config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
+ OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+ new ShellBasedIdMapping(config), false, config);
+
+ ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+ new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+ ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
+ new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+ ctx.getPendingWritesForTest().put(new OffsetRange(20, 25),
+ new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+
+ assertTrue(!ctx.checkSequential(5, 4));
+ assertTrue(ctx.checkSequential(9, 5));
+ assertTrue(ctx.checkSequential(10, 5));
+ assertTrue(ctx.checkSequential(14, 5));
+ assertTrue(!ctx.checkSequential(15, 5));
+ assertTrue(!ctx.checkSequential(20, 5));
+ assertTrue(!ctx.checkSequential(25, 5));
+ assertTrue(!ctx.checkSequential(999, 5));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/99d9d0c2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8922a0f..b183731 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -410,6 +410,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7366. BlockInfo should take replication as an short in the constructor.
(Li Lu via wheat9)
+ HDFS-7387. NFS may only do partial commit due to a race between COMMIT and write
+ (brandonli)
+
Release 2.6.0 - 2014-11-15
INCOMPATIBLE CHANGES