You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/07 08:39:54 UTC
[1/2] kafka git commit: KAFKA-5650;
add StateStoreBuilder interface and implementations
Repository: kafka
Updated Branches:
refs/heads/trunk 667cd60dc -> 9cbb9f093
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
new file mode 100644
index 0000000..a0500b6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
+
+public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
+ private final String name;
+ private final long retentionPeriod;
+ private final int segments;
+ private final long windowSize;
+ private final boolean retainDuplicates;
+
+ public RocksDbWindowBytesStoreSupplier(final String name,
+ final long retentionPeriod,
+ final int segments,
+ final long windowSize,
+ final boolean retainDuplicates) {
+ if (segments < MIN_SEGMENTS) {
+ throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
+ }
+ this.name = name;
+ this.retentionPeriod = retentionPeriod;
+ this.segments = segments;
+ this.windowSize = windowSize;
+ this.retainDuplicates = retainDuplicates;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public WindowStore<Bytes, byte[]> get() {
+ final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
+ name,
+ retentionPeriod,
+ segments,
+ new WindowKeySchema()
+ );
+ return RocksDBWindowStore.bytesStore(segmentedBytesStore,
+ retainDuplicates,
+ windowSize);
+
+ }
+
+ @Override
+ public String metricsScope() {
+ return "rocksdb-window";
+ }
+
+ @Override
+ public int segments() {
+ return segments;
+ }
+
+ @Override
+ public long windowSize() {
+ return windowSize;
+ }
+
+ @Override
+ public boolean retainDuplicates() {
+ return retainDuplicates;
+ }
+
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
new file mode 100644
index 0000000..61919c3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+
+
+public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, SessionStore<K, V>> {
+
+ private final SessionBytesStoreSupplier storeSupplier;
+
+ public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Time time) {
+ super(storeSupplier.name(), keySerde, valueSerde, time);
+ this.storeSupplier = storeSupplier;
+ }
+
+ @Override
+ public SessionStore<K, V> build() {
+ return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+ storeSupplier.metricsScope(),
+ keySerde,
+ valueSerde,
+ time);
+ }
+
+ private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner) {
+ if (!enableCaching) {
+ return inner;
+ }
+ return new CachingSessionStore<>(inner,
+ keySerde,
+ valueSerde,
+ storeSupplier.segmentIntervalMs());
+ }
+
+ private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) {
+ if (!enableLogging) {
+ return inner;
+ }
+ return new ChangeLoggingSessionBytesStore(inner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
new file mode 100644
index 0000000..97b4883
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+
+public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
+
+ private final WindowBytesStoreSupplier storeSupplier;
+
+ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Time time) {
+ super(storeSupplier.name(), keySerde, valueSerde, time);
+ this.storeSupplier = storeSupplier;
+ }
+
+ @Override
+ public WindowStore<K, V> build() {
+ return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+ storeSupplier.metricsScope(),
+ time,
+ keySerde,
+ valueSerde);
+ }
+
+ private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
+ if (!enableCaching) {
+ return inner;
+ }
+ return new CachingWindowStore<>(inner,
+ keySerde,
+ valueSerde,
+ storeSupplier.windowSize(),
+ storeSupplier.segments());
+ }
+
+ private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
+ if (!enableLogging) {
+ return inner;
+ }
+ return new ChangeLoggingWindowBytesStore(inner, storeSupplier.retainDuplicates());
+ }
+
+ public long retentionPeriod() {
+ return storeSupplier.retentionPeriod();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 12947d8..dbdc854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -23,10 +23,12 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.easymock.EasyMock;
import org.junit.Test;
import java.util.Arrays;
@@ -42,6 +44,8 @@ import static org.junit.Assert.fail;
public class TopologyTest {
+ private final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+ private final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
private final Topology topology = new Topology();
private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
@@ -203,38 +207,52 @@ public class TopologyTest {
@Test(expected = TopologyException.class)
public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
- topology.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
+ mockStoreBuilder();
+ EasyMock.replay(storeBuilder);
+ topology.addStateStore(storeBuilder, "no-such-processsor");
}
@Test
public void shouldNotAllowToAddStateStoreToSource() {
+ mockStoreBuilder();
+ EasyMock.replay(storeBuilder);
topology.addSource("source-1", "topic-1");
try {
- topology.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
+ topology.addStateStore(storeBuilder, "source-1");
fail("Should have thrown TopologyException for adding store to source node");
} catch (final TopologyException expected) { }
}
@Test
public void shouldNotAllowToAddStateStoreToSink() {
+ mockStoreBuilder();
+ EasyMock.replay(storeBuilder);
topology.addSink("sink-1", "topic-1");
try {
- topology.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
+ topology.addStateStore(storeBuilder, "sink-1");
fail("Should have thrown TopologyException for adding store to sink node");
} catch (final TopologyException expected) { }
}
+ private void mockStoreBuilder() {
+ EasyMock.expect(storeBuilder.name()).andReturn("store").anyTimes();
+ EasyMock.expect(storeBuilder.logConfig()).andReturn(Collections.emptyMap());
+ EasyMock.expect(storeBuilder.loggingEnabled()).andReturn(false);
+ }
+
@Test
public void shouldNotAllowToAddStoreWithSameName() {
- topology.addStateStore(new MockStateStoreSupplier("store", false));
+ mockStoreBuilder();
+ EasyMock.replay(storeBuilder);
+ topology.addStateStore(storeBuilder);
try {
- topology.addStateStore(new MockStateStoreSupplier("store", false));
+ topology.addStateStore(storeBuilder);
fail("Should have thrown TopologyException for duplicate store name");
} catch (final TopologyException expected) { }
}
@Test(expected = TopologyBuilderException.class)
- public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+ public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
final String sourceNodeName = "source";
final String goodNodeName = "goodGuy";
final String badNodeName = "badGuy";
@@ -243,12 +261,14 @@ public class TopologyTest {
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
final StreamsConfig streamsConfig = new StreamsConfig(config);
-
+ mockStoreBuilder();
+ EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStoreSupplier.MockStateStore("store", false));
+ EasyMock.replay(storeBuilder);
topology
.addSource(sourceNodeName, "topic")
.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
.addStateStore(
- Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+ storeBuilder,
goodNodeName)
.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
@@ -292,8 +312,10 @@ public class TopologyTest {
@Test(expected = TopologyException.class)
public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
+ EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
+ EasyMock.replay(globalStoreBuilder);
topology.addGlobalStore(
- new MockStateStoreSupplier("anyName", false, false),
+ globalStoreBuilder,
"sameName",
null,
null,
@@ -611,7 +633,10 @@ public class TopologyTest {
topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
if (newStores) {
for (final String store : storeNames) {
- topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
+ final StoreBuilder storeBuilder = EasyMock.createNiceMock(StoreBuilder.class);
+ EasyMock.expect(storeBuilder.name()).andReturn(store).anyTimes();
+ EasyMock.replay(storeBuilder);
+ topology.addStateStore(storeBuilder, processorName);
}
} else {
topology.connectProcessorAndStateStores(processorName, storeNames);
@@ -651,8 +676,11 @@ public class TopologyTest {
final String sourceName,
final String globalTopicName,
final String processorName) {
+ final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
+ EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
+ EasyMock.replay(globalStoreBuilder);
topology.addGlobalStore(
- new MockStateStoreSupplier(globalStoreName, false, false),
+ globalStoreBuilder,
sourceName,
null,
null,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 9bd8756..91edac5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -470,7 +470,7 @@ public class InternalTopologyBuilderTest {
@Test(expected = NullPointerException.class)
public void shouldNotAddNullStateStoreSupplier() throws Exception {
- builder.addStateStore(null);
+ builder.addStateStore((StateStoreSupplier) null);
}
private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 66adbf5..700b243 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -18,11 +18,20 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
+import org.apache.kafka.streams.state.internals.RocksDBSessionStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -30,6 +39,7 @@ import static org.junit.Assert.fail;
public class StoresTest {
+ @SuppressWarnings("deprecation")
@Test
public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@@ -44,6 +54,7 @@ public class StoresTest {
assertEquals("1000", config.get("retention.ms"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@@ -56,6 +67,7 @@ public class StoresTest {
assertFalse(supplier.loggingEnabled());
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldCreatePersistenStoreSupplierWithLoggedConfig() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@@ -70,6 +82,7 @@ public class StoresTest {
assertEquals("1000", config.get("retention.ms"));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception {
final StateStoreSupplier supplier = Stores.create("store")
@@ -95,4 +108,53 @@ public class StoresTest {
// ok
}
}
+
+ @Test
+ public void shouldCreateInMemoryKeyValueStore() {
+ assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class));
+ }
+
+ @Test
+ public void shouldCreateMemoryNavigableCache() {
+ assertThat(Stores.lruMap("map", 10).get(), instanceOf(MemoryNavigableLRUCache.class));
+ }
+
+ @Test
+ public void shouldCreateRocksDbStore() {
+ assertThat(Stores.persistentKeyValueStore("store").get(), instanceOf(RocksDBStore.class));
+ }
+
+ @Test
+ public void shouldCreateRocksDbWindowStore() {
+ assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class));
+ }
+
+ @Test
+ public void shouldCreateRocksDbSessionStore() {
+ assertThat(Stores.persistentSessionStore("store", 1).get(), instanceOf(RocksDBSessionStore.class));
+ }
+
+ @Test
+ public void shouldBuildWindowStore() {
+ final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true),
+ Serdes.String(),
+ Serdes.String()).build();
+ assertThat(store, not(nullValue()));
+ }
+
+ @Test
+ public void shouldBuildKeyValueStore() {
+ final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("name"),
+ Serdes.String(),
+ Serdes.String()).build();
+ assertThat(store, not(nullValue()));
+ }
+
+ @Test
+ public void shouldBuildSessionStore() {
+ final SessionStore<String, String> store = Stores.sessionStoreBuilder(Stores.persistentSessionStore("name", 10),
+ Serdes.String(),
+ Serdes.String()).build();
+ assertThat(store, not(nullValue()));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
new file mode 100644
index 0000000..2d378d8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class KeyValueStoreBuilderTest {
+
+ @Mock(type = MockType.NICE)
+ private KeyValueBytesStoreSupplier supplier;
+ @Mock(type = MockType.NICE)
+ private KeyValueStore<Bytes, byte[]> inner;
+ private KeyValueStoreBuilder<String, String> builder;
+
+ @Before
+ public void setUp() throws Exception {
+ EasyMock.expect(supplier.get()).andReturn(inner);
+ EasyMock.expect(supplier.name()).andReturn("name");
+ EasyMock.replay(supplier);
+ builder = new KeyValueStoreBuilder<>(supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
+ );
+
+ }
+
+ @Test
+ public void shouldHaveMeteredStoreAsOuterStore() {
+ final KeyValueStore<String, String> store = builder.build();
+ assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreByDefault() {
+ final KeyValueStore<String, String> store = builder.build();
+ assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+ final StateStore next = ((WrappedStateStore) store).wrappedStore();
+ assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ }
+
+ @Test
+ public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+ final KeyValueStore<String, String> store = builder.withLoggingDisabled().build();
+ final StateStore next = ((WrappedStateStore) store).wrappedStore();
+ assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingStoreWhenEnabled() {
+ final KeyValueStore<String, String> store = builder.withCachingEnabled().build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+ assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+ assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+ final KeyValueStore<String, String> store = builder
+ .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+ assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+ assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+ final KeyValueStore<String, String> store = builder
+ .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .withCachingEnabled()
+ .build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+ final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+ assertThat(store, instanceOf(MeteredKeyValueBytesStore.class));
+ assertThat(caching, instanceOf(CachingKeyValueStore.class));
+ assertThat(changeLogging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfInnerIsNull() {
+ new KeyValueStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfKeySerdeIsNull() {
+ new KeyValueStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfValueSerdeIsNull() {
+ new KeyValueStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfTimeIsNull() {
+ new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfMetricsScopeIsNull() {
+ new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
new file mode 100644
index 0000000..621a1c2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class SessionStoreBuilderTest {
+
+ @Mock(type = MockType.NICE)
+ private SessionBytesStoreSupplier supplier;
+ @Mock(type = MockType.NICE)
+ private SessionStore<Bytes, byte[]> inner;
+ private SessionStoreBuilder<String, String> builder;
+
+ @Before
+ public void setUp() throws Exception {
+
+ EasyMock.expect(supplier.get()).andReturn(inner);
+ EasyMock.expect(supplier.name()).andReturn("name");
+ EasyMock.replay(supplier);
+
+ builder = new SessionStoreBuilder<>(supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()
+ );
+ }
+
+ @Test
+ public void shouldHaveMeteredStoreAsOuterStore() {
+ final SessionStore<String, String> store = builder.build();
+ assertThat(store, instanceOf(MeteredSessionStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreByDefault() {
+ final SessionStore<String, String> store = builder.build();
+ final StateStore next = ((WrappedStateStore) store).wrappedStore();
+ assertThat(next, instanceOf(ChangeLoggingSessionBytesStore.class));
+ }
+
+ @Test
+ public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+ final SessionStore<String, String> store = builder.withLoggingDisabled().build();
+ final StateStore next = ((WrappedStateStore) store).wrappedStore();
+ assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingStoreWhenEnabled() {
+ final SessionStore<String, String> store = builder.withCachingEnabled().build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+ assertThat(store, instanceOf(MeteredSessionStore.class));
+ assertThat(wrapped, instanceOf(CachingSessionStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+ final SessionStore<String, String> store = builder
+ .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+ assertThat(store, instanceOf(MeteredSessionStore.class));
+ assertThat(wrapped, instanceOf(ChangeLoggingSessionBytesStore.class));
+ assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+ final SessionStore<String, String> store = builder
+ .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .withCachingEnabled()
+ .build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+ final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+ assertThat(store, instanceOf(MeteredSessionStore.class));
+ assertThat(caching, instanceOf(CachingSessionStore.class));
+ assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class));
+ assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfInnerIsNull() {
+ new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfKeySerdeIsNull() {
+ new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfValueSerdeIsNull() {
+ new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfTimeIsNull() {
+ new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfMetricsScopeIsNull() {
+ new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
new file mode 100644
index 0000000..25b8178
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+@RunWith(EasyMockRunner.class)
+public class WindowStoreBuilderTest {
+
+ @Mock(type = MockType.NICE)
+ private WindowBytesStoreSupplier supplier;
+ @Mock(type = MockType.NICE)
+ private WindowStore<Bytes, byte[]> inner;
+ private WindowStoreBuilder<String, String> builder;
+
+ @Before
+ public void setUp() throws Exception {
+ EasyMock.expect(supplier.get()).andReturn(inner);
+ EasyMock.expect(supplier.name()).andReturn("name");
+ EasyMock.replay(supplier);
+
+ builder = new WindowStoreBuilder<>(supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+
+ }
+
+ @Test
+ public void shouldHaveMeteredStoreAsOuterStore() {
+ final WindowStore<String, String> store = builder.build();
+ assertThat(store, instanceOf(MeteredWindowStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreByDefault() {
+ final WindowStore<String, String> store = builder.build();
+ final StateStore next = ((WrappedStateStore) store).wrappedStore();
+ assertThat(next, instanceOf(ChangeLoggingWindowBytesStore.class));
+ }
+
+ @Test
+ public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+ final WindowStore<String, String> store = builder.withLoggingDisabled().build();
+ final StateStore next = ((WrappedStateStore) store).wrappedStore();
+ assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingStoreWhenEnabled() {
+ final WindowStore<String, String> store = builder.withCachingEnabled().build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+ assertThat(store, instanceOf(MeteredWindowStore.class));
+ assertThat(wrapped, instanceOf(CachingWindowStore.class));
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+ final WindowStore<String, String> store = builder
+ .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+ assertThat(store, instanceOf(MeteredWindowStore.class));
+ assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
+ assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test
+ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+ final WindowStore<String, String> store = builder
+ .withLoggingEnabled(Collections.<String, String>emptyMap())
+ .withCachingEnabled()
+ .build();
+ final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore();
+ final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore();
+ assertThat(store, instanceOf(MeteredWindowStore.class));
+ assertThat(caching, instanceOf(CachingWindowStore.class));
+ assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
+ assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfInnerIsNull() {
+ new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfKeySerdeIsNull() {
+ new WindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfValueSerdeIsNull() {
+ new WindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerIfTimeIsNull() {
+ new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
+ }
+
+}
\ No newline at end of file
[2/2] kafka git commit: KAFKA-5650;
add StateStoreBuilder interface and implementations
Posted by da...@apache.org.
KAFKA-5650; add StateStoreBuilder interface and implementations
Part of KIP-182
- Add `StateStoreBuilder` interface and `WindowStateStoreBuilder`, `KeyValueStateStoreBuilder`, and `SessionStateStoreBuilder` implementations
- Add `StoreSupplier`, `WindowBytesStoreSupplier`, `KeyValueBytesStoreSupplier`, `SessionBytesStoreSupplier` interfaces and implementations
- Add new methods to `Stores` to create the newly added `StoreSupplier` and `StateStoreBuilder` implementations
- Update `Topology` and `InternalTopology` to use the interfaces
Author: Damian Guy <da...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3767 from dguy/kafka-5650
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9cbb9f09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9cbb9f09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9cbb9f09
Branch: refs/heads/trunk
Commit: 9cbb9f0939c88868bf33ae340c5fa756ee2965e8
Parents: 667cd60
Author: Damian Guy <da...@gmail.com>
Authored: Thu Sep 7 09:39:46 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Sep 7 09:39:46 2017 +0100
----------------------------------------------------------------------
docs/streams/developer-guide.html | 56 ++--
.../wordcount/WordCountProcessorDemo.java | 6 +-
.../java/org/apache/kafka/streams/Topology.java | 24 +-
.../streams/processor/StateStoreSupplier.java | 2 +
.../internals/InternalTopologyBuilder.java | 295 +++++++++++++++----
.../state/KeyValueBytesStoreSupplier.java | 26 ++
.../state/SessionBytesStoreSupplier.java | 33 +++
.../kafka/streams/state/StoreBuilder.java | 82 ++++++
.../kafka/streams/state/StoreSupplier.java | 47 +++
.../org/apache/kafka/streams/state/Stores.java | 148 ++++++++++
.../streams/state/WindowBytesStoreSupplier.java | 56 ++++
.../state/internals/AbstractStoreBuilder.java | 84 ++++++
.../state/internals/KeyValueStoreBuilder.java | 62 ++++
.../internals/RocksDBKeyValueStoreSupplier.java | 49 +--
.../state/internals/RocksDBSessionStore.java | 2 +-
.../internals/RocksDBSessionStoreSupplier.java | 51 +---
.../state/internals/RocksDBWindowStore.java | 2 +-
.../internals/RocksDBWindowStoreSupplier.java | 62 ++--
.../RocksDbKeyValueBytesStoreSupplier.java | 48 +++
.../RocksDbSessionBytesStoreSupplier.java | 59 ++++
.../RocksDbWindowBytesStoreSupplier.java | 90 ++++++
.../state/internals/SessionStoreBuilder.java | 63 ++++
.../state/internals/WindowStoreBuilder.java | 68 +++++
.../org/apache/kafka/streams/TopologyTest.java | 52 +++-
.../internals/InternalTopologyBuilderTest.java | 2 +-
.../apache/kafka/streams/state/StoresTest.java | 62 ++++
.../internals/KeyValueStoreBuilderTest.java | 141 +++++++++
.../internals/SessionStoreBuilderTest.java | 141 +++++++++
.../state/internals/WindowStoreBuilderTest.java | 135 +++++++++
29 files changed, 1716 insertions(+), 232 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 10220fb..a140b46 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -320,23 +320,24 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
<h4> <a id="disable-changelogs" href="#disable-changelogs">Enable / Disable Fault Tolerance of State Stores (Store Changelogs)</a></h4>
<p>
- You can enable or disable fault tolerance for a state store by enabling or disabling, respectively, the changelogging of the store through <code>enableLogging()</code> and <code>disableLogging()</code>.
- You can also fine-tune the associated topic’s configuration if needed.
+ You can enable or disable fault tolerance for a state store by enabling or disabling, respectively ,the changelogging of the store through <code>StateStoreBuilder#withLoggingEnabled(Map<String, String>)</code>
+ and <code>StateStoreBuilder#withLoggingDisabled()</code>.
+ You can also fine-tune the associated topic’s configuration if needed.
</p>
<p>Example for disabling fault-tolerance:</p>
<pre class="brush: java;">
- import org.apache.kafka.streams.processor.StateStoreSupplier;
+ import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
+ import org.apache.kafka.streams.processor.state.StateStoreBuilder;
import org.apache.kafka.streams.state.Stores;
- StateStoreSupplier countStoreSupplier = Stores.create("Counts")
- .withKeys(Serdes.String())
- .withValues(Serdes.Long())
- .persistent()
- .disableLogging() // disable backing up the store to a changelog topic
- .build();
+ KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
+ StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
+ Serdes.String(),
+ Serdes.Long())
+ .withLoggingDisabled(); // disable backing up the store to a changelog topic
</pre>
@@ -351,19 +352,20 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
<pre class="brush: java;">
- import org.apache.kafka.streams.processor.StateStoreSupplier;
+ import org.apache.kafka.streams.processor.state.KeyValueBytesStoreSupplier;
+ import org.apache.kafka.streams.processor.state.StateStoreBuilder;
import org.apache.kafka.streams.state.Stores;
Map<String, String> changelogConfig = new HashMap();
// override min.insync.replicas
changelogConfig.put("min.insyc.replicas", "1")
- StateStoreSupplier countStoreSupplier = Stores.create("Counts")
- .withKeys(Serdes.String())
- .withValues(Serdes.Long())
- .persistent()
- .enableLogging(changelogConfig) // enable changelogging, with custom changelog settings
- .build();
+ KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
+ StateStoreBuilder builder = Stores.keyValueStoreBuilder(countStoreSupplier,
+ Serdes.String(),
+ Serdes.Long())
+ .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings
+
</pre>
@@ -376,7 +378,7 @@ You set the <code>org.apache.kafka.streams.processor.StateRestoreListener</code>
</p>
<p>
- In addition to the actual store, you also need to provide a "factory" for the store by implementing the <code>org.apache.kafka.streams.processor.StateStoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
+ In addition to the actual store, you also need to provide a "factory" for the store by implementing the <code>org.apache.kafka.streams.processor.state.StoreSupplier</code> interface, which Kafka Streams uses to create instances of your store.
</p>
<p>
@@ -2244,7 +2246,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
<li>Your custom state store must implement <code>StateStore</code>.</li>
<li>You should have an interface to represent the operations available on the store.</li>
<li>It is recommended that you also provide an interface that restricts access to read-only operations so users of this API can't mutate the state of your running Kafka Streams application out-of-band.</li>
- <li>You also need to provide an implementation of <code>StateStoreSupplier</code> for creating instances of your store.</li>
+ <li>You also need to provide an implementation of <code>StoreSupplier</code> for creating instances of your store.</li>
</ol>
<p>
@@ -2266,7 +2268,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
V read(K key);
}
- public class MyCustomStoreSupplier implements StateStoreSupplier {
+ public class MyCustomStoreSupplier implements StoreSupplier {
// implementation of the supplier for MyCustomStore
}
</pre>
@@ -2655,18 +2657,14 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
might not want to use the unified record cache for both state store and forwarding downstream.
</p>
<p>
- Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching, you can
- add the <code>enableCaching</code> call (note that caches are disabled by default and there is no explicit <code>disableCaching</code>
- call) :
+ Following from the example first shown in section <a href="#streams_processor_statestore">State Stores</a>, to enable caching,
+ you first create a <code>StateStoreBuilder</code> and then call <code>withCachingEnabled</code> (note that caches
+ are disabled by default and there is no explicit <code>withCachingDisabled</code> call) :
</p>
<pre class="brush: java;">
- StateStoreSupplier countStoreSupplier =
- Stores.create("Counts")
- .withKeys(Serdes.String())
- .withValues(Serdes.Long())
- .persistent()
- .enableCaching()
- .build();
+ KeyValueBytesStoreSupplier countSupplier = Stores.persistentKeyValueStore("Counts");
+ StateStoreBuilder<KeyValueStore<String, Long>> builder = Stores.keyValueStoreBuilder(countSupplier, Serdes.String(), Serdes.Long());
+ builder.withCachingEnabled()
</pre>
<h4><a id="streams_developer-guide_memory-management_other_memory_usage" href="#streams_developer-guide_memory-management_other_memory_usage">Other memory usage</a></h4>
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index b0b8be5..cfa2137 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -123,7 +123,11 @@ public class WordCountProcessorDemo {
builder.addSource("Source", "streams-plaintext-input");
builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
- builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");
+ builder.addStateStore(Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("Counts"),
+ Serdes.String(),
+ Serdes.Integer()),
+ "Process");
builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index e8f7d23..386aacf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -32,7 +31,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import java.util.regex.Pattern;
@@ -431,7 +431,7 @@ public class Topology {
* <p>
* The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
* the named Kafka topic's partitions.
- * Such control is often useful with topologies that use {@link #addStateStore(StateStoreSupplier, String...) state
+ * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state
* stores} in its processors.
* In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
* records among partitions using Kafka's default partitioning logic.
@@ -537,14 +537,14 @@ public class Topology {
/**
* Adds a state store.
*
- * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+ * @param storeBuilder the storeBuilder used to obtain this state store {@link StateStore} instance
* @param processorNames the names of the processors that should be able to access the provided store
* @return itself
* @throws TopologyException if state store supplier is already added
*/
- public synchronized Topology addStateStore(final StateStoreSupplier supplier,
+ public synchronized Topology addStateStore(final StoreBuilder storeBuilder,
final String... processorNames) {
- internalTopologyBuilder.addStateStore(supplier, processorNames);
+ internalTopologyBuilder.addStateStore(storeBuilder, processorNames);
return this;
}
@@ -561,7 +561,7 @@ public class Topology {
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
*
- * @param storeSupplier user defined state store supplier
+ * @param storeBuilder user defined state store builder
* @param sourceName name of the {@link SourceNode} that will be automatically added
* @param keyDeserializer the {@link Deserializer} to deserialize keys with
* @param valueDeserializer the {@link Deserializer} to deserialize values with
@@ -571,14 +571,14 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
final String sourceName,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+ internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, null, keyDeserializer,
valueDeserializer, topic, processorName, stateUpdateSupplier);
return this;
}
@@ -595,7 +595,7 @@ public class Topology {
* records forwarded from the {@link SourceNode}.
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
*
- * @param storeSupplier user defined state store supplier
+ * @param storeBuilder user defined key value store builder
* @param sourceName name of the {@link SourceNode} that will be automatically added
* @param timestampExtractor the stateless timestamp extractor used for this source,
* if not specified the default extractor defined in the configs will be used
@@ -607,7 +607,7 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
@@ -615,7 +615,7 @@ public class Topology {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+ internalTopologyBuilder.addGlobalStore(storeBuilder, sourceName, timestampExtractor, keyDeserializer,
valueDeserializer, topic, processorName, stateUpdateSupplier);
return this;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
index 173dab9..536b194 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
@@ -22,7 +22,9 @@ import java.util.Map;
* A state store supplier which can create one or more {@link StateStore} instances.
*
* @param <T> State store type
+ * @deprecated use {@link org.apache.kafka.streams.state.StoreSupplier}
*/
+@Deprecated
public interface StateStoreSupplier<T extends StateStore> {
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index c7f70a0..da5fe38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -28,6 +28,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,14 +122,107 @@ public class InternalTopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
- private static class StateStoreFactory {
- public final Set<String> users;
+ interface StateStoreFactory {
+ Set<String> users();
+ boolean loggingEnabled();
+ StateStore build();
+ String name();
+ boolean isWindowStore();
+ Map<String, String> logConfig();
+ long retentionPeriod();
+ }
+
+ private static abstract class AbstractStateStoreFactory implements StateStoreFactory {
+ private final Set<String> users = new HashSet<>();
+ private final String name;
+ private final boolean loggingEnabled;
+ private final boolean windowStore;
+ private final Map<String, String> logConfig;
+
+ AbstractStateStoreFactory(final String name,
+ final boolean loggingEnabled,
+ final boolean windowStore,
+ final Map<String, String> logConfig) {
+ this.name = name;
+ this.loggingEnabled = loggingEnabled;
+ this.windowStore = windowStore;
+ this.logConfig = logConfig;
+ }
+
+ @Override
+ public Set<String> users() {
+ return users;
+ }
+
+ @Override
+ public boolean loggingEnabled() {
+ return loggingEnabled;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
- public final StateStoreSupplier supplier;
+ @Override
+ public boolean isWindowStore() {
+ return windowStore;
+ }
+
+ @Override
+ public Map<String, String> logConfig() {
+ return logConfig;
+ }
+ }
- StateStoreFactory(final StateStoreSupplier supplier) {
+ private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
+ private final StateStoreSupplier supplier;
+
+ StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
+ super(supplier.name(),
+ supplier.loggingEnabled(),
+ supplier instanceof WindowStoreSupplier,
+ supplier.logConfig());
this.supplier = supplier;
- users = new HashSet<>();
+
+ }
+
+ @Override
+ public StateStore build() {
+ return supplier.get();
+ }
+
+ @Override
+ public long retentionPeriod() {
+ if (!isWindowStore()) {
+ throw new IllegalStateException("retentionPeriod is not supported when not a window store");
+ }
+ return ((WindowStoreSupplier) supplier).retentionPeriod();
+ }
+ }
+
+ private static class StoreBuilderFactory extends AbstractStateStoreFactory {
+ private final StoreBuilder builder;
+
+ StoreBuilderFactory(final StoreBuilder<?> builder) {
+ super(builder.name(),
+ builder.loggingEnabled(),
+ builder instanceof WindowStoreBuilder,
+ builder.logConfig());
+ this.builder = builder;
+ }
+
+ @Override
+ public StateStore build() {
+ return builder.build();
+ }
+
+ @Override
+ public long retentionPeriod() {
+ if (!isWindowStore()) {
+ throw new IllegalStateException("retentionPeriod is not supported when not a window store");
+ }
+ return ((WindowStoreBuilder) builder).retentionPeriod();
}
}
@@ -405,7 +501,7 @@ public class InternalTopologyBuilder {
throw new TopologyException("StateStore " + supplier.name() + " is already added.");
}
- stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
+ stateFactories.put(supplier.name(), new StateStoreSupplierFactory(supplier));
if (processorNames != null) {
for (final String processorName : processorNames) {
@@ -414,6 +510,22 @@ public class InternalTopologyBuilder {
}
}
+ public final void addStateStore(final StoreBuilder storeBuilder,
+ final String... processorNames) {
+ Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+ if (stateFactories.containsKey(storeBuilder.name())) {
+ throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
+ }
+
+ stateFactories.put(storeBuilder.name(), new StoreBuilderFactory(storeBuilder));
+
+ if (processorNames != null) {
+ for (final String processorName : processorNames) {
+ connectProcessorAndStateStore(processorName, storeBuilder.name());
+ }
+ }
+ }
+
public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
final String sourceName,
final TimestampExtractor timestampExtractor,
@@ -423,43 +535,52 @@ public class InternalTopologyBuilder {
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
Objects.requireNonNull(storeSupplier, "store supplier must not be null");
- Objects.requireNonNull(sourceName, "sourceName must not be null");
- Objects.requireNonNull(topic, "topic must not be null");
- Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
- Objects.requireNonNull(processorName, "processorName must not be null");
- if (nodeFactories.containsKey(sourceName)) {
- throw new TopologyException("Processor " + sourceName + " is already added.");
- }
- if (nodeFactories.containsKey(processorName)) {
- throw new TopologyException("Processor " + processorName + " is already added.");
- }
- if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
- throw new TopologyException("StateStore " + storeSupplier.name() + " is already added.");
- }
- if (storeSupplier.loggingEnabled()) {
- throw new TopologyException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
- }
- if (sourceName.equals(processorName)) {
- throw new TopologyException("sourceName and processorName must be different.");
- }
-
+ final String name = storeSupplier.name();
+ validateGlobalStoreArguments(sourceName,
+ topic,
+ processorName,
+ stateUpdateSupplier,
+ name,
+ storeSupplier.loggingEnabled());
validateTopicNotAlreadyRegistered(topic);
+ addGlobalStore(sourceName,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer,
+ topic,
+ processorName,
+ stateUpdateSupplier,
+ name,
+ storeSupplier.get());
+ }
- globalTopics.add(topic);
- final String[] topics = {topic};
- nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
- nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
- nodeGrouper.add(sourceName);
- final String[] predecessors = {sourceName};
- final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
- nodeFactory.addStateStore(storeSupplier.name());
- nodeFactories.put(processorName, nodeFactory);
- nodeGrouper.add(processorName);
- nodeGrouper.unite(processorName, predecessors);
+ public final void addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ Objects.requireNonNull(storeBuilder, "store builder must not be null");
+ validateGlobalStoreArguments(sourceName,
+ topic,
+ processorName,
+ stateUpdateSupplier,
+ storeBuilder.name(),
+ storeBuilder.loggingEnabled());
+ validateTopicNotAlreadyRegistered(topic);
- globalStateStores.put(storeSupplier.name(), storeSupplier.get());
- connectSourceStoreAndTopic(storeSupplier.name(), topic);
+ addGlobalStore(sourceName,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer,
+ topic,
+ processorName,
+ stateUpdateSupplier,
+ storeBuilder.name(),
+ storeBuilder.build());
}
private void validateTopicNotAlreadyRegistered(final String topic) {
@@ -521,6 +642,64 @@ public class InternalTopologyBuilder {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
}
+ private void validateGlobalStoreArguments(final String sourceName,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier,
+ final String storeName,
+ final boolean loggingEnabled) {
+ Objects.requireNonNull(sourceName, "sourceName must not be null");
+ Objects.requireNonNull(topic, "topic must not be null");
+ Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+ Objects.requireNonNull(processorName, "processorName must not be null");
+ if (nodeFactories.containsKey(sourceName)) {
+ throw new TopologyException("Processor " + sourceName + " is already added.");
+ }
+ if (nodeFactories.containsKey(processorName)) {
+ throw new TopologyException("Processor " + processorName + " is already added.");
+ }
+ if (stateFactories.containsKey(storeName) || globalStateStores.containsKey(storeName)) {
+ throw new TopologyException("StateStore " + storeName + " is already added.");
+ }
+ if (loggingEnabled) {
+ throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
+ }
+ if (sourceName.equals(processorName)) {
+ throw new TopologyException("sourceName and processorName must be different.");
+ }
+ }
+
+ private void addGlobalStore(final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier,
+ final String name,
+ final KeyValueStore store) {
+ final String[] topics = {topic};
+ final String[] predecessors = {sourceName};
+ final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName,
+ predecessors,
+ stateUpdateSupplier);
+ globalTopics.add(topic);
+ nodeFactories.put(sourceName, new SourceNodeFactory(sourceName,
+ topics,
+ null,
+ timestampExtractor,
+ keyDeserializer,
+ valueDeserializer));
+ nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+ nodeGrouper.add(sourceName);
+ nodeFactory.addStateStore(name);
+ nodeFactories.put(processorName, nodeFactory);
+ nodeGrouper.add(processorName);
+ nodeGrouper.unite(processorName, predecessors);
+ globalStateStores.put(name, store);
+ connectSourceStoreAndTopic(name, topic);
+ }
+
private void connectProcessorAndStateStore(final String processorName,
final String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName)) {
@@ -531,12 +710,12 @@ public class InternalTopologyBuilder {
}
final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
- final Iterator<String> iter = stateStoreFactory.users.iterator();
+ final Iterator<String> iter = stateStoreFactory.users().iterator();
if (iter.hasNext()) {
final String user = iter.next();
nodeGrouper.unite(user, processorName);
}
- stateStoreFactory.users.add(processorName);
+ stateStoreFactory.users().add(processorName);
final NodeFactory nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
@@ -723,22 +902,20 @@ public class InternalTopologyBuilder {
}
for (final String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
if (!stateStoreMap.containsKey(stateStoreName)) {
- final StateStore stateStore;
-
if (stateFactories.containsKey(stateStoreName)) {
- final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
- stateStore = supplier.get();
+ final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
// remember the changelog topic if this state store is change-logging enabled
- if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
+ if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName);
storeToChangelogTopic.put(stateStoreName, changelogTopic);
}
+ stateStoreMap.put(stateStoreName, stateStoreFactory.build());
} else {
- stateStore = globalStateStores.get(stateStoreName);
+ stateStoreMap.put(stateStoreName, globalStateStores.get(stateStoreName));
}
- stateStoreMap.put(stateStoreName, stateStore);
+
}
}
} else if (factory instanceof SourceNodeFactory) {
@@ -839,10 +1016,9 @@ public class InternalTopologyBuilder {
// if the node is connected to a state, add to the state topics
for (final StateStoreFactory stateFactory : stateFactories.values()) {
- final StateStoreSupplier supplier = stateFactory.supplier;
- if (supplier.loggingEnabled() && stateFactory.users.contains(node)) {
- final String name = ProcessorStateManager.storeChangelogTopic(applicationId, supplier.name());
- final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(supplier, name);
+ if (stateFactory.loggingEnabled() && stateFactory.users().contains(node)) {
+ final String name = ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
+ final InternalTopicConfig internalTopicConfig = createInternalTopicConfig(stateFactory, name);
stateChangelogTopics.put(name, internalTopicConfig);
}
}
@@ -891,19 +1067,20 @@ public class InternalTopologyBuilder {
}
}
}
-
- private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier,
+
+ private InternalTopicConfig createInternalTopicConfig(final StateStoreFactory factory,
final String name) {
- if (!(supplier instanceof WindowStoreSupplier)) {
- return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
+ if (!factory.isWindowStore()) {
+ return new InternalTopicConfig(name,
+ Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+ factory.logConfig());
}
- final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier;
final InternalTopicConfig config = new InternalTopicConfig(name,
Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
InternalTopicConfig.CleanupPolicy.delete),
- supplier.logConfig());
- config.setRetentionMs(windowStoreSupplier.retentionPeriod());
+ factory.logConfig());
+ config.setRetentionMs(factory.retentionPeriod());
return config;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
new file mode 100644
index 0000000..73e6732
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueBytesStoreSupplier.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A store supplier that can be used to create one or more {@link KeyValueStore KeyValueStore<Bytes, byte[]>} instances of type <Byte, byte[]>.
+ */
+public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
new file mode 100644
index 0000000..e5523da
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>>} instances of type <Byte, byte[]>.
+ */
+public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
+
+ /**
+ * The size of a segment, in milliseconds. Used when caching is enabled to segment the cache
+ * and reduce the amount of data that needs to be scanned when performing range queries.
+ *
+ * @return segmentInterval in milliseconds
+ */
+ long segmentIntervalMs();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
new file mode 100644
index 0000000..2d1b241
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.util.Map;
+
+/**
+ * Build a {@link StateStore} wrapped with optional caching and logging.
+ * @param <T> the type of store to build
+ */
+public interface StoreBuilder<T extends StateStore> {
+
+ /**
+ * Enable caching on the store.
+ * @return this
+ */
+ StoreBuilder<T> withCachingEnabled();
+
+ /**
+ * Maintain a changelog for any changes made to the store.
+ * Use the provided config to set the config of the changelog topic.
+ * @param config config applied to the changelog topic
+ * @return this
+ */
+ StoreBuilder<T> withLoggingEnabled(final Map<String, String> config);
+
+ /**
+ * Disable the changelog for store built by this {@link StoreBuilder}.
+ * This will turn off fault-tolerance for your store.
+ * By default the changelog is enabled.
+ * @return this
+ */
+ StoreBuilder<T> withLoggingDisabled();
+
+ /**
+ * Build the store as defined by the builder.
+ *
+ * @return the built {@link StateStore}
+ */
+ T build();
+
+
+ /**
+ * Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}.
+ * <p>
+ * Note: any unrecognized configs will be ignored by the Kafka brokers.
+ *
+ * @return Map containing any log configs to be used when creating the changelog for the {@link StateStore}
+ * If {@code loggingEnabled} returns false, this function will always return an empty map
+ */
+ Map<String, String> logConfig();
+
+ /**
+ * @return {@code true} if the {@link StateStore} should have logging enabled
+ */
+ boolean loggingEnabled();
+
+ /**
+ * Return the name of this state store builder.
+ * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
+ *
+ * @return the name of this state store builder
+ */
+ String name();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java
new file mode 100644
index 0000000..10e6f2d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreSupplier.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A state store supplier which can create one or more {@link StateStore} instances.
+ *
+ * @param <T> State store type
+ */
+public interface StoreSupplier<T extends StateStore> {
+ /**
+ * Return the name of this state store supplier.
+ * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'.
+ *
+ * @return the name of this state store supplier
+ */
+ String name();
+
+ /**
+ * Return a new {@link StateStore} instance.
+ *
+ * @return a new {@link StateStore} instance of type T
+ */
+ T get();
+
+ /**
+ * Return a String that is used as the scope for metrics recorded by Metered stores.
+ * @return metricsScope
+ */
+ String metricsScope();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 3cf22c1..c0beb9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -19,12 +19,22 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
+import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +51,149 @@ public class Stores {
private static final Logger log = LoggerFactory.getLogger(Stores.class);
/**
+ * Create a persistent {@link KeyValueBytesStoreSupplier}.
+ * @param name name of the store
+ * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
+ * to build a persistent store
+ */
+ public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
+ return new RocksDbKeyValueBytesStoreSupplier(name);
+ }
+
+ /**
+ * Create an in-memory {@link KeyValueBytesStoreSupplier}.
+ * @param name name of the store
+ * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
+ * build an in-memory store
+ */
+ public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
+ return new KeyValueBytesStoreSupplier() {
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public KeyValueStore<Bytes, byte[]> get() {
+ return new InMemoryKeyValueStore<>(name, Serdes.Bytes(), Serdes.ByteArray());
+ }
+
+ @Override
+ public String metricsScope() {
+ return "in-memory-state";
+ }
+ };
+ }
+
+ /**
+ * Create a LRU Map {@link KeyValueBytesStoreSupplier}.
+ * @param name name of the store
+ * @param maxCacheSize maximum number of items in the LRU
+ * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
+ * an LRU Map based store
+ */
+ public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) {
+ return new KeyValueBytesStoreSupplier() {
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public KeyValueStore<Bytes, byte[]> get() {
+ return new MemoryNavigableLRUCache<>(name, maxCacheSize, Serdes.Bytes(), Serdes.ByteArray());
+ }
+
+ @Override
+ public String metricsScope() {
+ return "in-memory-lru-state";
+ }
+ };
+ }
+
+ /**
+ * Create a persistent {@link WindowBytesStoreSupplier}.
+ * @param name name of the store
+ * @param retentionPeriod length of time to retain data in the store
+ * @param numSegments number of db segments
+ * @param windowSize size of the windows
+ * @param retainDuplicates whether or not to retain duplicates.
+ * @return an instance of {@link WindowBytesStoreSupplier}
+ */
+ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+ final long retentionPeriod,
+ final int numSegments,
+ final long windowSize,
+ final boolean retainDuplicates) {
+ return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, numSegments, windowSize, retainDuplicates);
+ }
+
+ /**
+ * Create a persistent {@link SessionBytesStoreSupplier}.
+ * @param name name of the store
+ * @param retentionPeriod length ot time to retain data in the store
+ * @return an instance of a {@link SessionBytesStoreSupplier}
+ */
+ public static SessionBytesStoreSupplier persistentSessionStore(final String name,
+ final long retentionPeriod) {
+ return new RocksDbSessionBytesStoreSupplier(name, retentionPeriod);
+ }
+
+
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
+ * @param supplier a {@link WindowBytesStoreSupplier}
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of {@link StoreBuilder} than can build a {@link WindowStore}
+ */
+ public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+ }
+
+ /**
+ * Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
+ * @param supplier a {@link KeyValueBytesStoreSupplier}
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
+ */
+ public static <K, V> StoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+ }
+
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
+ * @param supplier a {@link SessionBytesStoreSupplier}
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
+ * */
+ public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
+ }
+
+ /**
* Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
*
* @param name the name of the store
* @return the factory that can be used to specify other options or configurations for the store; never null
+ * @deprected use {@link #persistentKeyValueStore(String)}, {@link #persistentWindowStore(String, long, int, long, boolean)}
+ * {@link #persistentSessionStore(String, long)}, {@link #lruMap(String, int)}, or {@link #inMemoryKeyValueStore(String)}
*/
+ @Deprecated
public static StoreFactory create(final String name) {
return new StoreFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
new file mode 100644
index 0000000..5fbe6b0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A store supplier that can be used to create one or more {@link WindowStore WindowStore<Bytes, byte[]>>} instances of type <Byte, byte[]>.
+ */
+public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Bytes, byte[]>> {
+ /**
+ * The number of segments the store has. If your store is segmented then this should be the number of segments
+ * in the underlying store.
+ * It is also used to reduce the amount of data that is scanned when caching is enabled.
+ *
+ * @return number of segments
+ */
+ int segments();
+
+ /**
+ * The size of the windows any store created from this supplier is creating.
+ *
+ * @return window size
+ */
+ long windowSize();
+
+ /**
+ * Whether or not this store is retaining duplicate keys.
+ * Usually only true if the store is being used for joins.
+ * Note this should return false if caching is enabled.
+ *
+ * @return true if duplicates should be retained
+ */
+ boolean retainDuplicates();
+
+ /**
+ * The time period for which the {@link WindowStore} will retain historic data.
+ *
+ * @return retentionPeriod
+ */
+ long retentionPeriod();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
new file mode 100644
index 0000000..39b9d03
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+abstract class AbstractStoreBuilder<K, V, T extends StateStore> implements StoreBuilder<T> {
+ private final String name;
+ private Map<String, String> logConfig = new HashMap<>();
+ final Serde<K> keySerde;
+ final Serde<V> valueSerde;
+ final Time time;
+ boolean enableCaching;
+ boolean enableLogging = true;
+
+ AbstractStoreBuilder(final String name,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Time time) {
+ Objects.requireNonNull(name, "name can't be null");
+ Objects.requireNonNull(time, "time can't be null");
+ this.name = name;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ this.time = time;
+ }
+
+ @Override
+ public StoreBuilder<T> withCachingEnabled() {
+ enableCaching = true;
+ return this;
+ }
+
+ @Override
+ public StoreBuilder<T> withLoggingEnabled(final Map<String, String> config) {
+ Objects.requireNonNull(config, "config can't be null");
+ enableLogging = true;
+ logConfig = config;
+ return this;
+ }
+
+ @Override
+ public StoreBuilder<T> withLoggingDisabled() {
+ enableLogging = false;
+ logConfig.clear();
+ return this;
+ }
+
+ @Override
+ public Map<String, String> logConfig() {
+ return logConfig;
+ }
+
+ @Override
+ public boolean loggingEnabled() {
+ return enableLogging;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
new file mode 100644
index 0000000..20230f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import java.util.Objects;
+
+public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore<K, V>> {
+
+ private final KeyValueBytesStoreSupplier storeSupplier;
+
+
+ public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Time time) {
+ super(storeSupplier.name(), keySerde, valueSerde, time);
+ Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null");
+ this.storeSupplier = storeSupplier;
+ }
+
+ @Override
+ public KeyValueStore<K, V> build() {
+ return new MeteredKeyValueBytesStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+ storeSupplier.metricsScope(),
+ time,
+ keySerde,
+ valueSerde);
+ }
+
+ private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final KeyValueStore<Bytes, byte[]> inner) {
+ if (!enableCaching) {
+ return inner;
+ }
+ return new CachingKeyValueStore<>(inner, keySerde, valueSerde);
+ }
+
+ private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final KeyValueStore<Bytes, byte[]> inner) {
+ if (!enableLogging) {
+ return inner;
+ }
+ return new ChangeLoggingKeyValueBytesStore(inner);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 60a506b..d629c1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -17,8 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -34,48 +32,29 @@ import java.util.Map;
public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
- private static final String METRICS_SCOPE = "rocksdb-state";
- private final boolean cached;
+ private final KeyValueStoreBuilder<K, V> builder;
public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
- this(name, keySerde, valueSerde, null, logged, logConfig, cached);
+ this(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig, cached);
}
public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean cached) {
super(name, keySerde, valueSerde, time, logged, logConfig);
- this.cached = cached;
- }
-
- public KeyValueStore get() {
- final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name,
- Serdes.Bytes(),
- Serdes.ByteArray());
-
- if (!cached && !logged) {
- return new MeteredKeyValueBytesStore<>(
- rocks, METRICS_SCOPE, time, keySerde, valueSerde);
- }
-
-
- if (cached && logged) {
- final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(new ChangeLoggingKeyValueBytesStore(rocks), keySerde, valueSerde);
- return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
- }
-
+ builder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier(name),
+ keySerde,
+ valueSerde,
+ time);
if (cached) {
- final KeyValueStore<Bytes, byte[]> caching = new CachingKeyValueStore<>(rocks, keySerde, valueSerde);
- return new MeteredKeyValueBytesStore<>(caching, METRICS_SCOPE, time, keySerde, valueSerde);
-
- } else {
- // logged
- return new MeteredKeyValueBytesStore<>(
- new ChangeLoggingKeyValueBytesStore(rocks),
- METRICS_SCOPE,
- time,
- keySerde,
- valueSerde);
+ builder.withCachingEnabled();
}
+ // logged by default so we only need to worry about when it is disabled.
+ if (!logged) {
+ builder.withLoggingDisabled();
+ }
+ }
+ public KeyValueStore get() {
+ return builder.build();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 9fde74b..77b1abb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
-class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
+public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> {
private final Serde<K> keySerde;
private final Serde<AGG> aggSerde;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index aa0466d..f5432dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.SessionStore;
@@ -33,50 +32,30 @@ import java.util.Map;
*/
public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
- private static final String METRIC_SCOPE = "rocksdb-session";
- private static final int NUM_SEGMENTS = 3;
+ static final int NUM_SEGMENTS = 3;
private final long retentionPeriod;
- private final boolean cached;
+ private final SessionStoreBuilder<K, V> builder;
public RocksDBSessionStoreSupplier(String name, long retentionPeriod, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
super(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig);
this.retentionPeriod = retentionPeriod;
- this.cached = cached;
- }
-
- public String name() {
- return name;
- }
-
- public SessionStore<K, V> get() {
- final SessionKeySchema keySchema = new SessionKeySchema();
- final long segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
- final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
- retentionPeriod,
- NUM_SEGMENTS,
- keySchema);
-
- final RocksDBSessionStore<Bytes, byte[]> bytesStore = RocksDBSessionStore.bytesStore(segmented);
- return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogged(bytesStore), segmentInterval),
- METRIC_SCOPE,
- keySerde,
- valueSerde,
- time);
-
- }
-
- private SessionStore<Bytes, byte[]> maybeWrapLogged(final SessionStore<Bytes, byte[]> inner) {
+ builder = new SessionStoreBuilder<>(new RocksDbSessionBytesStoreSupplier(name,
+ retentionPeriod),
+ keySerde,
+ valueSerde,
+ time);
+ if (cached) {
+ builder.withCachingEnabled();
+ }
+ // logged by default so we only need to worry about when it is disabled.
if (!logged) {
- return inner;
+ builder.withLoggingDisabled();
}
- return new ChangeLoggingSessionBytesStore(inner);
}
- private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner, final long segmentInterval) {
- if (!cached) {
- return inner;
- }
- return new CachingSessionStore<>(inner, keySerde, valueSerde, segmentInterval);
+ public SessionStore<K, V> get() {
+ return builder.build();
+
}
public long retentionPeriod() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b147894..b7dd532 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
+public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
// this is optimizing the case when this store is already a bytes store, in which we can avoid Bytes.wrap() costs
private static class RocksDBWindowBytesStore extends RocksDBWindowStore<Bytes, byte[]> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 456d9e9..b899f5e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.WindowStore;
@@ -33,14 +32,9 @@ import java.util.Map;
*/
public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
- private static final String METRICS_SCOPE = "rocksdb-window";
public static final int MIN_SEGMENTS = 2;
private final long retentionPeriod;
- private final boolean retainDuplicates;
- private final int numSegments;
- private final long segmentInterval;
- private final long windowSize;
- private final boolean enableCaching;
+ private WindowStoreBuilder<K, V> builder;
public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, long windowSize, boolean logged, Map<String, String> logConfig, boolean enableCaching) {
this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, Time.SYSTEM, windowSize, logged, logConfig, enableCaching);
@@ -52,34 +46,25 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
}
this.retentionPeriod = retentionPeriod;
- this.retainDuplicates = retainDuplicates;
- this.numSegments = numSegments;
- this.windowSize = windowSize;
- this.enableCaching = enableCaching;
- this.segmentInterval = Segments.segmentInterval(retentionPeriod, numSegments);
- }
-
- public String name() {
- return name;
+ builder = new WindowStoreBuilder<>(new RocksDbWindowBytesStoreSupplier(name,
+ retentionPeriod,
+ numSegments,
+ windowSize,
+ retainDuplicates),
+ keySerde,
+ valueSerde,
+ time);
+ if (enableCaching) {
+ builder.withCachingEnabled();
+ }
+ // logged by default so we only need to worry about when it is disabled.
+ if (!logged) {
+ builder.withLoggingDisabled();
+ }
}
public WindowStore<K, V> get() {
- final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
- name,
- retentionPeriod,
- numSegments,
- new WindowKeySchema()
- );
- final RocksDBWindowStore<Bytes, byte[]> innerStore = RocksDBWindowStore.bytesStore(segmentedBytesStore,
- retainDuplicates,
- windowSize);
-
- return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogged(innerStore)),
- METRICS_SCOPE,
- time,
- keySerde,
- valueSerde);
-
+ return builder.build();
}
@Override
@@ -87,17 +72,4 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V
return retentionPeriod;
}
- private WindowStore<Bytes, byte[]> maybeWrapLogged(final WindowStore<Bytes, byte[]> inner) {
- if (!logged) {
- return inner;
- }
- return new ChangeLoggingWindowBytesStore(inner, retainDuplicates);
- }
-
- private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
- if (!enableCaching) {
- return inner;
- }
- return new CachingWindowStore<>(inner, keySerde, valueSerde, windowSize, segmentInterval);
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
new file mode 100644
index 0000000..7870579
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {
+
+ private final String name;
+
+ public RocksDbKeyValueBytesStoreSupplier(final String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public KeyValueStore<Bytes, byte[]> get() {
+ return new RocksDBStore<>(name,
+ Serdes.Bytes(),
+ Serdes.ByteArray());
+ }
+
+ @Override
+ public String metricsScope() {
+ return "rocksdb-state";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9cbb9f09/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
new file mode 100644
index 0000000..ffeb7d8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+
+import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS;
+
+public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSupplier {
+ private final String name;
+ private final long retentionPeriod;
+
+ public RocksDbSessionBytesStoreSupplier(final String name,
+ final long retentionPeriod) {
+ this.name = name;
+ this.retentionPeriod = retentionPeriod;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public SessionStore<Bytes, byte[]> get() {
+ final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
+ retentionPeriod,
+ NUM_SEGMENTS,
+ new SessionKeySchema());
+ return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
+ }
+
+ @Override
+ public String metricsScope() {
+ return "rocksdb-session";
+ }
+
+ @Override
+ public long segmentIntervalMs() {
+ return Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
+ }
+}