You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/11/30 18:31:22 UTC
[kafka] branch trunk updated: MINOR: improve state directory test (#5961)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9b476bc MINOR: improve state directory test (#5961)
9b476bc is described below
commit 9b476bc5f4a2fdbd62ad84e50e65331c21b321d0
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Nov 30 10:31:07 2018 -0800
MINOR: improve state directory test (#5961)
Reviewers: Bill Bejeck <bi...@confluent.io>, Kamal Chandraprakash (@kamalcph), Guozhang Wang <gu...@confluent.io>
---
.../processor/internals/StateDirectoryTest.java | 71 +++++++++++++---------
1 file changed, 42 insertions(+), 29 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index edb02c6..1f7163f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -33,6 +33,7 @@ import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -48,12 +49,15 @@ public class StateDirectoryTest {
private final MockTime time = new MockTime();
private File stateDir;
- private String applicationId = "applicationId";
+ private final String applicationId = "applicationId";
private StateDirectory directory;
private File appDir;
- private void initializeStateDirectory(final boolean createStateDirectory) {
+ private void initializeStateDirectory(final boolean createStateDirectory) throws Exception {
stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5));
+ if (!createStateDirectory) {
+ cleanup();
+ }
directory = new StateDirectory(
new StreamsConfig(new Properties() {
{
@@ -67,12 +71,12 @@ public class StateDirectoryTest {
}
@Before
- public void before() {
+ public void before() throws Exception {
initializeStateDirectory(true);
}
@After
- public void cleanup() throws IOException {
+ public void cleanup() throws Exception {
Utils.delete(stateDir);
}
@@ -93,7 +97,7 @@ public class StateDirectoryTest {
}
@Test
- public void shouldLockTaskStateDirectory() throws IOException {
+ public void shouldLockTaskStateDirectory() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.directoryForTask(taskId);
@@ -107,14 +111,14 @@ public class StateDirectoryTest {
channel.tryLock();
fail("shouldn't be able to lock already locked directory");
} catch (final OverlappingFileLockException e) {
- // pass
+ // swallow
} finally {
directory.unlock(taskId);
}
}
@Test
- public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
+ public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
final TaskId taskId = new TaskId(0, 0);
directory.directoryForTask(taskId);
directory.lock(taskId);
@@ -125,16 +129,22 @@ public class StateDirectoryTest {
}
}
- @Test(expected = ProcessorStateException.class)
- public void shouldThrowProcessorStateException() throws IOException {
+ @Test
+ public void shouldThrowProcessorStateException() throws Exception {
final TaskId taskId = new TaskId(0, 0);
Utils.delete(stateDir);
- directory.directoryForTask(taskId);
+
+ try {
+ directory.directoryForTask(taskId);
+ fail("Should have thrown ProcessorStateException");
+ } catch (final ProcessorStateException expected) {
+ // swallow
+ }
}
@Test
- public void shouldNotLockDeletedDirectory() throws IOException {
+ public void shouldNotLockDeletedDirectory() throws Exception {
final TaskId taskId = new TaskId(0, 0);
Utils.delete(stateDir);
@@ -142,7 +152,7 @@ public class StateDirectoryTest {
}
@Test
- public void shouldLockMultipleTaskDirectories() throws IOException {
+ public void shouldLockMultipleTaskDirectories() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final File task1Dir = directory.directoryForTask(taskId);
final TaskId taskId2 = new TaskId(1, 0);
@@ -165,7 +175,7 @@ public class StateDirectoryTest {
channel2.tryLock();
fail("shouldn't be able to lock already locked directory");
} catch (final OverlappingFileLockException e) {
- // pass
+ // swallow
} finally {
directory.unlock(taskId);
directory.unlock(taskId2);
@@ -173,7 +183,7 @@ public class StateDirectoryTest {
}
@Test
- public void shouldReleaseTaskStateDirectoryLock() throws IOException {
+ public void shouldReleaseTaskStateDirectoryLock() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.directoryForTask(taskId);
@@ -191,7 +201,7 @@ public class StateDirectoryTest {
}
@Test
- public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws IOException {
+ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
final TaskId task0 = new TaskId(0, 0);
final TaskId task1 = new TaskId(1, 0);
try {
@@ -199,13 +209,13 @@ public class StateDirectoryTest {
directory.lock(task1);
directory.directoryForTask(new TaskId(2, 0));
- List<File> files = Arrays.asList(appDir.listFiles());
+ List<File> files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
assertEquals(3, files.size());
time.sleep(1000);
directory.cleanRemovedTasks(0);
- files = Arrays.asList(appDir.listFiles());
+ files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
assertEquals(2, files.size());
assertTrue(files.contains(new File(appDir, task0.toString())));
assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -264,8 +274,8 @@ public class StateDirectoryTest {
assertTrue(taskDir.exists());
}
- @Test(expected = OverlappingFileLockException.class)
- public void shouldLockGlobalStateDirectory() throws IOException {
+ @Test
+ public void shouldLockGlobalStateDirectory() throws Exception {
directory.lockGlobalState();
try (
@@ -275,13 +285,16 @@ public class StateDirectoryTest {
StandardOpenOption.WRITE)
) {
channel.lock();
+ fail("Should have thrown OverlappingFileLockException");
+ } catch (final OverlappingFileLockException expcted) {
+ // swallow
} finally {
directory.unlockGlobalState();
}
}
@Test
- public void shouldUnlockGlobalStateDirectory() throws IOException {
+ public void shouldUnlockGlobalStateDirectory() throws Exception {
directory.lockGlobalState();
directory.unlockGlobalState();
@@ -297,7 +310,7 @@ public class StateDirectoryTest {
}
@Test
- public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+ public void shouldNotLockStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>();
final Thread thread = new Thread(() -> {
@@ -314,7 +327,7 @@ public class StateDirectoryTest {
}
@Test
- public void shouldNotUnLockStateDirLockedByAnotherThread() throws IOException, InterruptedException {
+ public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final CountDownLatch lockLatch = new CountDownLatch(1);
final CountDownLatch unlockLatch = new CountDownLatch(1);
@@ -348,24 +361,24 @@ public class StateDirectoryTest {
directory.directoryForTask(new TaskId(1, 0));
directory.globalStateDir();
- List<File> files = Arrays.asList(appDir.listFiles());
+ List<File> files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
assertEquals(2, files.size());
directory.clean();
- files = Arrays.asList(appDir.listFiles());
+ files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
assertEquals(0, files.size());
}
@Test
- public void shouldNotCreateBaseDirectory() {
+ public void shouldNotCreateBaseDirectory() throws Exception {
initializeStateDirectory(false);
assertFalse(stateDir.exists());
assertFalse(appDir.exists());
}
@Test
- public void shouldNotCreateTaskStateDirectory() {
+ public void shouldNotCreateTaskStateDirectory() throws Exception {
initializeStateDirectory(false);
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.directoryForTask(taskId);
@@ -373,21 +386,21 @@ public class StateDirectoryTest {
}
@Test
- public void shouldNotCreateGlobalStateDirectory() {
+ public void shouldNotCreateGlobalStateDirectory() throws Exception {
initializeStateDirectory(false);
final File globalStateDir = directory.globalStateDir();
assertFalse(globalStateDir.exists());
}
@Test
- public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+ public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws Exception {
initializeStateDirectory(false);
final TaskId taskId = new TaskId(0, 0);
assertTrue(directory.lock(taskId));
}
@Test
- public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
+ public void shouldLockGlobalStateDirectoryWhenDirectoryCreationDisabled() throws Exception {
initializeStateDirectory(false);
assertTrue(directory.lockGlobalState());
}