You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/06 22:38:27 UTC
[2/2] kafka git commit: KAFKA-5651;
Follow-up: add with method to Materialized
KAFKA-5651; Follow-up: add with method to Materialized
Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where people don't care about the state store name.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Matthias J. Sax <ma...@confluent.io>
Closes #4009 from dguy/materialized
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23a01405
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23a01405
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23a01405
Branch: refs/heads/trunk
Commit: 23a014052d39521a3af471b3f95809c2164820f7
Parents: 105ab47
Author: Damian Guy <da...@gmail.com>
Authored: Fri Oct 6 15:38:23 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 6 15:38:23 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/StreamsBuilder.java | 31 ++++----
.../kafka/streams/kstream/KGroupedStream.java | 6 +-
.../kafka/streams/kstream/KGroupedTable.java | 2 +-
.../kafka/streams/kstream/KStreamBuilder.java | 2 +-
.../kafka/streams/kstream/Materialized.java | 18 +++++
.../GroupedStreamAggregateBuilder.java | 2 +-
.../kstream/internals/InternalNameProvider.java | 23 ++++++
.../internals/InternalStreamsBuilder.java | 25 ++++---
.../kstream/internals/KGroupedStreamImpl.java | 14 ++--
.../kstream/internals/KGroupedTableImpl.java | 34 +++++----
.../streams/kstream/internals/KStreamImpl.java | 54 +++++++-------
.../streams/kstream/internals/KTableImpl.java | 50 +++++++------
.../internals/KeyValueStoreMaterializer.java | 6 +-
.../kstream/internals/MaterializedInternal.java | 21 +++---
.../internals/SessionWindowedKStreamImpl.java | 11 ++-
.../internals/TimeWindowedKStreamImpl.java | 14 ++--
.../KStreamAggregationIntegrationTest.java | 6 +-
.../kstream/internals/AbstractStreamTest.java | 2 +-
.../internals/InternalStreamsBuilderTest.java | 40 ++++++-----
.../kstream/internals/KTableForeachTest.java | 6 +-
.../internals/MaterializedInternalTest.java | 76 ++++++++++++++++++++
.../SessionWindowedKStreamImplTest.java | 16 ++++-
.../internals/TimeWindowedKStreamImplTest.java | 2 +-
.../KeyValueStoreMaterializerTest.java | 23 ++++--
24 files changed, 330 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 94d19ae..b5cc6d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -224,7 +224,7 @@ public class StreamsBuilder {
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(consumed),
- new MaterializedInternal<>(materialized));
+ new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
}
/**
@@ -273,11 +273,9 @@ public class StreamsBuilder {
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(consumed),
new MaterializedInternal<>(
- Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
- internalStreamsBuilder.newStoreName(topic))
- .withKeySerde(consumed.keySerde)
- .withValueSerde(consumed.valueSerde),
- false));
+ Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
+ internalStreamsBuilder,
+ topic));
}
/**
@@ -301,11 +299,12 @@ public class StreamsBuilder {
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+ final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
+ = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
- new MaterializedInternal<>(materialized));
+ materializedInternal);
}
/**
@@ -329,11 +328,12 @@ public class StreamsBuilder {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
- new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(
- internalStreamsBuilder.newStoreName(topic))
- .withKeySerde(consumed.keySerde)
- .withValueSerde(consumed.valueSerde),
- false);
+ new MaterializedInternal<>(
+ Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
+ internalStreamsBuilder,
+ topic);
+
+
return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
}
@@ -399,7 +399,7 @@ public class StreamsBuilder {
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(consumed),
- new MaterializedInternal<>(materialized));
+ new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
}
/**
@@ -431,7 +431,8 @@ public class StreamsBuilder {
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+ final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+ new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index b3945f7..6347960 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -1173,7 +1173,7 @@ public interface KGroupedStream<K, V> {
* @param <VR> the value type of the resulting {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized) aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
@@ -1346,7 +1346,7 @@ public interface KGroupedStream<K, V> {
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(Windows) windowedBy(windows)} followed by
- * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+ * {@link TimeWindowedKStream#aggregate(Initializer, Aggregator, Materialized)} aggregate(initializer, aggregator, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
@@ -1509,7 +1509,7 @@ public interface KGroupedStream<K, V> {
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
- * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+ * {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index d0a38cc..4d2bb29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -855,7 +855,7 @@ public interface KGroupedTable<K, V> {
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
- * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
+ * @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.with(null, aggValueSerde))}
*/
@Deprecated
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 6b51c86..77745d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1251,7 +1251,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
* @return a new unique name
*/
public String newName(final String prefix) {
- return internalStreamsBuilder.newName(prefix);
+ return internalStreamsBuilder.newProcessorName(prefix);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index dd7165c..48dd12e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -139,6 +139,24 @@ public class Materialized<K, V, S extends StateStore> {
}
/**
+ * Materialize a {@link StateStore} with the provided key and value {@link Serde}s.
+ * An internal name will be used for the store.
+ *
+ * @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key
+ * serde from configs will be used
+ * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
+ * serde from configs will be used
+ * @param <K> key type
+ * @param <V> value type
+ * @param <S> store type
+ * @return a new {@link Materialized} instance with the given key and value serdes
+ */
+ public static <K, V, S extends StateStore> Materialized<K, V, S> with(final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ return new Materialized<K, V, S>((String) null).withKeySerde(keySerde).withValueSerde(valueSerde);
+ }
+
+ /**
* Set the valueSerde the materialized {@link StateStore} will use.
*
* @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 6fb7a35..e4429cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -65,7 +65,7 @@ class GroupedStreamAggregateBuilder<K, V> {
final String functionName,
final StoreBuilder storeBuilder,
final boolean isQueryable) {
- final String aggFunctionName = builder.newName(functionName);
+ final String aggFunctionName = builder.newProcessorName(functionName);
final String sourceName = repartitionIfRequired(storeBuilder.name());
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
new file mode 100644
index 0000000..8d8ebfc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalNameProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+public interface InternalNameProvider {
+ String newProcessorName(String prefix);
+
+ String newStoreName(String prefix);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0df1524..4308e5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -32,7 +32,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
-public class InternalStreamsBuilder {
+public class InternalStreamsBuilder implements InternalNameProvider {
final InternalTopologyBuilder internalTopologyBuilder;
@@ -44,7 +44,7 @@ public class InternalStreamsBuilder {
public <K, V> KStream<K, V> stream(final Collection<String> topics,
final ConsumedInternal<K, V> consumed) {
- final String name = newName(KStreamImpl.SOURCE_NAME);
+ final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
@@ -57,7 +57,7 @@ public class InternalStreamsBuilder {
}
public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) {
- final String name = newName(KStreamImpl.SOURCE_NAME);
+ final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
name,
@@ -74,8 +74,8 @@ public class InternalStreamsBuilder {
final ConsumedInternal<K, V> consumed,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- final String source = newName(KStreamImpl.SOURCE_NAME);
- final String name = newName(KTableImpl.SOURCE_NAME);
+ final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String name = newProcessorName(KTableImpl.SOURCE_NAME);
final KTable<K, V> kTable = createKTable(consumed,
topic,
@@ -94,10 +94,11 @@ public class InternalStreamsBuilder {
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
- final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
+ final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized)
+ .materialize();
- final String source = newName(KStreamImpl.SOURCE_NAME);
- final String name = newName(KTableImpl.SOURCE_NAME);
+ final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String name = newProcessorName(KTableImpl.SOURCE_NAME);
final KTable<K, V> kTable = createKTable(consumed,
topic,
storeBuilder.name(),
@@ -141,8 +142,8 @@ public class InternalStreamsBuilder {
// explicitly disable logging for global stores
materialized.withLoggingDisabled();
final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
- final String sourceName = newName(KStreamImpl.SOURCE_NAME);
- final String processorName = newName(KTableImpl.SOURCE_NAME);
+ final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name());
@@ -160,10 +161,12 @@ public class InternalStreamsBuilder {
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
}
- public String newName(final String prefix) {
+ @Override
+ public String newProcessorName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}
+ @Override
public String newStoreName(final String prefix) {
return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index dafaa62..45ae7da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -109,7 +109,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized);
+ = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
return doAggregate(
new KStreamReduce<K, V>(materializedInternal.storeName(), reducer),
REDUCE_NAME,
@@ -171,7 +171,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized);
+ = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
return doAggregate(
new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
AGGREGATE_NAME,
@@ -183,10 +183,10 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final Aggregator<? super K, ? super V, VR> aggregator) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
- final String storeName = builder.newStoreName(AGGREGATE_NAME);
-
MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
- new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false);
+ new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>with(keySerde, null),
+ builder,
+ AGGREGATE_NAME);
return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
AGGREGATE_NAME,
materializedInternal);
@@ -277,7 +277,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized);
+ = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.valueSerde() == null) {
materialized.withValueSerde(Serdes.Long());
}
@@ -495,7 +495,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final String functionName,
final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier) {
- final String aggFunctionName = builder.newName(functionName);
+ final String aggFunctionName = builder.newProcessorName(functionName);
final String sourceName = repartitionIfRequired(storeSupplier.name());
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 507944a..d5a4e71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
@@ -46,8 +47,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
private static final String REDUCE_NAME = "KTABLE-REDUCE-";
- protected final Serde<? extends K> keySerde;
- protected final Serde<? extends V> valSerde;
+ protected final Serde<K> keySerde;
+ protected final Serde<V> valSerde;
private boolean isQueryable = true;
private final Initializer<Long> countInitializer = new Initializer<Long>() {
@Override
@@ -73,8 +74,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
KGroupedTableImpl(final InternalStreamsBuilder builder,
final String name,
final String sourceName,
- final Serde<? extends K> keySerde,
- final Serde<? extends V> valSerde) {
+ final Serde<K> keySerde,
+ final Serde<V> valSerde) {
super(builder, name, Collections.singleton(sourceName));
this.keySerde = keySerde;
this.valSerde = valSerde;
@@ -141,10 +142,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
@SuppressWarnings("deprecation")
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
- final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
- final String funcName = builder.newName(functionName);
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
+ final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String funcName = builder.newProcessorName(functionName);
buildAggregate(aggregateSupplier,
storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
@@ -184,15 +185,16 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
- final String sinkName = builder.newName(KStreamImpl.SINK_NAME);
- final String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
- final String funcName = builder.newName(functionName);
+ final String sinkName = builder.newProcessorName(KStreamImpl.SINK_NAME);
+ final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String funcName = builder.newProcessorName(functionName);
buildAggregate(aggregateSupplier,
materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX,
funcName,
sourceName, sinkName);
- builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), funcName);
+ builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized)
+ .materialize(), funcName);
// return the KTable representation with the intermediate topic as the sources
return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), materialized.storeName(), isQueryable);
@@ -215,7 +217,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized);
+ = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(materializedInternal.storeName(),
adder,
subtractor);
@@ -255,6 +257,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
materialized);
}
+ @SuppressWarnings("unchecked")
@Override
public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
@@ -265,7 +268,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
Objects.requireNonNull(subtractor, "subtractor can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
- new MaterializedInternal<>(materialized);
+ new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+ if (materializedInternal.keySerde() == null) {
+ materializedInternal.withKeySerde(keySerde);
+ }
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(),
initializer,
adder,
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index cbbe848..8e80315 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -123,7 +123,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) {
Objects.requireNonNull(predicate, "predicate can't be null");
- String name = builder.newName(FILTER_NAME);
+ String name = builder.newProcessorName(FILTER_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
@@ -133,7 +133,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
Objects.requireNonNull(predicate, "predicate can't be null");
- String name = builder.newName(FILTER_NAME);
+ String name = builder.newProcessorName(FILTER_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
@@ -147,7 +147,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
private <K1> String internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
- String name = builder.newName(KEY_SELECT_NAME);
+ String name = builder.newProcessorName(KEY_SELECT_NAME);
builder.internalTopologyBuilder.addProcessor(
name,
new KStreamMap<>(
@@ -166,7 +166,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
- String name = builder.newName(MAP_NAME);
+ String name = builder.newProcessorName(MAP_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamMap<>(mapper), this.name);
@@ -177,7 +177,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
- String name = builder.newName(MAPVALUES_NAME);
+ String name = builder.newProcessorName(MAPVALUES_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
@@ -247,7 +247,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public void print(final Printed<K, V> printed) {
Objects.requireNonNull(printed, "printed can't be null");
final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
- final String name = builder.newName(PRINTING_NAME);
+ final String name = builder.newProcessorName(PRINTING_NAME);
builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
}
@@ -320,7 +320,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
- String name = builder.newName(FLATMAP_NAME);
+ String name = builder.newProcessorName(FLATMAP_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
@@ -330,7 +330,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
- String name = builder.newName(FLATMAPVALUES_NAME);
+ String name = builder.newProcessorName(FLATMAPVALUES_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
@@ -346,13 +346,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
for (final Predicate<? super K, ? super V> predicate : predicates) {
Objects.requireNonNull(predicate, "predicates can't have null values");
}
- String branchName = builder.newName(BRANCH_NAME);
+ String branchName = builder.newProcessorName(BRANCH_NAME);
builder.internalTopologyBuilder.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
for (int i = 0; i < predicates.length; i++) {
- String childName = builder.newName(BRANCHCHILD_NAME);
+ String childName = builder.newProcessorName(BRANCHCHILD_NAME);
builder.internalTopologyBuilder.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
@@ -371,7 +371,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
private KStream<K, V> merge(final InternalStreamsBuilder builder,
final KStream<K, V> stream) {
KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
- String name = builder.newName(MERGE_NAME);
+ String name = builder.newProcessorName(MERGE_NAME);
String[] parentNames = {this.name, streamImpl.name};
Set<String> allSourceNodes = new HashSet<>();
@@ -406,7 +406,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void foreach(final ForeachAction<? super K, ? super V> action) {
Objects.requireNonNull(action, "action can't be null");
- String name = builder.newName(FOREACH_NAME);
+ String name = builder.newProcessorName(FOREACH_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, false), this.name);
}
@@ -414,7 +414,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
Objects.requireNonNull(action, "action can't be null");
- final String name = builder.newName(PEEK_NAME);
+ final String name = builder.newProcessorName(PEEK_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPeek<>(action, true), this.name);
@@ -482,7 +482,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@SuppressWarnings("unchecked")
private void to(final String topic, final ProducedInternal<K, V> produced) {
- final String name = builder.newName(SINK_NAME);
+ final String name = builder.newProcessorName(SINK_NAME);
final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
@@ -500,7 +500,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
- String name = builder.newName(TRANSFORM_NAME);
+ String name = builder.newProcessorName(TRANSFORM_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
if (stateStoreNames != null && stateStoreNames.length > 0) {
@@ -514,7 +514,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");
- String name = builder.newName(TRANSFORMVALUES_NAME);
+ String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
if (stateStoreNames != null && stateStoreNames.length > 0) {
@@ -527,7 +527,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void process(final ProcessorSupplier<? super K, ? super V> processorSupplier,
final String... stateStoreNames) {
- final String name = builder.newName(PROCESSOR_NAME);
+ final String name = builder.newProcessorName(PROCESSOR_NAME);
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
if (stateStoreNames != null && stateStoreNames.length > 0) {
@@ -645,9 +645,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String baseName = topicNamePrefix != null ? topicNamePrefix : name;
String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
- String sinkName = builder.newName(SINK_NAME);
- String filterName = builder.newName(FILTER_NAME);
- String sourceName = builder.newName(SOURCE_NAME);
+ String sinkName = builder.newProcessorName(SINK_NAME);
+ String filterName = builder.newProcessorName(FILTER_NAME);
+ String sourceName = builder.newProcessorName(SOURCE_NAME);
builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
@@ -753,7 +753,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Objects.requireNonNull(joiner, "joiner can't be null");
final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
- final String name = builder.newName(LEFTJOIN_NAME);
+ final String name = builder.newProcessorName(LEFTJOIN_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), this.name);
return new KStreamImpl<>(builder, name, sourceNodes, false);
}
@@ -767,7 +767,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
- final String name = builder.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+ final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName());
builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
@@ -886,11 +886,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
final JoinWindows windows,
final Joined<K1, V1, V2> joined) {
- String thisWindowStreamName = builder.newName(WINDOWED_NAME);
- String otherWindowStreamName = builder.newName(WINDOWED_NAME);
- String joinThisName = rightOuter ? builder.newName(OUTERTHIS_NAME) : builder.newName(JOINTHIS_NAME);
- String joinOtherName = leftOuter ? builder.newName(OUTEROTHER_NAME) : builder.newName(JOINOTHER_NAME);
- String joinMergeName = builder.newName(MERGE_NAME);
+ String thisWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
+ String otherWindowStreamName = builder.newProcessorName(WINDOWED_NAME);
+ String joinThisName = rightOuter ? builder.newProcessorName(OUTERTHIS_NAME) : builder.newProcessorName(JOINTHIS_NAME);
+ String joinOtherName = leftOuter ? builder.newProcessorName(OUTEROTHER_NAME) : builder.newProcessorName(JOINOTHER_NAME);
+ String joinMergeName = builder.newProcessorName(MERGE_NAME);
final StoreBuilder<WindowStore<K1, V1>> thisWindow =
createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store");
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index db8de1a..8c79dec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -146,7 +146,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
final boolean isFilterNot) {
Objects.requireNonNull(predicate, "predicate can't be null");
- String name = builder.newName(FILTER_NAME);
+ String name = builder.newProcessorName(FILTER_NAME);
String internalStoreName = null;
if (storeSupplier != null) {
internalStoreName = storeSupplier.name();
@@ -162,7 +162,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized,
final boolean filterNot) {
- String name = builder.newName(FILTER_NAME);
+ String name = builder.newProcessorName(FILTER_NAME);
KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this,
predicate,
@@ -193,7 +193,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(predicate, "predicate can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- return doFilter(predicate, new MaterializedInternal<>(materialized), false);
+ return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), false);
}
@SuppressWarnings("deprecation")
@@ -225,7 +225,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(predicate, "predicate can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- return doFilter(predicate, new MaterializedInternal<>(materialized), true);
+ return doFilter(predicate, new MaterializedInternal<>(materialized, builder, FILTER_NAME), true);
}
@SuppressWarnings("deprecation")
@@ -252,7 +252,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final Serde<V1> valueSerde,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(mapper);
- String name = builder.newName(MAPVALUES_NAME);
+ String name = builder.newProcessorName(MAPVALUES_NAME);
String internalStoreName = null;
if (storeSupplier != null) {
internalStoreName = storeSupplier.name();
@@ -278,13 +278,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized);
- final String name = builder.newName(MAPVALUES_NAME);
+ = new MaterializedInternal<>(materialized, builder, MAPVALUES_NAME);
+ final String name = builder.newProcessorName(MAPVALUES_NAME);
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this,
mapper,
materializedInternal.storeName());
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
- builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
+ builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal)
+ .materialize(),
name);
return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
}
@@ -335,7 +336,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final Serde<V> valSerde,
final String label) {
Objects.requireNonNull(label, "label can't be null");
- final String name = builder.newName(PRINTING_NAME);
+ final String name = builder.newProcessorName(PRINTING_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label)), this.name);
}
@@ -374,7 +375,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
if (filePath.trim().isEmpty()) {
throw new TopologyException("filePath can't be an empty string");
}
- String name = builder.newName(PRINTING_NAME);
+ String name = builder.newProcessorName(PRINTING_NAME);
try {
PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name);
@@ -387,7 +388,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public void foreach(final ForeachAction<? super K, ? super V> action) {
Objects.requireNonNull(action, "action can't be null");
- String name = builder.newName(FOREACH_NAME);
+ String name = builder.newProcessorName(FOREACH_NAME);
KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() {
@Override
public void apply(K key, Change<V> value) {
@@ -404,16 +405,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic,
final String queryableStoreName) {
- final String internalStoreName = queryableStoreName != null ? queryableStoreName : builder.newStoreName(KTableImpl.TOSTREAM_NAME);
-
to(keySerde, valSerde, partitioner, topic);
return builder.table(topic,
new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
- new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(internalStoreName)
- .withKeySerde(keySerde)
- .withValueSerde(valSerde),
- queryableStoreName != null));
+ new MaterializedInternal<>(Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde),
+ builder,
+ KTableImpl.TOSTREAM_NAME));
}
@SuppressWarnings("deprecation")
@@ -543,7 +541,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public KStream<K, V> toStream() {
- String name = builder.newName(TOSTREAM_NAME);
+ String name = builder.newProcessorName(TOSTREAM_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamMapValues<K, Change<V>, V>(new ValueMapper<Change<V>, V>() {
@Override
@@ -573,7 +571,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false);
+ return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), false, false);
}
@SuppressWarnings("deprecation")
@@ -604,7 +602,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
- return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true);
+ return doJoin(other, joiner, new MaterializedInternal<>(materialized, builder, MERGE_NAME), true, true);
}
@SuppressWarnings("deprecation")
@@ -637,7 +635,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return doJoin(other,
joiner,
- new MaterializedInternal<>(materialized),
+ new MaterializedInternal<>(materialized, builder, MERGE_NAME),
true,
false);
}
@@ -684,7 +682,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
- final String joinMergeName = builder.newName(MERGE_NAME);
+ final String joinMergeName = builder.newProcessorName(MERGE_NAME);
final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name();
final KTable<K, R> result = buildJoin((AbstractStream<K>) other,
joiner,
@@ -709,7 +707,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
final String internalQueryableName = materialized == null ? null : materialized.storeName();
- final String joinMergeName = builder.newName(MERGE_NAME);
+ final String joinMergeName = builder.newProcessorName(MERGE_NAME);
final KTable<K, VR> result = buildJoin((AbstractStream<K>) other,
joiner,
leftOuter,
@@ -741,8 +739,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
((KTableImpl) other).enableSendingOldValues();
}
- final String joinThisName = builder.newName(JOINTHIS_NAME);
- final String joinOtherName = builder.newName(JOINOTHER_NAME);
+ final String joinThisName = builder.newProcessorName(JOINTHIS_NAME);
+ final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME);
final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
@@ -792,7 +790,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final Serialized<K1, V1> serialized) {
Objects.requireNonNull(selector, "selector can't be null");
Objects.requireNonNull(serialized, "serialized can't be null");
- String selectName = builder.newName(SELECT_NAME);
+ String selectName = builder.newProcessorName(SELECT_NAME);
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 1d702f2..c8cd35d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -29,10 +29,14 @@ public class KeyValueStoreMaterializer<K, V> {
this.materialized = materialized;
}
+ /**
+ * @return StoreBuilder
+ */
public StoreBuilder<KeyValueStore<K, V>> materialize() {
KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
- supplier = Stores.persistentKeyValueStore(materialized.storeName());
+ final String name = materialized.storeName();
+ supplier = Stores.persistentKeyValueStore(name);
}
final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier,
materialized.keySerde(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 9f186fd..c933b86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -27,21 +27,24 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
private final boolean queryable;
- public MaterializedInternal(final Materialized<K, V, S> materialized) {
- this(materialized, true);
- }
-
+
public MaterializedInternal(final Materialized<K, V, S> materialized,
- final boolean queryable) {
+ final InternalNameProvider nameProvider,
+ final String generatedStorePrefix) {
super(materialized);
- this.queryable = queryable;
+ if (storeName() == null) {
+ queryable = false;
+ storeName = nameProvider.newStoreName(generatedStorePrefix);
+ } else {
+ queryable = true;
+ }
}
public String storeName() {
- if (storeName != null) {
- return storeName;
+ if (storeSupplier != null) {
+ return storeSupplier.name();
}
- return storeSupplier.name();
+ return storeName;
}
public StoreSupplier<S> storeSupplier() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 6644c92..34e5bd7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -83,7 +83,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
public KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized);
+ = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.valueSerde() == null) {
materialized.withValueSerde(Serdes.Long());
}
@@ -93,6 +93,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
materialized);
}
+ @SuppressWarnings("unchecked")
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator,
@@ -100,7 +101,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
- return doAggregate(initializer, aggregator, sessionMerger, null);
+ return doAggregate(initializer, aggregator, sessionMerger, (Serde<T>) valSerde);
}
@SuppressWarnings("unchecked")
@@ -113,7 +114,11 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+ final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materializedInternal
+ = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+ if (materializedInternal.keySerde() == null) {
+ materializedInternal.withKeySerde(keySerde);
+ }
return (KTable<Windowed<K>, VR>) aggregateBuilder.build(
new KStreamSessionWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator, sessionMerger),
AGGREGATE_NAME,
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index daba4c3..5e54770 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -73,7 +73,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
public KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized);
+ = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
if (materializedInternal.valueSerde() == null) {
materialized.withValueSerde(Serdes.Long());
}
@@ -81,12 +81,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
}
+ @SuppressWarnings("unchecked")
@Override
public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
- return doAggregate(initializer, aggregator, null);
+ return doAggregate(initializer, aggregator, (Serde<VR>) valSerde);
}
@SuppressWarnings("unchecked")
@@ -108,7 +109,11 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
Objects.requireNonNull(initializer, "initializer can't be null");
Objects.requireNonNull(aggregator, "aggregator can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+ final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materializedInternal
+ = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME);
+ if (materializedInternal.keySerde() == null) {
+ materializedInternal.withKeySerde(keySerde);
+ }
return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows,
materializedInternal.storeName(),
initializer,
@@ -135,7 +140,8 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(reducer, "reducer can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
- final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
+ final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal
+ = new MaterializedInternal<>(materialized, builder, REDUCE_NAME);
return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, materializedInternal.storeName(), reducer),
REDUCE_NAME,
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 350facf..4169eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -38,6 +39,7 @@ import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
@@ -48,6 +50,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
@@ -308,7 +311,8 @@ public class KStreamAggregationIntegrationTest {
groupedStream.windowedBy(TimeWindows.of(500L))
.aggregate(
initializer,
- aggregator
+ aggregator,
+ Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer())
)
.toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 8e1a8a1..e16d8e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -64,7 +64,7 @@ public class AbstractStreamTest {
}
KStream<K, V> randomFilter() {
- String name = builder.newName("RANDOM-FILTER-");
+ String name = builder.newProcessorName("RANDOM-FILTER-");
builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name);
return new KStreamImpl<>(builder, name, sourceNodes, false);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 68d0e24..05a0214 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -61,8 +61,9 @@ public class InternalStreamsBuilderTest {
private KStreamTestDriver driver = null;
private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
+ private final String storePrefix = "prefix-";
private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
- = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), false);
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), builder, storePrefix);
@Before
public void setUp() {
@@ -79,15 +80,15 @@ public class InternalStreamsBuilderTest {
@Test
public void testNewName() {
- assertEquals("X-0000000000", builder.newName("X-"));
- assertEquals("Y-0000000001", builder.newName("Y-"));
- assertEquals("Z-0000000002", builder.newName("Z-"));
+ assertEquals("X-0000000000", builder.newProcessorName("X-"));
+ assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
+ assertEquals("Z-0000000002", builder.newProcessorName("Z-"));
final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
- assertEquals("X-0000000000", newBuilder.newName("X-"));
- assertEquals("Y-0000000001", newBuilder.newName("Y-"));
- assertEquals("Z-0000000002", newBuilder.newName("Z-"));
+ assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
+ assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
+ assertEquals("Z-0000000002", newBuilder.newProcessorName("Z-"));
}
@Test
@@ -141,16 +142,18 @@ public class InternalStreamsBuilderTest {
KTable table1 = builder.table("topic2",
consumed,
new MaterializedInternal<>(
- Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic2"),
- false));
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
+ builder,
+ storePrefix));
final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
assertEquals(1, topology.stateStores().size());
- assertEquals("topic2", topology.stateStores().get(0).name());
+ final String storeName = "prefix-STATE-STORE-0000000000";
+ assertEquals(storeName, topology.stateStores().get(0).name());
assertEquals(1, topology.storeToChangelogTopic().size());
- assertEquals("topic2", topology.storeToChangelogTopic().get("topic2"));
+ assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
assertNull(table1.queryableStoreName());
}
@@ -160,7 +163,8 @@ public class InternalStreamsBuilderTest {
consumed,
new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
- false));
+ builder,
+ storePrefix));
final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
final List<StateStore> stateStores = topology.globalStateStores();
@@ -184,11 +188,11 @@ public class InternalStreamsBuilderTest {
builder.globalTable("table",
consumed,
new MaterializedInternal<>(
- Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1")));
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1"), builder, storePrefix));
builder.globalTable("table2",
consumed,
new MaterializedInternal<>(
- Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2")));
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2"), builder, storePrefix));
doBuildGlobalTopologyWithAllGlobalTables();
}
@@ -201,14 +205,14 @@ public class InternalStreamsBuilderTest {
final GlobalKTable<String, String> globalTable = builder.globalTable("table",
consumed,
new MaterializedInternal<>(
- Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one)));
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one), builder, storePrefix));
final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2",
consumed,
new MaterializedInternal<>(
- Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two)));
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two), builder, storePrefix));
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
- = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), false);
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), builder, storePrefix);
builder.table("not-global", consumed, materialized);
final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
@@ -243,7 +247,7 @@ public class InternalStreamsBuilderTest {
final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
- = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), false);
+ = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), builder, storePrefix);
final KTable<String, String> table = builder.table("table-topic", consumed, materialized);
assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 23e0b59..d8b3a5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -85,9 +85,9 @@ public class KTableForeachTest {
StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, String> table = builder.table(topicName,
Consumed.with(intSerde, stringSerde),
- new MaterializedInternal<>(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
- .withKeySerde(intSerde)
- .withValueSerde(stringSerde)));
+ Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
+ .withKeySerde(intSerde)
+ .withValueSerde(stringSerde));
table.foreach(action);
// Then
http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
new file mode 100644
index 0000000..5fd76f3
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(EasyMockRunner.class)
+public class MaterializedInternalTest {
+
+ @Mock(type = MockType.NICE)
+ private InternalNameProvider nameProvider;
+
+ @Mock(type = MockType.NICE)
+ private KeyValueBytesStoreSupplier supplier;
+ private final String prefix = "prefix";
+
+ @Test
+ public void shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() {
+ final String generatedName = prefix + "-store";
+ EasyMock.expect(nameProvider.newStoreName(prefix)).andReturn(generatedName);
+
+ EasyMock.replay(nameProvider);
+
+ final MaterializedInternal<Object, Object, StateStore> materialized
+ = new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix);
+
+ assertThat(materialized.storeName(), equalTo(generatedName));
+ EasyMock.verify(nameProvider);
+ }
+
+ @Test
+ public void shouldUseProvidedStoreNameWhenSet() {
+ final String storeName = "store-name";
+ final MaterializedInternal<Object, Object, StateStore> materialized
+ = new MaterializedInternal<>(Materialized.as(storeName), nameProvider, prefix);
+ assertThat(materialized.storeName(), equalTo(storeName));
+ }
+
+ @Test
+ public void shouldUseStoreNameOfSupplierWhenProvided() {
+ final String storeName = "other-store-name";
+ EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes();
+ EasyMock.replay(supplier);
+ final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized
+ = new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix);
+ assertThat(materialized.storeName(), equalTo(storeName));
+ }
+}
\ No newline at end of file