Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/12 18:12:32 UTC

[kafka] branch trunk updated: KAFKA-7023: Add unit test (#5197)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a59061  KAFKA-7023: Add unit test (#5197)
7a59061 is described below

commit 7a590612524bd8e899c8d6bbb73daece7f352d46
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Jun 12 11:12:07 2018 -0700

    KAFKA-7023: Add unit test (#5197)
    
    Add a unit test that validates that after restoreStart the options are set with the bulk-loading configs, and that after restoreEnd they resume the customized configs
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../streams/state/internals/RocksDBStore.java      |  5 ++
 .../streams/state/internals/RocksDBStoreTest.java  | 89 ++++++++++++++--------
 .../kafka/test/InternalMockProcessorContext.java   | 15 +++-
 3 files changed, 74 insertions(+), 35 deletions(-)
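
In short, the new test exercises the restore listener as sketched below. This is a minimal sketch distilled from the diff that follows, with explanatory comments added; it relies on the getOptions() accessor and the InternalMockProcessorContext.getRestoreListener() helper introduced by this patch, and on a RocksDBConfigSetter that sets level0FileNumCompactionTrigger to 10.

    @Test
    public void shouldRespectBulkloadOptionsDuringInit() {
        rocksDBStore.init(context, rocksDBStore);

        // The mock context hands back the restore listener registered for this store.
        final StateRestoreListener restoreListener =
            context.getRestoreListener(rocksDBStore.name());

        // Entering restore switches the store to the bulk-loading options ...
        restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));

        // ... and finishing restore resumes the config setter's customized value (10 here).
        restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L);
        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10));
    }
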

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6084ecb..e858ac0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -572,4 +572,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
             rocksDBStore.toggleDbForBulkLoading(false);
         }
     }
+
+    // for testing
+    public Options getOptions() {
+        return options;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index b7a9d37..63d877a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -16,23 +16,21 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -43,10 +41,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -57,8 +55,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class RocksDBStoreTest {
-    private final File tempDir = TestUtils.tempDirectory();
-
     private Serializer<String> stringSerializer = new StringSerializer();
     private Deserializer<String> stringDeserializer = new StringDeserializer();
     private RocksDBStore rocksDBStore;
@@ -67,13 +63,14 @@ public class RocksDBStoreTest {
 
     @Before
     public void setUp() {
+        final Properties props = StreamsTestUtils.minimalStreamsConfig();
+        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
         context = new InternalMockProcessorContext(dir,
             Serdes.String(),
             Serdes.String(),
-            new NoOpRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+            new StreamsConfig(props));
     }
 
     @After
@@ -82,6 +79,21 @@ public class RocksDBStoreTest {
     }
 
     @Test
+    public void shouldRespectBulkloadOptionsDuringInit() {
+        rocksDBStore.init(context, rocksDBStore);
+
+        StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
+
+        restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
+
+        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+
+        restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L);
+
+        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10));
+    }
+
+    @Test
     public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception {
         rocksDBStore.init(context, rocksDBStore);
 
@@ -108,28 +120,27 @@ public class RocksDBStoreTest {
     }
 
     @Test
-    public void verifyRocksDbConfigSetterIsCalled() {
-        final Map<String, Object> configs = new HashMap<>();
-        configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
-        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
-        configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
+    public void shouldCallRocksDbConfigSetter() {
         MockRocksDbConfigSetter.called = false;
-        rocksDBStore.openDB(new InternalMockProcessorContext(tempDir, new StreamsConfig(configs)));
+
+        rocksDBStore.openDB(context);
 
         assertTrue(MockRocksDbConfigSetter.called);
     }
 
-    @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() throws IOException {
+    @Test
+    public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
         final File tmpDir = TestUtils.tempDirectory();
-        InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir,
-            Serdes.String(),
-            Serdes.Long(),
-            new NoOpRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        tmpDir.setReadOnly();
+        InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()));
+
+        assertTrue(tmpDir.setReadOnly());
 
-        rocksDBStore.openDB(tmpContext);
+        try {
+            rocksDBStore.openDB(tmpContext);
+            fail("Should have thrown ProcessorStateException");
+        } catch (ProcessorStateException e) {
+            // this is good, do nothing
+        }
     }
 
     @Test
@@ -221,7 +232,7 @@ public class RocksDBStoreTest {
     }
 
     @Test
-    public void shouldPutOnlyIfAbsentValue() throws Exception {
+    public void shouldPutOnlyIfAbsentValue() {
         rocksDBStore.init(context, rocksDBStore);
         final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
         final byte[] valueBytes = stringSerializer.serialize(null, "A");
@@ -237,7 +248,7 @@ public class RocksDBStoreTest {
     @Test
     public void shouldHandleDeletesOnRestoreAll() throws Exception {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
-        entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
 
         rocksDBStore.init(context, rocksDBStore);
         context.restore(rocksDBStore.name(), entries);
@@ -258,7 +269,7 @@ public class RocksDBStoreTest {
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
         entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
         // this will be deleted
-        entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
         entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
         // this will restore key "1" as WriteBatch applies updates in order
         entries.add(new KeyValue<>("1".getBytes("UTF-8"), "restored".getBytes("UTF-8")));
@@ -320,7 +331,7 @@ public class RocksDBStoreTest {
 
         entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
         entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
-        entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null));
+        entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
 
         context.restore(rocksDBStore.name(), entries);
 
@@ -342,7 +353,9 @@ public class RocksDBStoreTest {
         try {
             rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
-        } catch (NullPointerException e) { }
+        } catch (NullPointerException e) {
+            // this is good
+        }
     }
 
     @Test
@@ -351,7 +364,9 @@ public class RocksDBStoreTest {
         try {
             rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
             fail("Should have thrown NullPointerException on null put()");
-        } catch (NullPointerException e) { }
+        } catch (NullPointerException e) {
+            // this is good
+        }
     }
 
     @Test
@@ -360,7 +375,9 @@ public class RocksDBStoreTest {
         try {
             rocksDBStore.get(null);
             fail("Should have thrown NullPointerException on null get()");
-        } catch (NullPointerException e) { }
+        } catch (NullPointerException e) {
+            // this is good
+        }
     }
 
     @Test
@@ -369,7 +386,9 @@ public class RocksDBStoreTest {
         try {
             rocksDBStore.delete(null);
             fail("Should have thrown NullPointerException on deleting null key");
-        } catch (NullPointerException e) { }
+        } catch (NullPointerException e) {
+            // this is good
+        }
     }
 
     @Test
@@ -378,7 +397,9 @@ public class RocksDBStoreTest {
         try {
             rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
             fail("Should have thrown NullPointerException on deleting null key");
-        } catch (NullPointerException e) { }
+        } catch (NullPointerException e) {
+            // this is good
+        }
     }
 
     @Test(expected = ProcessorStateException.class)
@@ -397,6 +418,8 @@ public class RocksDBStoreTest {
         @Override
         public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
             called = true;
+
+            options.setLevel0FileNumCompactionTrigger(10);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index e5571eb..bb42d1c 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -78,6 +78,13 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         this(stateDir, null, null, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
     }
 
+    public InternalMockProcessorContext(final File stateDir,
+                                        final Serde<?> keySerde,
+                                        final Serde<?> valSerde,
+                                        final StreamsConfig config) {
+        this(stateDir, keySerde, valSerde, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
+    }
+
     public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
                                         final RecordCollector collector) {
         this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
@@ -293,10 +300,14 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         return Collections.unmodifiableMap(storeMap);
     }
 
-    public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
+    public StateRestoreListener getRestoreListener(final String storeName) {
+        final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreFuncs.get(storeName));
+        return getStateRestoreListener(restoreCallback);
+    }
 
+    public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
         final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreFuncs.get(storeName));
-        final StateRestoreListener restoreListener = getStateRestoreListener(restoreCallback);
+        final StateRestoreListener restoreListener = getRestoreListener(storeName);
 
         restoreListener.onRestoreStart(null, storeName, 0L, 0L);
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.