You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/05 09:42:12 UTC

kafka git commit: MINOR: Fix flaky StateDirectoryTest

Repository: kafka
Updated Branches:
  refs/heads/trunk aea146511 -> 0df910c03


MINOR: Fix flaky StateDirectoryTest

This fixes:
```
java.lang.AssertionError: expected:<2> but was:<3>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:645)
	at org.junit.Assert.assertEquals(Assert.java:631)
	at org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked(StateDirectoryTest.java:145)
```

While running test in infinite loop, hit other problems:
 - fixed file management (release all locks and close everything)
 - increased sleep time for `shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay` too (was flaky as well)

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Eno Thereska <en...@confluent.io>, Damian Guy <da...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #2781 from mjsax/minor-fix-stateDirectoryTest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0df910c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0df910c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0df910c0

Branch: refs/heads/trunk
Commit: 0df910c0345ce7a667aa3276a9fea15851df50f1
Parents: aea1465
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Apr 5 10:42:08 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Apr 5 10:42:08 2017 +0100

----------------------------------------------------------------------
 .../processor/internals/StateDirectoryTest.java | 106 +++++++++++++------
 1 file changed, 73 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0df910c0/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e8d2763..770ff25 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -80,14 +80,18 @@ public class StateDirectoryTest {
 
         directory.lock(taskId, 0);
 
-        final FileChannel channel = FileChannel.open(new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
-        try {
+        try (
+            final FileChannel channel = FileChannel.open(
+                new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
+                StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+        ) {
             channel.tryLock();
             fail("shouldn't be able to lock already locked directory");
-        } catch (OverlappingFileLockException e) {
+        } catch (final OverlappingFileLockException e) {
             // pass
+        } finally {
+            directory.unlock(taskId);
         }
-
     }
 
     @Test
@@ -95,7 +99,11 @@ public class StateDirectoryTest {
         final TaskId taskId = new TaskId(0, 0);
         directory.directoryForTask(taskId);
         directory.lock(taskId, 0);
-        assertTrue(directory.lock(taskId, 0));
+        try {
+            assertTrue(directory.lock(taskId, 0));
+        } finally {
+            directory.unlock(taskId);
+        }
     }
 
 
@@ -106,17 +114,27 @@ public class StateDirectoryTest {
         final TaskId taskId2 = new TaskId(1, 0);
         final File task2Dir = directory.directoryForTask(taskId2);
 
-        directory.lock(taskId, 0);
-        directory.lock(taskId2, 0);
 
-        final FileChannel channel1 = FileChannel.open(new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
-        final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
-        try {
+        try (
+            final FileChannel channel1 = FileChannel.open(
+                new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.WRITE);
+            final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.WRITE)
+        ) {
+            directory.lock(taskId, 0);
+            directory.lock(taskId2, 0);
+
             channel1.tryLock();
             channel2.tryLock();
             fail("shouldn't be able to lock already locked directory");
-        } catch (OverlappingFileLockException e) {
+        } catch (final OverlappingFileLockException e) {
             // pass
+        } finally {
+            directory.unlock(taskId);
+            directory.unlock(taskId2);
         }
     }
 
@@ -128,23 +146,35 @@ public class StateDirectoryTest {
         directory.lock(taskId, 1);
         directory.unlock(taskId);
 
-        final FileChannel channel = FileChannel.open(new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
-        channel.tryLock();
+        try (
+            final FileChannel channel = FileChannel.open(
+                new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.WRITE)
+        ) {
+            channel.tryLock();
+        }
     }
 
     @Test
     public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
-        directory.lock(task0, 0);
-        directory.lock(task1, 0);
-        directory.directoryForTask(new TaskId(2, 0));
-
-        directory.cleanRemovedTasks(0);
-        final List<File> files = Arrays.asList(appDir.listFiles());
-        assertEquals(2, files.size());
-        assertTrue(files.contains(new File(appDir, task0.toString())));
-        assertTrue(files.contains(new File(appDir, task1.toString())));
+        try {
+            directory.lock(task0, 0);
+            directory.lock(task1, 0);
+            directory.directoryForTask(new TaskId(2, 0));
+
+            time.sleep(1000);
+            directory.cleanRemovedTasks(0);
+            final List<File> files = Arrays.asList(appDir.listFiles());
+            assertEquals(2, files.size());
+            assertTrue(files.contains(new File(appDir, task0.toString())));
+            assertTrue(files.contains(new File(appDir, task1.toString())));
+        } finally {
+            directory.unlock(task0);
+            directory.unlock(task1);
+        }
     }
 
     @Test
@@ -154,7 +184,7 @@ public class StateDirectoryTest {
         directory.cleanRemovedTasks(cleanupDelayMs);
         assertTrue(dir.exists());
 
-        time.sleep(cleanupDelayMs + 1);
+        time.sleep(cleanupDelayMs + 1000);
         directory.cleanRemovedTasks(cleanupDelayMs);
         assertFalse(dir.exists());
     }
@@ -190,24 +220,34 @@ public class StateDirectoryTest {
 
     @Test(expected = OverlappingFileLockException.class)
     public void shouldLockGlobalStateDirectory() throws Exception {
-        final FileChannel channel = FileChannel.open(new File(directory.globalStateDir(),
-                                                              StateDirectory.LOCK_FILE_NAME).toPath(),
-                                                     StandardOpenOption.CREATE, StandardOpenOption.WRITE);
         directory.lockGlobalState(1);
-        channel.lock();
+
+        try (
+            final FileChannel channel = FileChannel.open(
+                new File(directory.globalStateDir(), StateDirectory.LOCK_FILE_NAME).toPath(),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.WRITE)
+        ) {
+            channel.lock();
+        } finally {
+            directory.unlockGlobalState();
+        }
     }
 
     @Test
     public void shouldUnlockGlobalStateDirectory() throws Exception {
-        final FileChannel channel = FileChannel.open(new File(directory.globalStateDir(),
-                                                              StateDirectory.LOCK_FILE_NAME).toPath(),
-                                                     StandardOpenOption.CREATE, StandardOpenOption.WRITE);
         directory.lockGlobalState(1);
-
         directory.unlockGlobalState();
 
-        // should lock without any exceptions
-        channel.lock();
+        try (
+            final FileChannel channel = FileChannel.open(
+                new File(directory.globalStateDir(), StateDirectory.LOCK_FILE_NAME).toPath(),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.WRITE)
+        ) {
+            // should lock without any exceptions
+            channel.lock();
+        }
     }
 
 }
\ No newline at end of file