You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/18 14:53:49 UTC
[1/2] kafka git commit: KAFKA-5873;
add materialized overloads to StreamsBuilder
Repository: kafka
Updated Branches:
refs/heads/trunk 52d7b6763 -> f2b74aa1c
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index cbf2b56..0bdd3a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
@@ -101,9 +104,13 @@ public class GlobalKTableIntegrationTest {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
- stream = builder.stream(inputStream, Consumed.with(Serdes.String(), Serdes.Long()));
- table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
+ globalTable = builder.globalTable(globalOne, Consumed.with(Serdes.Long(), Serdes.String()),
+ Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
+ .withKeySerde(Serdes.Long())
+ .withValueSerde(Serdes.String()));
+ final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
+ stream = builder.stream(inputStream, stringLongConsumed);
+ table = builder.table(inputTable, stringLongConsumed);
foreachAction = new ForeachAction<String, String>() {
@Override
public void apply(final String key, final String value) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 3a771c4..faa581b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -128,8 +128,8 @@ public class JoinIntegrationTest {
CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
builder = new StreamsBuilder();
- leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
- rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
+ leftTable = builder.table(INPUT_TOPIC_1);
+ rightTable = builder.table(INPUT_TOPIC_2);
leftStream = leftTable.toStream();
rightStream = rightTable.toStream();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index a433667..8d4299b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -211,7 +211,8 @@ public class KStreamKTableJoinIntegrationTest {
// subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
// (which overrides her previous region value of "asia").
final KTable<String, String> userRegionsTable =
- builder.table(stringSerde, stringSerde, userRegionsTopic, userRegionsStoreName);
+ builder.table(userRegionsTopic,
+ Consumed.with(Serdes.String(), Serdes.String()));
// Compute the number of clicks per region, e.g. "europe" -> 13L.
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 1b45711..a12ffac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -343,9 +343,9 @@ public class KTableKTableJoinIntegrationTest {
private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
- final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
- final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
+ final KTable<String, String> table1 = builder.table(TABLE_1);
+ final KTable<String, String> table2 = builder.table(TABLE_2);
+ final KTable<String, String> table3 = builder.table(TABLE_3);
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = null;
if (queryableName != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 69c42fe..31b7222 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -106,7 +107,7 @@ public class RestoreIntegrationTest {
createStateForRestoration();
- builder.table(Serdes.Integer(), Serdes.Integer(), inputStream, "store")
+ builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()))
.toStream()
.foreach(new ForeachAction<Integer, Integer>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 0ee74b8..0f8109d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -54,8 +54,9 @@ public class GlobalKTableJoinsTest {
@Before
public void setUp() {
stateDir = TestUtils.tempDirectory();
- global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
- stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String()));
+ final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+ global = builder.globalTable(globalTopic, consumed);
+ stream = builder.stream(streamTopic, consumed);
keyValueMapper = new KeyValueMapper<String, String, String>() {
@Override
public String apply(final String key, final String value) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 1a2dc13..494e197 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -16,17 +16,20 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -58,6 +61,8 @@ public class InternalStreamsBuilderTest {
private KStreamTestDriver driver = null;
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
+ private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), false);
@Before
public void setUp() {
@@ -132,27 +137,30 @@ public class InternalStreamsBuilderTest {
}
@Test
- public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
- KTable table1 = builder.table("topic1", consumed, "table1");
- KTable table2 = builder.table("topic2", consumed, null);
+ public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() throws Exception {
+ KTable table1 = builder.table("topic2",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic2"),
+ false));
final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
- assertEquals(2, topology.stateStores().size());
- assertEquals("table1", topology.stateStores().get(0).name());
+ assertEquals(1, topology.stateStores().size());
+ assertEquals("topic2", topology.stateStores().get(0).name());
- final String internalStoreName = topology.stateStores().get(1).name();
- assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME));
- assertEquals(2, topology.storeToChangelogTopic().size());
- assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
- assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));
- assertEquals(table1.queryableStoreName(), "table1");
- assertNull(table2.queryableStoreName());
+ assertEquals(1, topology.storeToChangelogTopic().size());
+ assertEquals("topic2", topology.storeToChangelogTopic().get("topic2"));
+ assertNull(table1.queryableStoreName());
}
@Test
public void shouldBuildSimpleGlobalTableTopology() throws Exception {
- builder.globalTable("table", consumed, "globalTable");
+ builder.globalTable("table",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
+ false));
final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
final List<StateStore> stateStores = topology.globalStateStores();
@@ -173,16 +181,14 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
- builder.globalTable("table", consumed, "globalTable");
- builder.globalTable("table2", consumed, "globalTable2");
-
- doBuildGlobalTopologyWithAllGlobalTables();
- }
-
- @Test
- public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
- builder.globalTable("table", consumed, null);
- builder.globalTable("table2", consumed, null);
+ builder.globalTable("table",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1")));
+ builder.globalTable("table2",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2")));
doBuildGlobalTopologyWithAllGlobalTables();
}
@@ -191,10 +197,19 @@ public class InternalStreamsBuilderTest {
public void shouldAddGlobalTablesToEachGroup() throws Exception {
final String one = "globalTable";
final String two = "globalTable2";
- final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, one);
- final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, two);
- builder.table("not-global", consumed, "not-global");
+ final GlobalKTable<String, String> globalTable = builder.globalTable("table",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one)));
+ final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two)));
+
+ final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), false);
+ builder.table("not-global", consumed, materialized);
final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
@Override
@@ -227,7 +242,9 @@ public class InternalStreamsBuilderTest {
public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
- final KTable<String, String> table = builder.table("table-topic", consumed, "table-store");
+ final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), false);
+ final KTable<String, String> table = builder.table("table-topic", consumed, materialized);
assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
@@ -260,8 +277,7 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldAddTableToEarliestAutoOffsetResetList() {
final String topicName = "topic-1";
- final String storeName = "test-store";
- builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), storeName);
+ builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.EARLIEST)), materialized);
assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@@ -270,9 +286,7 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldAddTableToLatestAutoOffsetResetList() {
final String topicName = "topic-1";
- final String storeName = "test-store";
-
- builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), storeName);
+ builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.LATEST)), materialized);
assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -281,9 +295,8 @@ public class InternalStreamsBuilderTest {
@Test
public void shouldNotAddTableToOffsetResetLists() {
final String topicName = "topic-1";
- final String storeName = "test-store";
- builder.table(topicName, consumed, storeName);
+ builder.table(topicName, consumed, materialized);
assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -340,15 +353,15 @@ public class InternalStreamsBuilderTest {
@Test
public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
- builder.table("topic", consumed, "store");
+ builder.table("topic", consumed, materialized);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
- final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
- builder.table("topic", consumed, "store");
+ final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
+ builder.table("topic", consumed, materialized);
final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index ff9726e..705cf62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -60,7 +61,7 @@ public class KGroupedTableImplTest {
@Before
public void before() {
- groupedTable = builder.table(Serdes.String(), Serdes.String(), "blah", "blah")
+ groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
}
@@ -157,7 +158,11 @@ public class KGroupedTableImplTest {
}
};
- final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+ final KTable<String, Integer> reduced = builder.table(topic,
+ Consumed.with(Serdes.String(), Serdes.Double()),
+ Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Double()))
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
@@ -175,7 +180,11 @@ public class KGroupedTableImplTest {
}
};
- final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+ final KTable<String, Integer> reduced = builder.table(topic,
+ Consumed.with(Serdes.String(), Serdes.Double()),
+ Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Double()))
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
@@ -194,7 +203,7 @@ public class KGroupedTableImplTest {
}
};
- final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+ final KTable<String, Integer> reduced = builder.table(topic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupBy(intProjection)
.reduce(MockReducer.INTEGER_ADDER,
MockReducer.INTEGER_SUBTRACTOR,
@@ -211,10 +220,10 @@ public class KGroupedTableImplTest {
@SuppressWarnings("unchecked")
@Test
public void shouldCountAndMaterializeResults() {
- final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+ final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
- Serialized.with(Serdes.String(),
- Serdes.String()))
+ Serialized.with(Serdes.String(),
+ Serdes.String()))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
@@ -228,7 +237,7 @@ public class KGroupedTableImplTest {
@SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
- final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+ final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
Serialized.with(Serdes.String(),
Serdes.String()))
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 5e8687f..be1d865 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -61,6 +61,7 @@ public class KStreamImplTest {
final private Serde<String> stringSerde = Serdes.String();
final private Serde<Integer> intSerde = Serdes.Integer();
+ private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private KStream<String, String> testStream;
private StreamsBuilder builder;
private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
@@ -361,7 +362,7 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullValueMapperOnTableJoin() {
- testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null);
+ testStream.leftJoin(builder.table("topic", stringConsumed), null);
}
@Test(expected = NullPointerException.class)
@@ -383,14 +384,14 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
- testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+ testStream.join(builder.globalTable("global", stringConsumed),
null,
MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
- testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+ testStream.join(builder.globalTable("global", stringConsumed),
MockKeyValueMapper.<String, String>SelectValueMapper(),
null);
}
@@ -404,14 +405,14 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
- testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+ testStream.leftJoin(builder.globalTable("global", stringConsumed),
null,
MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
- testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+ testStream.leftJoin(builder.globalTable("global", stringConsumed),
MockKeyValueMapper.<String, String>SelectValueMapper(),
null);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 745ab4e..39b318f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -87,7 +87,7 @@ public class KStreamKStreamLeftJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
+ driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
driver.setTime(0L);
// push two items to the primary stream. the other window is empty
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index d206b02..d1226c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -67,8 +67,9 @@ public class KStreamKTableJoinTest {
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
- table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
+ final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ stream = builder.stream(topic1, consumed);
+ table = builder.table(topic2, consumed);
stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -76,7 +77,7 @@ public class KStreamKTableJoinTest {
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
- driver.setUp(builder, stateDir);
+ driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
driver.setTime(0L);
// push two items to the primary stream. the other table is empty
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index a79184e..ed835a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -68,8 +68,9 @@ public class KStreamKTableLeftJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
- table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
+ Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+ stream = builder.stream(topic1, consumed);
+ table = builder.table(topic2, consumed);
stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index accbb9c..0ae95dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertEquals;
public class KTableAggregateTest {
final private Serde<String> stringSerde = Serdes.String();
+ private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde, stringSerde);
private File stateDir = null;
@@ -70,7 +72,7 @@ public class KTableAggregateTest {
final String topic1 = "topic1";
final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
- KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
stringSerialzied
).aggregate(MockInitializer.STRING_INIT,
@@ -81,7 +83,7 @@ public class KTableAggregateTest {
table2.toStream().process(proc);
- driver.setUp(builder, stateDir);
+ driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
driver.process(topic1, "A", "1");
driver.flushState();
@@ -118,7 +120,7 @@ public class KTableAggregateTest {
final String topic1 = "topic1";
final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
- KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
stringSerialzied
).aggregate(MockInitializer.STRING_INIT,
@@ -146,7 +148,7 @@ public class KTableAggregateTest {
final String topic1 = "topic1";
final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
- KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String value) {
@@ -232,7 +234,7 @@ public class KTableAggregateTest {
final String input = "count-test-input";
final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
- builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+ builder.table(input, consumed)
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
.count("count")
.toStream()
@@ -247,7 +249,7 @@ public class KTableAggregateTest {
final String input = "count-test-input";
final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
- builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+ builder.table(input, consumed)
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
.count()
.toStream()
@@ -262,7 +264,7 @@ public class KTableAggregateTest {
final String input = "count-test-input";
final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
- builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+ builder.table(input, consumed)
.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
.count("count")
.toStream()
@@ -291,7 +293,7 @@ public class KTableAggregateTest {
final String input = "count-test-input";
final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
- builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+ builder.table(input, consumed)
.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
@@ -348,8 +350,8 @@ public class KTableAggregateTest {
final String reduceTopic = "TestDriver-reducer-store-repartition";
final Map<String, Long> reduceResults = new HashMap<>();
- final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
- final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
+ final KTable<String, String> one = builder.table(tableOne, consumed);
+ final KTable<Long, String> two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String()));
final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index a885edd..7986277 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -42,6 +43,7 @@ public class KTableFilterTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ private final Consumed<String, Integer> consumed = Consumed.with(stringSerde, intSerde);
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
@@ -79,7 +81,7 @@ public class KTableFilterTest {
final String topic1 = "topic1";
- KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ KTable<String, Integer> table1 = builder.table(topic1, consumed);
KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
@Override
@@ -104,7 +106,7 @@ public class KTableFilterTest {
final String topic1 = "topic1";
- KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ KTable<String, Integer> table1 = builder.table(topic1, consumed);
KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
@Override
@@ -128,7 +130,7 @@ public class KTableFilterTest {
final String topic1 = "topic1";
- KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ KTable<String, Integer> table1 = builder.table(topic1, consumed);
KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
@Override
@@ -213,7 +215,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -239,7 +241,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -304,7 +306,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -323,7 +325,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -382,7 +384,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -401,7 +403,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -440,8 +442,9 @@ public class KTableFilterTest {
String topic1 = "topic1";
+ final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
new Predicate<String, String>() {
@Override
@@ -461,8 +464,9 @@ public class KTableFilterTest {
String topic1 = "topic1";
+ final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
new Predicate<String, String>() {
@Override
@@ -485,7 +489,7 @@ public class KTableFilterTest {
};
new StreamsBuilder()
- .<Integer, String>table("empty", "emptyStore")
+ .<Integer, String>table("empty")
.filter(numberKeyPredicate)
.filterNot(numberKeyPredicate)
.to("nirvana");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 693aac8..23e0b59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -18,10 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
@@ -79,7 +83,11 @@ public class KTableForeachTest {
// When
StreamsBuilder builder = new StreamsBuilder();
- KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName");
+ KTable<Integer, String> table = builder.table(topicName,
+ Consumed.with(intSerde, stringSerde),
+ new MaterializedInternal<>(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
+ .withKeySerde(intSerde)
+ .withValueSerde(stringSerde)));
table.foreach(action);
// Then
@@ -105,7 +113,7 @@ public class KTableForeachTest {
};
new StreamsBuilder()
- .<Integer, String>table("emptyTopic", "emptyStore")
+ .<Integer, String>table("emptyTopic")
.foreach(consume);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 6ca38b8..9d918e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
@@ -54,6 +55,7 @@ import static org.junit.Assert.assertTrue;
public class KTableImplTest {
final private Serde<String> stringSerde = Serdes.String();
+ private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
@@ -64,7 +66,7 @@ public class KTableImplTest {
public void setUp() {
stateDir = TestUtils.tempDirectory("kafka-test");
builder = new StreamsBuilder();
- table = builder.table("test", "test");
+ table = builder.table("test");
}
@Test
@@ -73,10 +75,9 @@ public class KTableImplTest {
String topic1 = "topic1";
String topic2 = "topic2";
- String storeName1 = "storeName1";
String storeName2 = "storeName2";
- KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, storeName1);
+ KTable<String, String> table1 = builder.table(topic1, consumed);
MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
table1.toStream().process(proc1);
@@ -130,11 +131,10 @@ public class KTableImplTest {
String topic1 = "topic1";
String topic2 = "topic2";
- String storeName1 = "storeName1";
String storeName2 = "storeName2";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -256,14 +256,12 @@ public class KTableImplTest {
public void testStateStoreLazyEval() {
String topic1 = "topic1";
String topic2 = "topic2";
- String storeName1 = "storeName1";
- String storeName2 = "storeName2";
final StreamsBuilder builder = new StreamsBuilder();
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
- builder.table(stringSerde, stringSerde, topic2, storeName2);
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+ builder.table(topic2, consumed);
KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@@ -291,15 +289,13 @@ public class KTableImplTest {
public void testStateStore() {
String topic1 = "topic1";
String topic2 = "topic2";
- String storeName1 = "storeName1";
- String storeName2 = "storeName2";
final StreamsBuilder builder = new StreamsBuilder();
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, String> table2 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2);
+ (KTableImpl<String, String, String>) builder.table(topic2, consumed);
KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@@ -338,7 +334,12 @@ public class KTableImplTest {
final StreamsBuilder builder = new StreamsBuilder();
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+ (KTableImpl<String, String, String>) builder.table(topic1,
+ consumed,
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
+ .withKeySerde(stringSerde)
+ .withValueSerde(stringSerde)
+ );
table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 124114b..aeb2418 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
@@ -50,6 +51,7 @@ public class KTableKTableJoinTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
+ private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
private File stateDir = null;
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
@@ -165,8 +167,8 @@ public class KTableKTableJoinTest {
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
joined.toStream().process(processor);
@@ -186,8 +188,8 @@ public class KTableKTableJoinTest {
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
joined.toStream().process(processor);
@@ -285,8 +287,8 @@ public class KTableKTableJoinTest {
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -306,8 +308,8 @@ public class KTableKTableJoinTest {
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
proc = new MockProcessorSupplier<>();
builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -327,8 +329,8 @@ public class KTableKTableJoinTest {
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 9cdc782..ca0c81c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
@@ -60,6 +61,7 @@ public class KTableKTableLeftJoinTest {
private File stateDir = null;
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
@Before
public void setUp() throws IOException {
@@ -72,8 +74,8 @@ public class KTableKTableLeftJoinTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
- KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ KTable<Integer, String> table1 = builder.table(topic1, consumed);
+ KTable<Integer, String> table2 = builder.table(topic2, consumed);
KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
@@ -171,8 +173,8 @@ public class KTableKTableLeftJoinTest {
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
@@ -252,8 +254,8 @@ public class KTableKTableLeftJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
@@ -341,7 +343,8 @@ public class KTableKTableLeftJoinTest {
final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix};
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(), agg, agg)
+ final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String());
+ final KTable<Long, String> aggTable = builder.table(agg, consumed)
.groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
@Override
public KeyValue<Long, String> apply(final Long key, final String value) {
@@ -349,12 +352,12 @@ public class KTableKTableLeftJoinTest {
}
}, Serialized.with(Serdes.Long(), Serdes.String())).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store");
- final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(), tableOne, tableOne);
- final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
- final KTable<Long, String> three = builder.table(Serdes.Long(), Serdes.String(), tableThree, tableThree);
- final KTable<Long, String> four = builder.table(Serdes.Long(), Serdes.String(), tableFour, tableFour);
- final KTable<Long, String> five = builder.table(Serdes.Long(), Serdes.String(), tableFive, tableFive);
- final KTable<Long, String> six = builder.table(Serdes.Long(), Serdes.String(), tableSix, tableSix);
+ final KTable<Long, String> one = builder.table(tableOne, consumed);
+ final KTable<Long, String> two = builder.table(tableTwo, consumed);
+ final KTable<Long, String> three = builder.table(tableThree, consumed);
+ final KTable<Long, String> four = builder.table(tableFour, consumed);
+ final KTable<Long, String> five = builder.table(tableFive, consumed);
+ final KTable<Long, String> six = builder.table(tableSix, consumed);
final ValueMapper<String, String> mapper = new ValueMapper<String, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 368a3ea..d6ab613 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
@@ -54,6 +55,7 @@ public class KTableKTableOuterJoinTest {
private File stateDir = null;
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
+ private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
@Before
public void setUp() throws IOException {
@@ -72,8 +74,8 @@ public class KTableKTableOuterJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
joined.toStream().process(processor);
@@ -176,8 +178,8 @@ public class KTableKTableOuterJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
@@ -264,8 +266,8 @@ public class KTableKTableOuterJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
- table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+ table1 = builder.table(topic1, consumed);
+ table2 = builder.table(topic2, consumed);
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 756404f..81797cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -57,7 +58,7 @@ public class KTableMapKeysTest {
String topic1 = "topic_map_keys";
- KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1, "anyStoreName");
+ KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde, stringSerde));
final Map<Integer, String> keyMap = new HashMap<>();
keyMap.put(1, "ONE");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 4bfaea6..5d92846 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -43,6 +44,7 @@ import static org.junit.Assert.assertTrue;
public class KTableMapValuesTest {
final private Serde<String> stringSerde = Serdes.String();
+ private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
private File stateDir = null;
@@ -69,7 +71,7 @@ public class KTableMapValuesTest {
String topic1 = "topic1";
- KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
@Override
public Integer apply(CharSequence value) {
@@ -89,7 +91,7 @@ public class KTableMapValuesTest {
String topic1 = "topic1";
- KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTable<String, String> table1 = builder.table(topic1, consumed);
KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
@Override
public Integer apply(CharSequence value) {
@@ -210,11 +212,10 @@ public class KTableMapValuesTest {
String topic1 = "topic1";
String topic2 = "topic2";
- String storeName1 = "storeName1";
String storeName2 = "storeName2";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -241,11 +242,10 @@ public class KTableMapValuesTest {
String topic1 = "topic1";
String topic2 = "topic2";
- String storeName1 = "storeName1";
String storeName2 = "storeName2";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -273,7 +273,7 @@ public class KTableMapValuesTest {
String topic1 = "topic1";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -321,7 +321,7 @@ public class KTableMapValuesTest {
String topic1 = "topic1";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ (KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 3f8a6b2..35a3dbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
@@ -37,6 +38,7 @@ import static org.junit.Assert.assertTrue;
public class KTableSourceTest {
final private Serde<String> stringSerde = Serdes.String();
+ private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde);
final private Serde<Integer> intSerde = Serdes.Integer();
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
@@ -53,7 +55,7 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+ KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
table1.toStream().process(proc1);
@@ -77,7 +79,7 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
@@ -121,7 +123,7 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
@@ -159,7 +161,7 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
table1.enableSendingOldValues();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 3cbceeb..1e4afcd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
@@ -686,7 +687,7 @@ public class SimpleBenchmark {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
- final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
+ final KTable<Long, byte[]> input2 = builder.table(kTableTopic);
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
@@ -697,8 +698,8 @@ public class SimpleBenchmark {
String kTableTopic2, final CountDownLatch latch) {
final StreamsBuilder builder = new StreamsBuilder();
- final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
- final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
+ final KTable<Long, byte[]> input1 = builder.table(kTableTopic1);
+ final KTable<Long, byte[]> input2 = builder.table(kTableTopic2);
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
@@ -725,10 +726,12 @@ public class SimpleBenchmark {
StreamsBuilder builder = new StreamsBuilder();
+ final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
+ = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Integer(), Serdes.ByteArray());
if (enableCaching) {
- builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build());
+ builder.addStateStore(storeBuilder.withCachingEnabled());
} else {
- builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
+ builder.addStateStore(storeBuilder);
}
KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 0a10f9f..d98fd7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -286,8 +286,7 @@ public class YahooBenchmark {
final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
Consumed.with(Serdes.String(),
Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
- final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
- campaignsTopic, "campaign-state");
+ final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, ProjectedEvent> filteredEvents = kEvents
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index b22c488..f3dbb32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -22,14 +22,18 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.junit.Before;
import org.junit.Test;
@@ -88,7 +92,9 @@ public class StreamsMetadataStateTest {
}
});
- builder.globalTable("global-topic", "global-table");
+ builder.globalTable("global-topic",
+ Consumed.with(null, null),
+ Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as(globalTable));
StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index c4e108d..4b75702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -105,9 +105,9 @@ public class SmokeTestClient extends SmokeTestUtil {
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ACKS_CONFIG, "all");
-
StreamsBuilder builder = new StreamsBuilder();
- KStream<String, Integer> source = builder.stream("data", Consumed.with(stringSerde, intSerde));
+ Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
+ KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
source.to(stringSerde, intSerde, "echo");
KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@Override
@@ -141,7 +141,7 @@ public class SmokeTestClient extends SmokeTestUtil {
new Unwindow<String, Integer>()
).to(stringSerde, intSerde, "min");
- KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
+ KTable<String, Integer> minTable = builder.table("min", stringIntConsumed);
minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
// max
@@ -163,7 +163,7 @@ public class SmokeTestClient extends SmokeTestUtil {
new Unwindow<String, Integer>()
).to(stringSerde, intSerde, "max");
- KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
+ KTable<String, Integer> maxTable = builder.table("max", stringIntConsumed);
maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
// sum
@@ -186,7 +186,8 @@ public class SmokeTestClient extends SmokeTestUtil {
).to(stringSerde, longSerde, "sum");
- KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
+ Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
+ KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
// cnt
@@ -195,7 +196,7 @@ public class SmokeTestClient extends SmokeTestUtil {
new Unwindow<String, Long>()
).to(stringSerde, longSerde, "cnt");
- KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
+ KTable<String, Long> cntTable = builder.table("cnt", stringLongConsumed);
cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
// dif
[2/2] kafka git commit: KAFKA-5873;
add materialized overloads to StreamsBuilder
Posted by da...@apache.org.
KAFKA-5873; add materialized overloads to StreamsBuilder
Add overloads for `table` and `globalTable` that use `Materialized`
Author: Damian Guy <da...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3837 from dguy/kafka-5873
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f2b74aa1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f2b74aa1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f2b74aa1
Branch: refs/heads/trunk
Commit: f2b74aa1c36bf2882006c14f7cbd56b493f39d26
Parents: 52d7b67
Author: Damian Guy <da...@gmail.com>
Authored: Mon Sep 18 15:53:44 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 18 15:53:44 2017 +0100
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedDemo.java | 7 +-
.../examples/pageview/PageViewUntypedDemo.java | 9 +-
.../apache/kafka/streams/StreamsBuilder.java | 700 +++----------------
.../java/org/apache/kafka/streams/Topology.java | 2 +-
.../internals/InternalStreamsBuilder.java | 152 ++--
.../streams/kstream/internals/KTableImpl.java | 9 +-
.../kstream/internals/MaterializedInternal.java | 7 +-
.../internals/InternalTopologyBuilder.java | 3 +-
.../apache/kafka/streams/KafkaStreamsTest.java | 3 +-
.../streams/integration/EosIntegrationTest.java | 12 +-
.../GlobalKTableIntegrationTest.java | 13 +-
.../integration/JoinIntegrationTest.java | 4 +-
.../KStreamKTableJoinIntegrationTest.java | 3 +-
.../KTableKTableJoinIntegrationTest.java | 6 +-
.../integration/RestoreIntegrationTest.java | 3 +-
.../internals/GlobalKTableJoinsTest.java | 5 +-
.../internals/InternalStreamsBuilderTest.java | 87 ++-
.../internals/KGroupedTableImplTest.java | 25 +-
.../kstream/internals/KStreamImplTest.java | 11 +-
.../internals/KStreamKStreamLeftJoinTest.java | 2 +-
.../internals/KStreamKTableJoinTest.java | 7 +-
.../internals/KStreamKTableLeftJoinTest.java | 5 +-
.../kstream/internals/KTableAggregateTest.java | 22 +-
.../kstream/internals/KTableFilterTest.java | 28 +-
.../kstream/internals/KTableForeachTest.java | 12 +-
.../kstream/internals/KTableImplTest.java | 29 +-
.../kstream/internals/KTableKTableJoinTest.java | 22 +-
.../internals/KTableKTableLeftJoinTest.java | 29 +-
.../internals/KTableKTableOuterJoinTest.java | 14 +-
.../kstream/internals/KTableMapKeysTest.java | 3 +-
.../kstream/internals/KTableMapValuesTest.java | 16 +-
.../kstream/internals/KTableSourceTest.java | 10 +-
.../kafka/streams/perf/SimpleBenchmark.java | 13 +-
.../kafka/streams/perf/YahooBenchmark.java | 3 +-
.../internals/StreamsMetadataStateTest.java | 8 +-
.../kafka/streams/tests/SmokeTestClient.java | 13 +-
36 files changed, 443 insertions(+), 854 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 72f9be8..068eece 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -145,8 +146,8 @@ public class PageViewTypedDemo {
KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde));
- KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde,
- "streams-userprofile-input", "streams-userprofile-store-name");
+ KTable<String, UserProfile> users = builder.table("streams-userprofile-input",
+ Consumed.with(Serdes.String(), userProfileSerde));
KStream<WindowedPageViewByRegion, RegionCount> regionCount = views
.leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
@@ -190,7 +191,7 @@ public class PageViewTypedDemo {
});
// write to the result topic
- regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output");
+ regionCount.to("streams-pageviewstats-typed-output", Produced.with(wPageViewByRegionSerde, regionCountSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index e8787af..c20c077 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -73,10 +74,10 @@ public class PageViewUntypedDemo {
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
- KStream<String, JsonNode> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), jsonSerde));
+ final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
+ KStream<String, JsonNode> views = builder.stream("streams-pageview-input", consumed);
- KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde,
- "streams-userprofile-input", "streams-userprofile-store-name");
+ KTable<String, JsonNode> users = builder.table("streams-userprofile-input", consumed);
KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() {
@Override
@@ -121,7 +122,7 @@ public class PageViewUntypedDemo {
});
// write to the result topic
- regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output");
+ regionCount.to("streams-pageviewstats-untyped-output", Produced.with(jsonSerde, jsonSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a26822a..a272ec4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -16,28 +16,30 @@
*/
package org.apache.kafka.streams;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collection;
import java.util.Collections;
+import java.util.Objects;
import java.util.regex.Pattern;
/**
@@ -129,6 +131,8 @@ public class StreamsBuilder {
*/
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics,
final Consumed<K, V> consumed) {
+ Objects.requireNonNull(topics, "topics can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
return internalStreamsBuilder.stream(topics, new ConsumedInternal<>(consumed));
}
@@ -170,6 +174,8 @@ public class StreamsBuilder {
*/
public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern,
final Consumed<K, V> consumed) {
+ Objects.requireNonNull(topicPattern, "topicPattern can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
return internalStreamsBuilder.stream(topicPattern, new ConsumedInternal<>(consumed));
}
@@ -182,11 +188,17 @@ public class StreamsBuilder {
* Note that the specified input topic must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the given
+ * {@code Materialized} instance.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
+ * You should only specify serdes in the {@link Consumed} instance as these will also be used to overwrite the
+ * serdes in {@link Materialized}, i.e.,
+ * <pre> {@code
 * streamsBuilder.table(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
+ * }
+ * </pre>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
@@ -199,46 +211,20 @@ public class StreamsBuilder {
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
* @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link #table(String)}
+ * @param consumed the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
+ * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic,
- final String queryableStoreName) {
- return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
- * {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
- return internalStreamsBuilder.table(null, null, null, null, topic, storeSupplier);
+ final Consumed<K, V> consumed,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+ return internalStreamsBuilder.table(topic,
+ new ConsumedInternal<>(consumed),
+ new MaterializedInternal<>(materialized));
}
/**
@@ -259,7 +245,7 @@ public class StreamsBuilder {
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic) {
- return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), null);
+ return table(topic, new ConsumedInternal<K, V>());
}
/**
@@ -277,450 +263,47 @@ public class StreamsBuilder {
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
*
* @param topic the topic name; cannot be {@code null}
- * @param consumed the instance of {@link Consumed} used to define optional parameters
+ * @param consumed the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic,
final Consumed<K, V> consumed) {
- return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), null);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
- * {@link #table(String, Consumed)}
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
- final String topic,
- final String queryableStoreName) {
- return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(offsetReset)), queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@link TimestampExtractor} and default key and value deserializers
- * as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
- final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
- return internalStreamsBuilder.table(offsetReset, null, null, null, topic, storeSupplier);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
- * as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
- final String topic,
- final String queryableStoreName) {
- return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(timestampExtractor)), queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final String topic,
- final String queryableStoreName) {
- final Consumed<K, V> consumed = Consumed.<K, V>with(offsetReset).withTimestampExtractor(timestampExtractor);
- return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
- * as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final String queryableStoreName) {
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
return internalStreamsBuilder.table(topic,
- new ConsumedInternal<>(keySerde, valueSerde, null, null),
- queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
- return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, storeSupplier);
+ new ConsumedInternal<>(consumed),
+ new MaterializedInternal<>(
+ Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
+ internalStreamsBuilder.newStoreName(topic))
+ .withKeySerde(consumed.keySerde)
+ .withValueSerde(consumed.valueSerde),
+ false));
}
/**
* Create a {@link KTable} for the specified topic.
+ * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
+ * {@link StreamsConfig config} are used.
* Input {@link KeyValue records} with {@code null} key will be dropped.
* <p>
* Note that the specified input topics must be partitioned by key.
* If this is not the case the returned {@link KTable} will be corrupted.
* <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
- * committed offsets are available
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final String queryableStoreName) {
- final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, null, offsetReset);
- return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final String queryableStoreName) {
- final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null);
- return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+ * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} using the {@link Materialized} instance.
+ * No internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
- * committed offsets are available
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @param topic the topic name; cannot be {@code null}
+ * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
- public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final String queryableStoreName) {
+ public synchronized <K, V> KTable<K, V> table(final String topic,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
return internalStreamsBuilder.table(topic,
- new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, offsetReset),
- queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
- return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, storeSupplier);
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
- * }</pre>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
- * @return a {@link GlobalKTable} for the specified topic
- */
- public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
- final String queryableStoreName) {
- return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), queryableStoreName);
+ new ConsumedInternal<K, V>(),
+ new MaterializedInternal<>(materialized));
}
/**
@@ -741,7 +324,15 @@ public class StreamsBuilder {
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final Consumed<K, V> consumed) {
- return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), null);
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
+ new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
+ internalStreamsBuilder.newStoreName(topic))
+ .withKeySerde(consumed.keySerde)
+ .withValueSerde(consumed.valueSerde),
+ false);
+ return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
}
/**
@@ -761,61 +352,25 @@ public class StreamsBuilder {
* @return a {@link GlobalKTable} for the specified topic
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
- return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), null);
+ return globalTable(topic, Consumed.<K, V>with(null, null));
}
/**
* Create a {@link GlobalKTable} for the specified topic.
- * The default {@link TimestampExtractor} and default key and value deserializers as specified in
- * the {@link StreamsConfig config} are used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
- * }</pre>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
- * @return a {@link GlobalKTable} for the specified topic
- */
- public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final TimestampExtractor timestampExtractor,
- final String topic,
- final String queryableStoreName) {
- return internalStreamsBuilder.globalTable(topic,
- new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null),
- queryableStoreName);
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with
+ * the provided instance of {@link Materialized}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
+ * You should only specify serdes in the {@link Consumed} instance as these will also be used to overwrite the
+ * serdes in {@link Materialized}, i.e.,
+ * <pre> {@code
+ * streamBuilder.globalTable(topic, Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
+ * }
+ * </pre>
* To query the local {@link KeyValueStore} it must be obtained via
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
@@ -827,28 +382,31 @@ public class StreamsBuilder {
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier; Cannot be {@code null}
+ * @param consumed the instance of {@link Consumed} used to define optional parameters; cannot be {@code null}
+ * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
* @return a {@link GlobalKTable} for the specified topic
*/
- public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
- return internalStreamsBuilder.globalTable(keySerde, valueSerde, topic, storeSupplier);
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+ final Consumed<K, V> consumed,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ // always use the serdes from consumed
+ materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+ return internalStreamsBuilder.globalTable(topic,
+ new ConsumedInternal<>(consumed),
+ new MaterializedInternal<>(materialized));
}
/**
* Create a {@link GlobalKTable} for the specified topic.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
* <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
+ * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} configured with
+ * the provided instance of {@link Materialized}.
* However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
* methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
* <p>
@@ -863,34 +421,30 @@ public class StreamsBuilder {
* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
* regardless of the specified value in {@link StreamsConfig}.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valueSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} an internal store name will be automatically given
+ * @param topic the topic name; cannot be {@code null}
+ * @param materialized the instance of {@link Materialized} used to materialize a state store; cannot be {@code null}
* @return a {@link GlobalKTable} for the specified topic
*/
- public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final String topic,
- final String queryableStoreName) {
+ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+ final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
return internalStreamsBuilder.globalTable(topic,
- new ConsumedInternal<>(Consumed.with(keySerde, valueSerde)),
- queryableStoreName);
+ new ConsumedInternal<K, V>(),
+ new MaterializedInternal<>(materialized));
}
+
/**
* Adds a state store to the underlying {@link Topology}.
*
- * @param supplier the supplier used to obtain this state store {@link StateStore} instance
- * @param processorNames the names of the processors that should be able to access the provided store
+ * @param builder the builder used to obtain this state store {@link StateStore} instance
* @return itself
* @throws TopologyException if state store supplier is already added
*/
- public synchronized StreamsBuilder addStateStore(final StateStoreSupplier supplier,
- final String... processorNames) {
- internalStreamsBuilder.addStateStore(supplier, processorNames);
+ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) {
+ Objects.requireNonNull(builder, "builder can't be null");
+ internalStreamsBuilder.addStateStore(builder);
return this;
}
@@ -907,62 +461,30 @@ public class StreamsBuilder {
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
*
- * @param storeSupplier user defined state store supplier
+ * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
* @param sourceName name of the {@link SourceNode} that will be automatically added
- * @param keyDeserializer the {@link Deserializer} to deserialize keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize values with
* @param topic the topic to source the data from
+ * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
* @param processorName the name of the {@link ProcessorSupplier}
* @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
- final String sourceName,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
+ @SuppressWarnings("unchecked")
+ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
- valueDeserializer, topic, processorName, stateUpdateSupplier);
- return this;
- }
-
- /**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
- * of the input topic.
- * <p>
- * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
- *
- * @param storeSupplier user defined state store supplier
- * @param sourceName name of the {@link SourceNode} that will be automatically added
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param keyDeserializer the {@link Deserializer} to deserialize keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link ProcessorSupplier}
- * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
- * @return itself
- * @throws TopologyException if the processor of state is already registered
- */
- public synchronized StreamsBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
+ final Consumed consumed,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalStreamsBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
- valueDeserializer, topic, processorName, stateUpdateSupplier);
+ Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ internalStreamsBuilder.addGlobalStore(storeBuilder,
+ sourceName,
+ topic,
+ new ConsumedInternal<>(consumed),
+ processorName,
+ stateUpdateSupplier);
return this;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 386aacf..85d769f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -571,7 +571,7 @@ public class Topology {
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
- public synchronized Topology addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+ public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder,
final String sourceName,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4da9906..3963657 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -17,17 +17,15 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collection;
import java.util.Collections;
@@ -72,37 +70,55 @@ public class InternalStreamsBuilder {
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
}
- @SuppressWarnings("unchecked")
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
- final String queryableStoreName) {
- final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
- final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
- consumed.keySerde(),
- consumed.valueSerde(),
- false,
- Collections.<String, String>emptyMap(),
- true);
- return doTable(consumed, topic, storeSupplier, queryableStoreName != null);
- }
-
- public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- return doTable(new ConsumedInternal<>(keySerde, valSerde, timestampExtractor, offsetReset), topic, storeSupplier, true);
+ final String source = newName(KStreamImpl.SOURCE_NAME);
+ final String name = newName(KTableImpl.SOURCE_NAME);
+
+ final KTable<K, V> kTable = createKTable(consumed,
+ topic,
+ storeSupplier.name(),
+ true,
+ source,
+ name);
+
+ internalTopologyBuilder.addStateStore(storeSupplier, name);
+ internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
+
+ return kTable;
}
- private <K, V> KTable<K, V> doTable(final ConsumedInternal<K, V> consumed,
- final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier,
- final boolean isQueryable) {
+ @SuppressWarnings("unchecked")
+ public <K, V> KTable<K, V> table(final String topic,
+ final ConsumedInternal<K, V> consumed,
+ final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
+
final String source = newName(KStreamImpl.SOURCE_NAME);
final String name = newName(KTableImpl.SOURCE_NAME);
- final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+ final KTable<K, V> kTable = createKTable(consumed,
+ topic,
+ storeBuilder.name(),
+ materialized.isQueryable(),
+ source,
+ name);
+
+ internalTopologyBuilder.addStateStore(storeBuilder, name);
+ internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+
+ return kTable;
+ }
+
+
+ private <K, V> KTable<K, V> createKTable(final ConsumedInternal<K, V> consumed,
+ final String topic,
+ final String storeName,
+ final boolean isQueryable,
+ final String source,
+ final String name) {
+ final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName);
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
source,
@@ -112,48 +128,27 @@ public class InternalStreamsBuilder {
topic);
internalTopologyBuilder.addProcessor(name, processorSupplier, source);
- final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
- consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeSupplier.name(), isQueryable);
-
- internalTopologyBuilder.addStateStore(storeSupplier, name);
- internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
-
- return kTable;
+ return new KTableImpl<>(this, name, processorSupplier,
+ consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeName, isQueryable);
}
public <K, V> GlobalKTable<K, V> globalTable(final String topic,
final ConsumedInternal<K, V> consumed,
- final String queryableStoreName) {
- final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
- return doGlobalTable(consumed, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
- consumed.keySerde(),
- consumed.valueSerde(),
- false,
- Collections.<String, String>emptyMap(),
- true));
- }
-
- public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
- return doGlobalTable(new ConsumedInternal<>(keySerde, valSerde, null, null), topic, storeSupplier);
- }
-
- @SuppressWarnings("unchecked")
- private <K, V> GlobalKTable<K, V> doGlobalTable(final ConsumedInternal<K, V> consumed,
- final String topic,
- final StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
+ // explicitly disable logging for global stores
+ materialized.withLoggingDisabled();
+ final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
final String sourceName = newName(KStreamImpl.SOURCE_NAME);
final String processorName = newName(KTableImpl.SOURCE_NAME);
- final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
+ final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
final Deserializer<V> valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
- internalTopologyBuilder.addGlobalStore(storeSupplier,
+ internalTopologyBuilder.addGlobalStore(storeBuilder,
sourceName,
consumed.timestampExtractor(),
keyDeserializer,
@@ -161,9 +156,10 @@ public class InternalStreamsBuilder {
topic,
processorName,
tableSource);
- return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+ return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
}
+
public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
return KStreamImpl.merge(this, streams);
}
@@ -172,35 +168,31 @@ public class InternalStreamsBuilder {
return prefix + String.format("%010d", index.getAndIncrement());
}
- String newStoreName(final String prefix) {
+ public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}
- public synchronized void addStateStore(final StateStoreSupplier supplier,
- final String... processorNames) {
- internalTopologyBuilder.addStateStore(supplier, processorNames);
- }
-
- public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
- final String sourceName,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
- valueDeserializer, topic, processorName, stateUpdateSupplier);
+ public synchronized void addStateStore(final StoreBuilder builder) {
+ internalTopologyBuilder.addStateStore(builder);
}
- public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
final String topic,
+ final ConsumedInternal consumed,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
- valueDeserializer, topic, processorName, stateUpdateSupplier);
+ // explicitly disable logging for global stores
+ storeBuilder.withLoggingDisabled();
+ final Deserializer keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
+ final Deserializer valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
+ internalTopologyBuilder.addGlobalStore(storeBuilder,
+ sourceName,
+ consumed.timestampExtractor(),
+ keyDeserializer,
+ valueDeserializer,
+ topic,
+ processorName,
+ stateUpdateSupplier);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 067bcfc..a42db0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -401,7 +402,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return builder.table(topic,
new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
- internalStoreName);
+ new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(internalStoreName)
+ .withKeySerde(keySerde)
+ .withValueSerde(valSerde),
+ queryableStoreName != null));
}
@Override
@@ -413,7 +417,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
to(keySerde, valSerde, partitioner, topic);
- return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, storeSupplier);
+ final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(Consumed.with(keySerde, valSerde, new FailOnInvalidTimestamp(), null));
+ return builder.table(topic, consumed, storeSupplier);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 0ee610f..9f186fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -30,8 +30,9 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
public MaterializedInternal(final Materialized<K, V, S> materialized) {
this(materialized, true);
}
-
- MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) {
+
+ public MaterializedInternal(final Materialized<K, V, S> materialized,
+ final boolean queryable) {
super(materialized);
this.queryable = queryable;
}
@@ -67,7 +68,7 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
return cachingEnabled;
}
- public boolean isQueryable() {
+ boolean isQueryable() {
return queryable;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 193d0e1..d47af88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
import org.slf4j.Logger;
@@ -557,7 +556,7 @@ public class InternalTopologyBuilder {
}
- public final void addGlobalStore(final KeyValueStoreBuilder storeBuilder,
+ public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 73c5484..f1ae6da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -505,7 +505,8 @@ public class KafkaStreamsTest {
CLUSTER.createTopic(topic);
final StreamsBuilder builder = new StreamsBuilder();
- builder.table(Serdes.String(), Serdes.String(), topic, topic);
+ final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+ builder.table(topic, consumed);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index e50f4d0..6c7b2b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -32,11 +32,11 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
@@ -583,13 +583,11 @@ public class EosIntegrationTest {
String[] storeNames = null;
if (withState) {
storeNames = new String[] {storeName};
- final StateStoreSupplier storeSupplier = Stores.create(storeName)
- .withLongKeys()
- .withLongValues()
- .persistent()
- .build();
+ final StoreBuilder<KeyValueStore<Long, Long>> storeBuilder
+ = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long())
+ .withCachingEnabled();
- builder.addStateStore(storeSupplier);
+ builder.addStateStore(storeBuilder);
}
final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);