You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/18 20:02:00 UTC
[kafka] branch trunk updated: MINOR: cleanup RocksDBStore tests
(#8510)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ea3eed MINOR: cleanup RocksDBStore tests (#8510)
6ea3eed is described below
commit 6ea3eedfd8c74d40157d6e489408d871e172ba34
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat Apr 18 13:01:16 2020 -0700
MINOR: cleanup RocksDBStore tests (#8510)
One of the new rocksdb unit tests creates a non-temporary rocksdb directory wherever the test is run from, with some rocksdb files left behind after the test(s) are done. We should use the tempDirectory dir for this testing
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../streams/state/internals/RocksDBStoreTest.java | 118 ++++++++-------------
.../internals/RocksDBTimestampedStoreTest.java | 4 -
2 files changed, 47 insertions(+), 75 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index a67754d..b1ee48f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -74,8 +74,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.powermock.api.easymock.PowerMock.replay;
import static org.powermock.api.easymock.PowerMock.verify;
@@ -106,6 +106,11 @@ public class RocksDBStoreTest {
context.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger());
}
+ @After
+ public void tearDown() {
+ rocksDBStore.close();
+ }
+
RocksDBStore getRocksDBStore() {
return new RocksDBStore(DB_NAME, METRICS_SCOPE);
}
@@ -137,15 +142,10 @@ public class RocksDBStoreTest {
return getProcessorContext(streamsProps);
}
- @After
- public void tearDown() {
- rocksDBStore.close();
- }
-
@Test
public void shouldAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsDebug() {
final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.DEBUG);
+ context = getProcessorContext(RecordingLevel.DEBUG);
reset(metricsRecorder);
metricsRecorder.addStatistics(
eq(DB_NAME),
@@ -153,46 +153,46 @@ public class RocksDBStoreTest {
);
replay(metricsRecorder);
- store.openDB(mockContext);
+ store.openDB(context);
verify(metricsRecorder);
}
@Test
public void shouldNotAddStatisticsToInjectedMetricsRecorderWhenRecordingLevelIsInfo() {
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.INFO);
+ rocksDBStore= getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.INFO);
reset(metricsRecorder);
replay(metricsRecorder);
- store.openDB(mockContext);
+ rocksDBStore.openDB(context);
verify(metricsRecorder);
}
@Test
public void shouldRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsDebug() {
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.DEBUG);
- store.openDB(mockContext);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG);
+ rocksDBStore.openDB(context);
reset(metricsRecorder);
metricsRecorder.removeStatistics(DB_NAME);
replay(metricsRecorder);
- store.close();
+ rocksDBStore.close();
verify(metricsRecorder);
}
@Test
public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenRecordingLevelIsInfo() {
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext = getProcessorContext(RecordingLevel.INFO);
- store.openDB(mockContext);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.INFO);
+ rocksDBStore.openDB(context);
reset(metricsRecorder);
replay(metricsRecorder);
- store.close();
+ rocksDBStore.close();
verify(metricsRecorder);
}
@@ -211,26 +211,22 @@ public class RocksDBStoreTest {
@Test
public void shouldNotAddStatisticsToInjectedMetricsRecorderWhenUserProvidesStatistics() {
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
- getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
replay(metricsRecorder);
- store.openDB(mockContext);
+ rocksDBStore.openDB(context);
verify(metricsRecorder);
}
@Test
public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserProvidesStatistics() {
- final RocksDBStore store = getRocksDBStoreWithRocksDBMetricsRecorder();
- final InternalMockProcessorContext mockContext =
- getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
- store.openDB(mockContext);
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
+ rocksDBStore.openDB(context);
reset(metricsRecorder);
replay(metricsRecorder);
- store.close();
-
verify(metricsRecorder);
}
@@ -290,12 +286,7 @@ public class RocksDBStoreTest {
assertTrue(tmpDir.setReadOnly());
- try {
- rocksDBStore.openDB(tmpContext);
- fail("Should have thrown ProcessorStateException");
- } catch (final ProcessorStateException e) {
- // this is good, do nothing
- }
+ assertThrows(ProcessorStateException.class, () -> rocksDBStore.openDB(tmpContext));
}
@Test
@@ -503,56 +494,41 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullPut() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
- fail("Should have thrown NullPointerException on null put()");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
}
@Test
public void shouldThrowNullPointerExceptionOnNullPutAll() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
- fail("Should have thrown NullPointerException on null put()");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")));
}
@Test
public void shouldThrowNullPointerExceptionOnNullGet() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.get(null);
- fail("Should have thrown NullPointerException on null get()");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.get(null));
}
@Test
public void shouldThrowNullPointerExceptionOnDelete() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.delete(null);
- fail("Should have thrown NullPointerException on deleting null key");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.delete(null));
}
@Test
public void shouldThrowNullPointerExceptionOnRange() {
rocksDBStore.init(context, rocksDBStore);
- try {
- rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
- fail("Should have thrown NullPointerException on null range key");
- } catch (final NullPointerException e) {
- // this is good
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2"))));
}
@Test(expected = ProcessorStateException.class)
@@ -567,10 +543,8 @@ public class RocksDBStoreTest {
@Test
public void shouldHandleToggleOfEnablingBloomFilters() {
-
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestingBloomFilterRocksDBConfigSetter.class);
- rocksDBStore = getRocksDBStore();
dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(dir,
Serdes.String(),
@@ -616,20 +590,22 @@ public class RocksDBStoreTest {
@Test
public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
+ final TaskId taskId = new TaskId(0, 0);
+
final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger();
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST);
streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
- final ProcessorContext<Object, Object> context = EasyMock.niceMock(ProcessorContext.class);
+
+ context = EasyMock.niceMock(InternalMockProcessorContext.class);
EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
- final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(context.taskId()).andStubReturn(taskId);
EasyMock.expect(context.appConfigs())
.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
- EasyMock.expect(context.taskId()).andStubReturn(taskId);
+ EasyMock.expect(context.stateDir()).andStubReturn(dir);
EasyMock.replay(context);
- final RocksDBStore rocksDBStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+
rocksDBStore.init(context, rocksDBStore);
final byte[] key = "hello".getBytes();
final byte[] value = "world".getBytes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 3ba2f1f..fdfb6a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -71,13 +71,10 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
// re-open store
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- rocksDBStore = getRocksDBStore();
rocksDBStore.init(context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
LogCaptureAppender.unregister(appender);
- rocksDBStore.close();
-
// verify store
final DBOptions dbOptions = new DBOptions();
final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
@@ -201,7 +198,6 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
// approx: 0 entries on old CF, 3 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(3L));
-
iteratorsShouldNotMigrateData();
assertThat(rocksDBStore.approximateNumEntries(), is(3L));