You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/18 20:02:00 UTC

[kafka] branch trunk updated: MINOR: cleanup RocksDBStore tests (#8510)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ea3eed  MINOR: cleanup RocksDBStore tests  (#8510)
6ea3eed is described below

commit 6ea3eedfd8c74d40157d6e489408d871e172ba34
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat Apr 18 13:01:16 2020 -0700

    MINOR: cleanup RocksDBStore tests  (#8510)
    
    One of the new RocksDB unit tests creates a non-temporary RocksDB directory wherever the test is run from, and some RocksDB files are left behind after the test(s) are done. We should use the tempDirectory dir for this testing instead.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../streams/state/internals/RocksDBStoreTest.java  | 118 ++++++++-------------
 .../internals/RocksDBTimestampedStoreTest.java     |   4 -
 2 files changed, 47 insertions(+), 75 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index a67754d..b1ee48f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -74,8 +74,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.powermock.api.easymock.PowerMock.replay;
 import static org.powermock.api.easymock.PowerMock.verify;
 
@@ -106,6 +106,11 @@ public class RocksDBStoreTest {
         context.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
     }
 
+    @After
+    public void tearDown() {
+        rocksDBStore.close();
+    }
+
     RocksDBStore getRocksDBStore() {
         return new RocksDBStore(DB_NAME, METRICS_SCOPE);
     }
@@ -137,15 +142,10 @@ public class RocksDBStoreTest {
         return getProcessorContext(streamsProps);
     }
 
-    @After
-    public void tearDown() {
-        rocksDBStore.close();
-    }
-
     @Test
     public void shouldAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsDebug() {
         final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
-        final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.DEBUG);
+        context = getProcessorContext(RecordingLevel.DEBUG);
         reset(metricsRecorder);
         metricsRecorder.addStatistics(
             eq(DB_NAME),
@@ -153,46 +153,46 @@ public class RocksDBStoreTest {
         );
         replay(metricsRecorder);
 
-        store.openDB(mockContext);
+        store.openDB(context);
 
         verify(metricsRecorder);
     }
 
     @Test
     public void shouldNotAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsInfo() {
-        final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
-        final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.INFO);
+        rocksDBStore= getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.INFO);
         reset(metricsRecorder);
         replay(metricsRecorder);
 
-        store.openDB(mockContext);
+        rocksDBStore.openDB(context);
 
         verify(metricsRecorder);
     }
 
     @Test
     public void shouldRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsDebug() {
-        final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
-        final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.DEBUG);
-        store.openDB(mockContext);
+        rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.DEBUG);
+        rocksDBStore.openDB(context);
         reset(metricsRecorder);
         metricsRecorder.removeStatistics(DB_NAME);
         replay(metricsRecorder);
 
-        store.close();
+        rocksDBStore.close();
 
         verify(metricsRecorder);
     }
 
     @Test
     public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsInfo() {
-        final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
-        final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.INFO);
-        store.openDB(mockContext);
+        rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.INFO);
+        rocksDBStore.openDB(context);
         reset(metricsRecorder);
         replay(metricsRecorder);
 
-        store.close();
+        rocksDBStore.close();
 
         verify(metricsRecorder);
     }
@@ -211,26 +211,22 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldNotAddStatisticsToInjectedMetricsRecorderWhenUserProvidesStatistics() {
-        final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
-        final InternalMockProcessorContext mockContext =
-            getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
+        rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
         replay(metricsRecorder);
 
-        store.openDB(mockContext);
+        rocksDBStore.openDB(context);
         verify(metricsRecorder);
     }
 
     @Test
     public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserProvidesStatistics() {
-        final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
-        final InternalMockProcessorContext mockContext =
-            getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
-        store.openDB(mockContext);
+        rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
+        rocksDBStore.openDB(context);
         reset(metricsRecorder);
         replay(metricsRecorder);
 
-        store.close();
-
         verify(metricsRecorder);
     }
 
@@ -290,12 +286,7 @@ public class RocksDBStoreTest {
 
         assertTrue(tmpDir.setReadOnly());
 
-        try {
-            rocksDBStore.openDB(tmpContext);
-            fail("Should have thrown ProcessorStateException");
-        } catch (final ProcessorStateException e) {
-            // this is good, do nothing
-        }
+        assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext));
     }
 
     @Test
@@ -503,56 +494,41 @@ public class RocksDBStoreTest {
     @Test
     public void shouldThrowNullPointerExceptionOnNullPut() {
         rocksDBStore.init(context, rocksDBStore);
-        try {
-            rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
-            fail("Should have thrown NullPointerException on null put()");
-        } catch (final NullPointerException e) {
-            // this is good
-        }
+        assertThrows(
+            NullPointerException.class,
+            () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullPutAll() {
         rocksDBStore.init(context, rocksDBStore);
-        try {
-            rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
-            fail("Should have thrown NullPointerException on null put()");
-        } catch (final NullPointerException e) {
-            // this is good
-        }
+        assertThrows(
+            NullPointerException.class,
+            () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnNullGet() {
         rocksDBStore.init(context, rocksDBStore);
-        try {
-            rocksDBStore.get(null);
-            fail("Should have thrown NullPointerException on null get()");
-        } catch (final NullPointerException e) {
-            // this is good
-        }
+        assertThrows(
+            NullPointerException.class,
+            () -> rocksDBStore.get(null));
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnDelete() {
         rocksDBStore.init(context, rocksDBStore);
-        try {
-            rocksDBStore.delete(null);
-            fail("Should have thrown NullPointerException on deleting null key");
-        } catch (final NullPointerException e) {
-            // this is good
-        }
+        assertThrows(
+            NullPointerException.class,
+            () -> rocksDBStore.delete(null));
     }
 
     @Test
     public void shouldThrowNullPointerExceptionOnRange() {
         rocksDBStore.init(context, rocksDBStore);
-        try {
-            rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
-            fail("Should have thrown NullPointerException on null range key");
-        } catch (final NullPointerException e) {
-            // this is good
-        }
+        assertThrows(
+            NullPointerException.class,
+            () -> rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2"))));
     }
 
     @Test(expected = ProcessorStateException.class)
@@ -567,10 +543,8 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldHandleToggleOfEnablingBloomFilters() {
-
         final Properties props = StreamsTestUtils.getStreamsConfig();
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestingBloomFilterRocksDBConfigSetter.class);
-        rocksDBStore = getRocksDBStore();
         dir = TestUtils.tempDirectory();
         context = new InternalMockProcessorContext(dir,
             Serdes.String(),
@@ -616,20 +590,22 @@ public class RocksDBStoreTest {
 
     @Test
     public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
+        final TaskId taskId = new TaskId(0, 0);
+
         final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger();
         final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST);
         streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
-        final ProcessorContext<Object, Object> context = EasyMock.niceMock(ProcessorContext.class);
+
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
         EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
-        final TaskId taskId = new TaskId(0, 0);
         EasyMock.expect(context.taskId()).andStubReturn(taskId);
         EasyMock.expect(context.appConfigs())
             .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
-        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
         EasyMock.replay(context);
-        final RocksDBStore rocksDBStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+
         rocksDBStore.init(context, rocksDBStore);
         final byte[] key = "hello".getBytes();
         final byte[] value = "world".getBytes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 3ba2f1f..fdfb6a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -71,13 +71,10 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
         // re-open store
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        rocksDBStore = getRocksDBStore();
         rocksDBStore.init(context, rocksDBStore);
         assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
         LogCaptureAppender.unregister(appender);
 
-        rocksDBStore.close();
-
         // verify store
         final DBOptions dbOptions = new DBOptions();
         final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
@@ -201,7 +198,6 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
         // approx: 0 entries on old CF, 3 in new CF
         assertThat(rocksDBStore.approximateNumEntries(), is(3L));
 
-
         iteratorsShouldNotMigrateData();
         assertThat(rocksDBStore.approximateNumEntries(), is(3L));