You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/30 18:31:22 UTC

[kafka] branch trunk updated: MINOR: improve state directory test (#5961)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9b476bc  MINOR: improve state directory test (#5961)
9b476bc is described below

commit 9b476bc5f4a2fdbd62ad84e50e65331c21b321d0
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Nov 30 10:31:07 2018 -0800

    MINOR: improve state directory test (#5961)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Kamal Chandraprakash (@kamalcph), Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/StateDirectoryTest.java    | 71 +++++++++++++---------
 1 file changed, 42 insertions(+), 29 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index edb02c6..1f7163f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -33,6 +33,7 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -48,12 +49,15 @@ public class StateDirectoryTest {
 
     private final MockTime time = new MockTime();
     private File stateDir;
-    private String applicationId = "applicationId";
+    private final String applicationId = "applicationId";
     private StateDirectory directory;
     private File appDir;
 
-    private void initializeStateDirectory(final boolean createStateDirectory) {
+    private void initializeStateDirectory(final boolean createStateDirectory) throws Exception {
         stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5));
+        if (!createStateDirectory) {
+            cleanup();
+        }
         directory = new StateDirectory(
             new StreamsConfig(new Properties() {
                 {
@@ -67,12 +71,12 @@ public class StateDirectoryTest {
     }
 
     @Before
-    public void before() {
+    public void before() throws Exception {
         initializeStateDirectory(true);
     }
 
     @After
-    public void cleanup() throws IOException {
+    public void cleanup() throws Exception {
         Utils.delete(stateDir);
     }
 
@@ -93,7 +97,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldLockTaskStateDirectory() throws IOException {
+    public void shouldLockTaskStateDirectory() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
@@ -107,14 +111,14 @@ public class StateDirectoryTest {
             channel.tryLock();
             fail("shouldn't be able to lock already locked directory");
         } catch (final OverlappingFileLockException e) {
-            // pass
+            // swallow
         } finally {
             directory.unlock(taskId);
         }
     }
 
     @Test
-    public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
+    public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         directory.directoryForTask(taskId);
         directory.lock(taskId);
@@ -125,16 +129,22 @@ public class StateDirectoryTest {
         }
     }
 
-    @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateException() throws IOException {
+    @Test
+    public void shouldThrowProcessorStateException() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
         Utils.delete(stateDir);
-        directory.directoryForTask(taskId);
+
+        try {
+            directory.directoryForTask(taskId);
+            fail("Should have thrown ProcessorStateException");
+        } catch (final ProcessorStateException expected) {
+            // swallow
+        }
     }
 
     @Test
-    public void shouldNotLockDeletedDirectory() throws IOException {
+    public void shouldNotLockDeletedDirectory() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
         Utils.delete(stateDir);
@@ -142,7 +152,7 @@ public class StateDirectoryTest {
     }
     
     @Test
-    public void shouldLockMultipleTaskDirectories() throws IOException {
+    public void shouldLockMultipleTaskDirectories() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         final File task1Dir = directory.directoryForTask(taskId);
         final TaskId taskId2 = new TaskId(1, 0);
@@ -165,7 +175,7 @@ public class StateDirectoryTest {
             channel2.tryLock();
             fail("shouldn't be able to lock already locked directory");
         } catch (final OverlappingFileLockException e) {
-            // pass
+            // swallow
         } finally {
             directory.unlock(taskId);
             directory.unlock(taskId2);
@@ -173,7 +183,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldReleaseTaskStateDirectoryLock() throws IOException {
+    public void shouldReleaseTaskStateDirectoryLock() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
@@ -191,7 +201,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws IOException {
+    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
         try {
@@ -199,13 +209,13 @@ public class StateDirectoryTest {
             directory.lock(task1);
             directory.directoryForTask(new TaskId(2, 0));
 
-            List<File> files = Arrays.asList(appDir.listFiles());
+            List<File> files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
             assertEquals(3, files.size());
 
             time.sleep(1000);
             directory.cleanRemovedTasks(0);
 
-            files = Arrays.asList(appDir.listFiles());
+            files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
             assertEquals(2, files.size());
             assertTrue(files.contains(new File(appDir, task0.toString())));
             assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -264,8 +274,8 @@ public class StateDirectoryTest {
         assertTrue(taskDir.exists());
     }
 
-    @Test(expected = OverlappingFileLockException.class)
-    public void shouldLockGlobalStateDirectory() throws IOException {
+    @Test
+    public void shouldLockGlobalStateDirectory() throws Exception {
         directory.lockGlobalState();
 
         try (
@@ -275,13 +285,16 @@ public class StateDirectoryTest {
                 StandardOpenOption.WRITE)
         ) {
             channel.lock();
+            fail("Should have thrown OverlappingFileLockException");
+        } catch (final OverlappingFileLockException expcted) {
+            // swallow
         } finally {
             directory.unlockGlobalState();
         }
     }
 
     @Test
-    public void shouldUnlockGlobalStateDirectory() throws IOException {
+    public void shouldUnlockGlobalStateDirectory() throws Exception {
         directory.lockGlobalState();
         directory.unlockGlobalState();
 
@@ -297,7 +310,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+    public void shouldNotLockStateDirLockedByAnotherThread() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>();
         final Thread thread = new Thread(() -> {
@@ -314,7 +327,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldNotUnLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+    public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
         final CountDownLatch lockLatch = new CountDownLatch(1);
         final CountDownLatch unlockLatch = new CountDownLatch(1);
@@ -348,24 +361,24 @@ public class StateDirectoryTest {
         directory.directoryForTask(new TaskId(1, 0));
         directory.globalStateDir();
 
-        List<File> files = Arrays.asList(appDir.listFiles());
+        List<File> files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
         assertEquals(2, files.size());
 
         directory.clean();
 
-        files = Arrays.asList(appDir.listFiles());
+        files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
         assertEquals(0, files.size());
     }
 
     @Test
-    public void shouldNotCreateBaseDirectory() {
+    public void shouldNotCreateBaseDirectory() throws Exception {
         initializeStateDirectory(false);
         assertFalse(stateDir.exists());
         assertFalse(appDir.exists());
     }
 
     @Test
-    public void shouldNotCreateTaskStateDirectory() {
+    public void shouldNotCreateTaskStateDirectory() throws Exception {
         initializeStateDirectory(false);
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
@@ -373,21 +386,21 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldNotCreateGlobalStateDirectory() {
+    public void shouldNotCreateGlobalStateDirectory() throws Exception {
         initializeStateDirectory(false);
         final File globalStateDir = directory.globalStateDir();
         assertFalse(globalStateDir.exists());
     }
 
     @Test
-    public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+    public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws Exception {
         initializeStateDirectory(false);
         final TaskId taskId = new TaskId(0, 0);
         assertTrue(directory.lock(taskId));
     }
 
     @Test
-    public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+    public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws Exception {
         initializeStateDirectory(false);
         assertTrue(directory.lockGlobalState());
     }