You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/05 09:42:12 UTC
kafka git commit: MINOR: Fix flaky StateDirectoryTest
Repository: kafka
Updated Branches:
refs/heads/trunk aea146511 -> 0df910c03
MINOR: Fix flaky StateDirectoryTest
This fixes:
```
java.lang.AssertionError: expected:<2> but was:<3>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked(StateDirectoryTest.java:145)
```
While running test in infinite loop, hit other problems:
- fixed file management (release all locks and close everything)
- increased sleep time for `shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay` too (was flaky as well)
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Eno Thereska <en...@confluent.io>, Damian Guy <da...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2781 from mjsax/minor-fix-stateDirectoryTest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0df910c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0df910c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0df910c0
Branch: refs/heads/trunk
Commit: 0df910c0345ce7a667aa3276a9fea15851df50f1
Parents: aea1465
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Apr 5 10:42:08 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 5 10:42:08 2017 +0100
----------------------------------------------------------------------
.../processor/internals/StateDirectoryTest.java | 106 +++++++++++++------
1 file changed, 73 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0df910c0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e8d2763..770ff25 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -80,14 +80,18 @@ public class StateDirectoryTest {
directory.lock(taskId, 0);
- final FileChannel channel = FileChannel.open(new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
- try {
+ try (
+ final FileChannel channel = FileChannel.open(
+ new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
+ StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+ ) {
channel.tryLock();
fail("shouldn't be able to lock already locked directory");
- } catch (OverlappingFileLockException e) {
+ } catch (final OverlappingFileLockException e) {
// pass
+ } finally {
+ directory.unlock(taskId);
}
-
}
@Test
@@ -95,7 +99,11 @@ public class StateDirectoryTest {
final TaskId taskId = new TaskId(0, 0);
directory.directoryForTask(taskId);
directory.lock(taskId, 0);
- assertTrue(directory.lock(taskId, 0));
+ try {
+ assertTrue(directory.lock(taskId, 0));
+ } finally {
+ directory.unlock(taskId);
+ }
}
@@ -106,17 +114,27 @@ public class StateDirectoryTest {
final TaskId taskId2 = new TaskId(1, 0);
final File task2Dir = directory.directoryForTask(taskId2);
- directory.lock(taskId, 0);
- directory.lock(taskId2, 0);
- final FileChannel channel1 = FileChannel.open(new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
- final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
- try {
+ try (
+ final FileChannel channel1 = FileChannel.open(
+ new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE);
+ final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE)
+ ) {
+ directory.lock(taskId, 0);
+ directory.lock(taskId2, 0);
+
channel1.tryLock();
channel2.tryLock();
fail("shouldn't be able to lock already locked directory");
- } catch (OverlappingFileLockException e) {
+ } catch (final OverlappingFileLockException e) {
// pass
+ } finally {
+ directory.unlock(taskId);
+ directory.unlock(taskId2);
}
}
@@ -128,23 +146,35 @@ public class StateDirectoryTest {
directory.lock(taskId, 1);
directory.unlock(taskId);
- final FileChannel channel = FileChannel.open(new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
- channel.tryLock();
+ try (
+ final FileChannel channel = FileChannel.open(
+ new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE)
+ ) {
+ channel.tryLock();
+ }
}
@Test
public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
final TaskId task0 = new TaskId(0, 0);
final TaskId task1 = new TaskId(1, 0);
- directory.lock(task0, 0);
- directory.lock(task1, 0);
- directory.directoryForTask(new TaskId(2, 0));
-
- directory.cleanRemovedTasks(0);
- final List<File> files = Arrays.asList(appDir.listFiles());
- assertEquals(2, files.size());
- assertTrue(files.contains(new File(appDir, task0.toString())));
- assertTrue(files.contains(new File(appDir, task1.toString())));
+ try {
+ directory.lock(task0, 0);
+ directory.lock(task1, 0);
+ directory.directoryForTask(new TaskId(2, 0));
+
+ time.sleep(1000);
+ directory.cleanRemovedTasks(0);
+ final List<File> files = Arrays.asList(appDir.listFiles());
+ assertEquals(2, files.size());
+ assertTrue(files.contains(new File(appDir, task0.toString())));
+ assertTrue(files.contains(new File(appDir, task1.toString())));
+ } finally {
+ directory.unlock(task0);
+ directory.unlock(task1);
+ }
}
@Test
@@ -154,7 +184,7 @@ public class StateDirectoryTest {
directory.cleanRemovedTasks(cleanupDelayMs);
assertTrue(dir.exists());
- time.sleep(cleanupDelayMs + 1);
+ time.sleep(cleanupDelayMs + 1000);
directory.cleanRemovedTasks(cleanupDelayMs);
assertFalse(dir.exists());
}
@@ -190,24 +220,34 @@ public class StateDirectoryTest {
@Test(expected = OverlappingFileLockException.class)
public void shouldLockGlobalStateDirectory() throws Exception {
- final FileChannel channel = FileChannel.open(new File(directory.globalStateDir(),
- StateDirectory.LOCK_FILE_NAME).toPath(),
- StandardOpenOption.CREATE, StandardOpenOption.WRITE);
directory.lockGlobalState(1);
- channel.lock();
+
+ try (
+ final FileChannel channel = FileChannel.open(
+ new File(directory.globalStateDir(), StateDirectory.LOCK_FILE_NAME).toPath(),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE)
+ ) {
+ channel.lock();
+ } finally {
+ directory.unlockGlobalState();
+ }
}
@Test
public void shouldUnlockGlobalStateDirectory() throws Exception {
- final FileChannel channel = FileChannel.open(new File(directory.globalStateDir(),
- StateDirectory.LOCK_FILE_NAME).toPath(),
- StandardOpenOption.CREATE, StandardOpenOption.WRITE);
directory.lockGlobalState(1);
-
directory.unlockGlobalState();
- // should lock without any exceptions
- channel.lock();
+ try (
+ final FileChannel channel = FileChannel.open(
+ new File(directory.globalStateDir(), StateDirectory.LOCK_FILE_NAME).toPath(),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE)
+ ) {
+ // should lock without any exceptions
+ channel.lock();
+ }
}
}
\ No newline at end of file