You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/12 18:12:32 UTC
[kafka] branch trunk updated: KAFKA-7023: Add unit test (#5197)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a59061 KAFKA-7023: Add unit test (#5197)
7a59061 is described below
commit 7a590612524bd8e899c8d6bbb73daece7f352d46
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Jun 12 11:12:07 2018 -0700
KAFKA-7023: Add unit test (#5197)
Add a unit test that validates after restoreStart, the options are set with bulk loading configs; and after restoreEnd, it resumes to the customized configs
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../streams/state/internals/RocksDBStore.java | 5 ++
.../streams/state/internals/RocksDBStoreTest.java | 89 ++++++++++++++--------
.../kafka/test/InternalMockProcessorContext.java | 15 +++-
3 files changed, 74 insertions(+), 35 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6084ecb..e858ac0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -572,4 +572,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
rocksDBStore.toggleDbForBulkLoading(false);
}
}
+
+ // for testing
+ public Options getOptions() {
+ return options;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index b7a9d37..63d877a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -16,23 +16,21 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -43,10 +41,10 @@ import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -57,8 +55,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RocksDBStoreTest {
- private final File tempDir = TestUtils.tempDirectory();
-
private Serializer<String> stringSerializer = new StringSerializer();
private Deserializer<String> stringDeserializer = new StringDeserializer();
private RocksDBStore rocksDBStore;
@@ -67,13 +63,14 @@ public class RocksDBStoreTest {
@Before
public void setUp() {
+ final Properties props = StreamsTestUtils.minimalStreamsConfig();
+ props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
rocksDBStore = new RocksDBStore("test");
dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(dir,
Serdes.String(),
Serdes.String(),
- new NoOpRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+ new StreamsConfig(props));
}
@After
@@ -82,6 +79,21 @@ public class RocksDBStoreTest {
}
@Test
+ public void shouldRespectBulkloadOptionsDuringInit() {
+ rocksDBStore.init(context, rocksDBStore);
+
+ StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
+
+ restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
+
+ assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+
+ restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L);
+
+ assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10));
+ }
+
+ @Test
public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception {
rocksDBStore.init(context, rocksDBStore);
@@ -108,28 +120,27 @@ public class RocksDBStoreTest {
}
@Test
- public void verifyRocksDbConfigSetterIsCalled() {
- final Map<String, Object> configs = new HashMap<>();
- configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
- configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
- configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
+ public void shouldCallRocksDbConfigSetter() {
MockRocksDbConfigSetter.called = false;
- rocksDBStore.openDB(new InternalMockProcessorContext(tempDir, new StreamsConfig(configs)));
+
+ rocksDBStore.openDB(context);
assertTrue(MockRocksDbConfigSetter.called);
}
- @Test(expected = ProcessorStateException.class)
- public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() throws IOException {
+ @Test
+ public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
final File tmpDir = TestUtils.tempDirectory();
- InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir,
- Serdes.String(),
- Serdes.Long(),
- new NoOpRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
- tmpDir.setReadOnly();
+ InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()));
+
+ assertTrue(tmpDir.setReadOnly());
- rocksDBStore.openDB(tmpContext);
+ try {
+ rocksDBStore.openDB(tmpContext);
+ fail("Should have thrown ProcessorStateException");
+ } catch (ProcessorStateException e) {
+ // this is good, do nothing
+ }
}
@Test
@@ -221,7 +232,7 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldPutOnlyIfAbsentValue() throws Exception {
+ public void shouldPutOnlyIfAbsentValue() {
rocksDBStore.init(context, rocksDBStore);
final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
final byte[] valueBytes = stringSerializer.serialize(null, "A");
@@ -237,7 +248,7 @@ public class RocksDBStoreTest {
@Test
public void shouldHandleDeletesOnRestoreAll() throws Exception {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+ entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
rocksDBStore.init(context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
@@ -258,7 +269,7 @@ public class RocksDBStoreTest {
entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
// this will be deleted
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+ entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
// this will restore key "1" as WriteBatch applies updates in order
entries.add(new KeyValue<>("1".getBytes("UTF-8"), "restored".getBytes("UTF-8")));
@@ -320,7 +331,7 @@ public class RocksDBStoreTest {
entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
- entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+ entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
context.restore(rocksDBStore.name(), entries);
@@ -342,7 +353,9 @@ public class RocksDBStoreTest {
try {
rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
fail("Should have thrown NullPointerException on null put()");
- } catch (NullPointerException e) { }
+ } catch (NullPointerException e) {
+ // this is good
+ }
}
@Test
@@ -351,7 +364,9 @@ public class RocksDBStoreTest {
try {
rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
fail("Should have thrown NullPointerException on null put()");
- } catch (NullPointerException e) { }
+ } catch (NullPointerException e) {
+ // this is good
+ }
}
@Test
@@ -360,7 +375,9 @@ public class RocksDBStoreTest {
try {
rocksDBStore.get(null);
fail("Should have thrown NullPointerException on null get()");
- } catch (NullPointerException e) { }
+ } catch (NullPointerException e) {
+ // this is good
+ }
}
@Test
@@ -369,7 +386,9 @@ public class RocksDBStoreTest {
try {
rocksDBStore.delete(null);
fail("Should have thrown NullPointerException on deleting null key");
- } catch (NullPointerException e) { }
+ } catch (NullPointerException e) {
+ // this is good
+ }
}
@Test
@@ -378,7 +397,9 @@ public class RocksDBStoreTest {
try {
rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
fail("Should have thrown NullPointerException on deleting null key");
- } catch (NullPointerException e) { }
+ } catch (NullPointerException e) {
+ // this is good
+ }
}
@Test(expected = ProcessorStateException.class)
@@ -397,6 +418,8 @@ public class RocksDBStoreTest {
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
called = true;
+
+ options.setLevel0FileNumCompactionTrigger(10);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index e5571eb..bb42d1c 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -78,6 +78,13 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
this(stateDir, null, null, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
}
+ public InternalMockProcessorContext(final File stateDir,
+ final Serde<?> keySerde,
+ final Serde<?> valSerde,
+ final StreamsConfig config) {
+ this(stateDir, keySerde, valSerde, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
+ }
+
public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
final RecordCollector collector) {
this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
@@ -293,10 +300,14 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
return Collections.unmodifiableMap(storeMap);
}
- public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
+ public StateRestoreListener getRestoreListener(final String storeName) {
+ final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreFuncs.get(storeName));
+ return getStateRestoreListener(restoreCallback);
+ }
+ public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreFuncs.get(storeName));
- final StateRestoreListener restoreListener = getStateRestoreListener(restoreCallback);
+ final StateRestoreListener restoreListener = getRestoreListener(storeName);
restoreListener.onRestoreStart(null, storeName, 0L, 0L);
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.