Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/11 18:38:08 UTC
[kafka] branch trunk updated: KAFKA-6813: Remove deprecated APIs in KIP-182, Part III (#4991)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new caca1fd KAFKA-6813: Remove deprecated APIs in KIP-182, Part III (#4991)
caca1fd is described below
commit caca1fdc90b6ef877385b044b2a3230f8e6e9874
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri May 11 11:38:02 2018 -0700
KAFKA-6813: Remove deprecated APIs in KIP-182, Part III (#4991)
1. Removed TopologyBuilder, TopologyBuilderException, and KStreamBuilder, together with their tests and the deprecated KafkaStreams constructors that accepted them.
2. Completed the leftover work of https://issues.apache.org/jira/browse/KAFKA-5660 by removing TopologyBuilderException.
3. Added MockStoreBuilder to replace MockStateStoreSupplier, and removed all XXStoreSupplier classes except StateStoreSupplier, which is still referenced in the logical streams graph.
4. Minor: renamed KStreamsFineGrainedAutoResetIntegrationTest.java to FineGrainedAutoResetIntegrationTest.java.
Reviewers: Matthias J. Sax <ma...@confluent.io>
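
For anyone still on the removed classes, a minimal migration sketch, assuming a simple pass-through topology (topic names and configuration values here are illustrative, not taken from this commit): the DSL entry point moves from KStreamBuilder to StreamsBuilder, and KafkaStreams is now constructed from the built Topology.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;

    public class StreamsBuilderMigrationExample {
        public static void main(final String[] args) {
            // Before this commit (now removed):
            //   KStreamBuilder builder = new KStreamBuilder();
            //   KStream<String, String> source = builder.stream("input-topic");
            //   KafkaStreams streams = new KafkaStreams(builder, props);
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "migration-example");  // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // After: build a Topology via StreamsBuilder and hand it to KafkaStreams.
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> source = builder.stream("input-topic");
            source.to("output-topic");

            final Topology topology = builder.build();
            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
        }
    }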
---
.../org/apache/kafka/streams/KafkaStreams.java | 28 -
.../streams/errors/TopologyBuilderException.java | 42 -
.../kafka/streams/kstream/KStreamBuilder.java | 1269 --------------------
.../kstream/internals/InternalStreamsBuilder.java | 21 -
.../kafka/streams/processor/ProcessorSupplier.java | 4 +-
.../kafka/streams/processor/StreamPartitioner.java | 7 +-
.../kafka/streams/processor/TopologyBuilder.java | 958 ---------------
.../internals/InternalTopologyBuilder.java | 132 +-
.../internals/StreamsPartitionAssignor.java | 37 +-
.../state/internals/AbstractStoreBuilder.java | 12 +-
.../state/internals/AbstractStoreSupplier.java | 59 -
.../internals/RocksDBKeyValueStoreSupplier.java | 60 -
.../kafka/streams/TopologyTestDriverWrapper.java | 37 -
.../org/apache/kafka/streams/TopologyWrapper.java | 4 +
...va => FineGrainedAutoResetIntegrationTest.java} | 16 +-
.../integration/RegexSourceIntegrationTest.java | 40 +-
.../kafka/streams/kstream/KStreamBuilderTest.java | 457 -------
.../streams/kstream/internals/KStreamImplTest.java | 22 +-
.../streams/processor/TopologyBuilderTest.java | 752 ------------
.../CopartitionedTopicsValidatorTest.java | 6 +-
.../internals/InternalTopologyBuilderTest.java | 66 +-
.../processor/internals/ProcessorTopologyTest.java | 107 +-
.../processor/internals/StandbyTaskTest.java | 7 +-
.../internals/StreamsPartitionAssignorTest.java | 13 +-
.../CompositeReadOnlyKeyValueStoreTest.java | 3 +-
.../org/apache/kafka/test/KStreamTestDriver.java | 44 -
.../apache/kafka/test/MockStateStoreSupplier.java | 63 -
.../org/apache/kafka/test/MockStoreBuilder.java} | 28 +-
28 files changed, 164 insertions(+), 4130 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d656181..f36bbac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -597,34 +597,6 @@ public class KafkaStreams {
* @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
*/
@Deprecated
- public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
- final Properties props) {
- this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
- }
-
- /**
- * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
- */
- @Deprecated
- public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
- final StreamsConfig config) {
- this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
- }
-
- /**
- * @deprecated use {@link #KafkaStreams(Topology, Properties, KafkaClientSupplier)} instead
- */
- @Deprecated
- public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
- final StreamsConfig config,
- final KafkaClientSupplier clientSupplier) {
- this(builder.internalTopologyBuilder, config, clientSupplier);
- }
-
- /**
- * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
- */
- @Deprecated
public KafkaStreams(final Topology topology,
final StreamsConfig config) {
this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
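
The constructors removed in this hunk accepted the deprecated TopologyBuilder; the surviving constructors accept a Topology. A sketch of the Processor API replacement, under the assumption of a trivial source-to-sink topology (node and topic names are illustrative):

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class TopologyConstructorExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-example");   // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative

            // Topology replaces TopologyBuilder for Processor API users.
            final Topology topology = new Topology();
            topology.addSource("Source", "input-topic");
            topology.addSink("Sink", "output-topic", "Source");

            // Preferred surviving constructor: (Topology, Properties).
            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
        }
    }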
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
deleted file mode 100644
index 385d401..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.errors;
-
-
-/**
- * Indicates a pre-run time error occurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
- * builder} to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology processor topology}.
- *
- * @deprecated use {@link org.apache.kafka.streams.Topology} instead of {@link org.apache.kafka.streams.processor.TopologyBuilder}
- */
-@Deprecated
-public class TopologyBuilderException extends StreamsException {
-
- private static final long serialVersionUID = 1L;
-
- public TopologyBuilderException(final String message) {
- super("Invalid topology building" + (message == null ? "" : ": " + message));
- }
-
- public TopologyBuilderException(final String message, final Throwable throwable) {
- super("Invalid topology building" + (message == null ? "" : ": " + message), throwable);
- }
-
- public TopologyBuilderException(final Throwable throwable) {
- super(throwable);
- }
-}
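
With TopologyBuilderException deleted, an invalid topology now surfaces directly as org.apache.kafka.streams.errors.TopologyException, the exception the removed class used to wrap (see the try/catch blocks in KStreamBuilder below). A small sketch, assuming a deliberately invalid topology that registers the same source topic twice:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.errors.TopologyException;

    public class TopologyExceptionExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            try {
                // Registering the same source topic twice is invalid.
                builder.stream("input-topic");
                builder.stream("input-topic");
            } catch (final TopologyException e) {
                // Formerly wrapped in TopologyBuilderException by KStreamBuilder.
                System.err.println("Invalid topology: " + e.getMessage());
            }
        }
    }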
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
deleted file mode 100644
index d747ce8..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ /dev/null
@@ -1,1269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
-import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.kstream.internals.KTableImpl;
-import org.apache.kafka.streams.kstream.internals.KTableSource;
-import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
-import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
-
-import java.util.Collections;
-import java.util.Objects;
-import java.util.regex.Pattern;
-
-/**
- * {@code KStreamBuilder} provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.
- *
- * @see org.apache.kafka.streams.processor.TopologyBuilder
- * @see KStream
- * @see KTable
- * @see GlobalKTable
- * @deprecated Use {@link org.apache.kafka.streams.StreamsBuilder StreamsBuilder} instead
- */
-@Deprecated
-public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyBuilder {
-
- private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
-
- private Topology.AutoOffsetReset translateAutoOffsetReset(final org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset resetPolicy) {
- if (resetPolicy == null) {
- return null;
- }
- return resetPolicy == org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
- }
-
- /**
- * Create a {@link KStream} from the specified topics.
- * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
- * deserializers as specified in the {@link StreamsConfig config} are used.
- * <p>
- * If multiple topics are specified there is no ordering guarantee for records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param topics the topic names; must contain at least one topic name
- * @return a {@link KStream} for the specified topics
- */
- public <K, V> KStream<K, V> stream(final String... topics) {
- return stream(null, null, null, null, topics);
- }
-
- /**
- * Create a {@link KStream} from the specified topics.
- * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
- * {@link StreamsConfig config} are used.
- * <p>
- * If multiple topics are specified there is no ordering guarantee for records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
- * offsets are available
- * @param topics the topic names; must contain at least one topic name
- * @return a {@link KStream} for the specified topics
- */
- public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
- final String... topics) {
- return stream(offsetReset, null, null, null, topics);
- }
-
- /**
- * Create a {@link KStream} from the specified topic pattern.
- * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
- * deserializers as specified in the {@link StreamsConfig config} are used.
- * <p>
- * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
- * them and there is no ordering guarantee between records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param topicPattern the pattern to match for topic names
- * @return a {@link KStream} for topics matching the regex pattern.
- */
- public <K, V> KStream<K, V> stream(final Pattern topicPattern) {
- return stream(null, null, null, null, topicPattern);
- }
-
- /**
- * Create a {@link KStream} from the specified topic pattern.
- * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
- * {@link StreamsConfig config} are used.
- * <p>
- * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
- * them and there is no ordering guarantee between records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
- * offsets are available
- * @param topicPattern the pattern to match for topic names
- * @return a {@link KStream} for topics matching the regex pattern.
- */
- public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern) {
- return stream(offsetReset, null, null, null, topicPattern);
- }
-
- /**
- * Create a {@link KStream} from the specified topics.
- * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the
- * {@link StreamsConfig config} are used.
- * <p>
- * If multiple topics are specified there is no ordering guarantee for records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param keySerde key serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topics the topic names; must contain at least one topic name
- * @return a {@link KStream} for the specified topics
- */
- public <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics) {
- return stream(null, null, keySerde, valSerde, topics);
- }
-
- /**
- * Create a {@link KStream} from the specified topics.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- * <p>
- * If multiple topics are specified there is no ordering guarantee for records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
- * offsets are available
- * @param keySerde key serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topics the topic names; must contain at least one topic name
- * @return a {@link KStream} for the specified topics
- */
- public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String... topics) {
- return stream(offsetReset, null, keySerde, valSerde, topics);
- }
-
- /**
- * Create a {@link KStream} from the specified topics.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * <p>
- * If multiple topics are specified there is no ordering guarantee for records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to read this source {@link KStream}, if not specified the default
- * serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topics the topic names; must contain at least one topic name
- * @return a {@link KStream} for the specified topics
- */
- public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String... topics) {
- return stream(null, timestampExtractor, keySerde, valSerde, topics);
- }
-
- /**
- * Create a {@link KStream} from the specified topics.
- * <p>
- * If multiple topics are specified there is no ordering guarantee for records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics
- * if no valid committed offsets are available
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topics the topic names; must contain at least one topic name
- * @return a {@link KStream} for the specified topics
- */
- public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String... topics) {
- try {
- final String name = newName(KStreamImpl.SOURCE_NAME);
-
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
- keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
-
- return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
- } catch (final org.apache.kafka.streams.errors.TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- }
-
- /**
- * Create a {@link KStream} from the specified topic pattern.
- * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
- * as specified in the {@link StreamsConfig config} are used.
- * <p>
- * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
- * them and there is no ordering guarantee between records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param keySerde key serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topicPattern the pattern to match for topic names
- * @return a {@link KStream} for topics matching the regex pattern.
- */
- public <K, V> KStream<K, V> stream(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final Pattern topicPattern) {
- return stream(null, null, keySerde, valSerde, topicPattern);
- }
-
- /**
- * Create a {@link KStream} from the specified topic pattern.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- * <p>
- * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
- * them and there is no ordering guarantee between records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
- * offsets are available
- * @param keySerde key serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topicPattern the pattern to match for topic names
- * @return a {@link KStream} for topics matching the regex pattern.
- */
- public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final Pattern topicPattern) {
- return stream(offsetReset, null, keySerde, valSerde, topicPattern);
- }
-
- /**
- * Create a {@link KStream} from the specified topic pattern.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * <p>
- * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
- * them and there is no ordering guarantee between records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topicPattern the pattern to match for topic names
- * @return a {@link KStream} for topics matching the regex pattern.
- */
- public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final Pattern topicPattern) {
- return stream(null, timestampExtractor, keySerde, valSerde, topicPattern);
- }
-
- /**
- * Create a {@link KStream} from the specified topic pattern.
- * <p>
- * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
- * them and there is no ordering guarantee between records from different topics.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case it is the user's responsibility to repartition the data before any key-based operation
- * (like aggregation or join) is applied to the returned {@link KStream}.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the matched topics if no valid
- * committed offsets are available
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to read this source {@link KStream},
- * if not specified the default serde defined in the configs will be used
- * @param topicPattern the pattern to match for topic names
- * @return a {@link KStream} for topics matching the regex pattern.
- */
- public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final Pattern topicPattern) {
- try {
- final String name = newName(KStreamImpl.SOURCE_NAME);
-
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
- keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
-
- return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
- } catch (final org.apache.kafka.streams.errors.TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- }
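
All the stream(...) overloads above (offset reset policy, timestamp extractor, serdes, topic list or pattern) collapse under KIP-182 into StreamsBuilder#stream plus a Consumed configuration object. A hedged sketch of the replacement call; the topic name and serdes are illustrative:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    public class ConsumedExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // One stream() overload; optional settings hang off Consumed instead.
            final KStream<String, Long> source = builder.stream(
                "input-topic",
                Consumed.with(Serdes.String(), Serdes.Long())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
            source.foreach((key, value) -> System.out.println(key + " -> " + value));
        }
    }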
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
- * default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(String)}.
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final String topic,
- final String queryableStoreName) {
- return table(null, null, null, null, topic, queryableStoreName);
- }
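
The Javadoc above points at KafkaStreams#allMetadata() for querying non-local keys; below is a sketch of locating the owning instance with metadataForKey, assuming a store named "queryable-store" and String keys (the RPC call itself is application-specific and omitted):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public final class MetadataLookupSketch {
        // Returns "host:port" of the instance hosting the given key; the store
        // name "queryable-store" and String keys are assumptions for this sketch.
        static String ownerOf(final KafkaStreams streams, final String key) {
            final StreamsMetadata metadata = streams.metadataForKey(
                "queryable-store", key, Serdes.String().serializer());
            return metadata.host() + ":" + metadata.port();
        }
    }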
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
- * {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- return table(null, null, null, null, topic, storeSupplier);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
- * {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that the store name may not be queryable through Interactive Queries.
- * No internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * @param topic the topic name; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final String topic) {
- return table(null, null, null, null, topic, (String) null);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
- * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, String) table(AutoOffsetReset, String)}.
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final String topic,
- final String queryableStoreName) {
- return table(offsetReset, null, null, null, topic, queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@link TimestampExtractor} and default key and value deserializers
- * as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- return table(offsetReset, null, null, null, topic, storeSupplier);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that the store name may not be queryable through Interactive Queries.
- * No internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final String topic) {
- return table(offsetReset, null, null, null, topic, (String) null);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
- * as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
- final String topic,
- final String storeName) {
- return table(null, timestampExtractor, null, null, topic, storeName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final String topic,
- final String storeName) {
- return table(offsetReset, timestampExtractor, null, null, topic, storeName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
- * as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(Serde, Serde, String)}.
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final String queryableStoreName) {
- return table(null, null, keySerde, valSerde, topic, queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- return table(null, null, keySerde, valSerde, topic, storeSupplier);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that the store name may not be queryable through Interactive Queries.
- * No internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic) {
- return table(null, null, keySerde, valSerde, topic, (String) null);
- }
-
- private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final TimestampExtractor timestampExtractor,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
- final boolean isQueryable) {
- try {
- final String source = newName(KStreamImpl.SOURCE_NAME);
- final String name = newName(KTableImpl.SOURCE_NAME);
- final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
-
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), source, timestampExtractor,
- keySerde == null ? null : keySerde.deserializer(),
- valSerde == null ? null : valSerde.deserializer(),
- topic);
- internalTopologyBuilder.addProcessor(name, processorSupplier, source);
-
- final KTableImpl<K, ?, V> kTable = new KTableImpl<>(internalStreamsBuilder, name, processorSupplier,
- keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
-
- addStateStore(storeSupplier, name);
- connectSourceStoreAndTopic(storeSupplier.name(), topic);
-
- return kTable;
- } catch (final org.apache.kafka.streams.errors.TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
- * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final String queryableStoreName) {
- return table(offsetReset, null, keySerde, valSerde, topic, queryableStoreName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeName the state store name; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final String storeName) {
- return table(null, timestampExtractor, keySerde, valSerde, topic, storeName);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topic must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code storeName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
- * committed offsets are available
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
- * {@link #table(org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset, Serde, Serde, String) table(AutoOffsetReset, Serde, Serde, String)}
- * @return a {@link KTable} for the specified topic
- */
- @SuppressWarnings("unchecked")
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final String queryableStoreName) {
- final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
- final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(
- internalStoreName,
- keySerde,
- valSerde,
- false,
- Collections.<String, String>emptyMap(),
- true);
- return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that the store name may not be queryable through Interactive Queries.
- * No internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param keySerde key serde used to read key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to read key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic) {
- return table(offsetReset, null, keySerde, valSerde, topic, (String) null);
- }
-
- /**
- * Create a {@link KTable} for the specified topic.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * Note that the specified input topics must be partitioned by key.
- * If this is not the case the returned {@link KTable} will be corrupted.
- * <p>
- * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} provided by the given
- * {@code storeSupplier}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeSupplier.name(), QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
- * }</pre>
- * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
- * query the value of the key on a parallel running instance of your Kafka Streams application.
- *
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to read key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to read key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@link KTable} for the specified topic
- */
- public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
- * }</pre>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(String)}
- * @return a {@link GlobalKTable} for the specified topic
- */
- public <K, V> GlobalKTable<K, V> globalTable(final String topic,
- final String queryableStoreName) {
- return globalTable(null, null, null, topic, queryableStoreName);
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that the store name may not be queryable through Interactive Queries.
- * No internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param topic the topic name; cannot be {@code null}
- * @return a {@link GlobalKTable} for the specified topic
- */
- public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
- return globalTable(null, null, null, topic, null);
- }
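
The StreamsBuilder replacement expresses the same thing, with the serdes carried in an optional Consumed. A minimal sketch, assuming an illustrative topic name:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;

    public class GlobalTableMigrationExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // "earliest" reset is always applied for global tables, so only serdes are supplied.
            final GlobalKTable<String, String> global = builder.globalTable(
                "reference-topic",                                   // illustrative topic name
                Consumed.with(Serdes.String(), Serdes.String()));
        }
    }
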
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * If no {@link TimestampExtractor}, key serde, or value serde is specified, the corresponding defaults
- * from the {@link StreamsConfig config} are used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
- * }</pre>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param keySerde key serde used to read key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to read key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable},
- * if not specified the default extractor defined in the configs will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)}
- * @return a {@link GlobalKTable} for the specified topic
- */
- public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final TimestampExtractor timestampExtractor,
- final String topic,
- final String queryableStoreName) {
- final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
- return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
- keySerde,
- valSerde,
- false,
- Collections.<String, String>emptyMap(),
- true));
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} provided by the
- * given {@code storeSupplier}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeSupplier.name(), QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
- * }</pre>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param keySerde key serde used to read key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to read key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@link GlobalKTable} for the specified topic
- */
- public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
- * {@code queryableStoreName}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
- * }</pre>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param keySerde key serde used to read key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to read key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
- * {@link KStreamBuilder#globalTable(Serde, Serde, String)}
- * @return a {@link GlobalKTable} for the specified topic
- */
- public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic,
- final String queryableStoreName) {
- return globalTable(keySerde, valSerde, null, topic, queryableStoreName);
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * Input {@link KeyValue} pairs with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} provided by the
- * given {@code storeSupplier}.
- * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * To query the local {@link KeyValueStore} it must be obtained via
- * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
- * <pre>{@code
- * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeSupplier.name(), QueryableStoreTypes.<String, Long>keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key);
- * }</pre>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable},
- * if not specified the default extractor defined in the configs will be used
- * @param keySerde key serde used to read key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to read key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
- * @return a {@link GlobalKTable} for the specified topic
- */
- @SuppressWarnings("unchecked")
- private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final TimestampExtractor timestampExtractor,
- final String topic,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- try {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- final String sourceName = newName(KStreamImpl.SOURCE_NAME);
- final String processorName = newName(KTableImpl.SOURCE_NAME);
- final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
-
- final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
- final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
- return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
- } catch (final org.apache.kafka.streams.errors.TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- }
-
- /**
- * Create a {@link GlobalKTable} for the specified topic.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- * Input {@link KeyValue records} with {@code null} key will be dropped.
- * <p>
- * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
- * store name. Note that the store name may not be queryable through Interactive Queries.
- * No internal changelog topic is created since the original input topic can be used for recovery (cf.
- * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
- * <p>
- * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
- * regardless of the specified value in {@link StreamsConfig}.
- *
- * @param keySerde key serde used to read key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to read key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @return a {@link GlobalKTable} for the specified topic
- */
- public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
- final Serde<V> valSerde,
- final String topic) {
-
- return globalTable(keySerde, valSerde, null, topic, (String) null);
- }
-
- /**
- * Create a new instance of {@link KStream} by merging the given {@link KStream}s.
- * <p>
- * There is no ordering guarantee for records from different {@link KStream}s.
- *
- * @param streams the {@link KStream}s to be merged
- * @return a {@link KStream} containing all records of the given streams
- */
- @SuppressWarnings("unchecked")
- public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
- Objects.requireNonNull(streams, "streams can't be null");
- if (streams.length <= 1) {
- throw new IllegalArgumentException("At least two streams must be provided.");
- }
- try {
- KStream<K, V> mergedStream = streams[0];
- for (int i = 1; i < streams.length; i++) {
- mergedStream = mergedStream.merge(streams[i]);
- }
- return mergedStream;
- } catch (final org.apache.kafka.streams.errors.TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- }
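
Stream merging now lives on KStream#merge, which this removed method already delegates to. A minimal sketch of the replacement, assuming illustrative topic names:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class MergeMigrationExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> left =
                builder.stream("left-topic", Consumed.with(Serdes.String(), Serdes.String()));
            final KStream<String, String> right =
                builder.stream("right-topic", Consumed.with(Serdes.String(), Serdes.String()));
            // Records from the two inputs are interleaved with no ordering guarantee.
            final KStream<String, String> merged = left.merge(right);
            merged.to("merged-topic");
        }
    }
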
-
- /**
- * <strong>This function is for internal usage only and should not be called.</strong>
- * <p>
- * Create a unique processor name used for translation into the processor topology.
- *
- * @param prefix processor name prefix
- * @return a new unique name
- */
- public String newName(final String prefix) {
- return internalStreamsBuilder.newProcessorName(prefix);
- }
-
- /**
- * <strong>This function is for internal usage only and should not be called.</strong>
- * <p>
- * Create a unique state store name.
- *
- * @param prefix state store name prefix
- * @return a new unique name
- */
- public String newStoreName(final String prefix) {
- return internalStreamsBuilder.newStoreName(prefix);
- }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index fa47444..480794c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -68,27 +68,6 @@ public class InternalStreamsBuilder implements InternalNameProvider {
return new KStreamImpl<>(this, name, Collections.singleton(name), false);
}
- @SuppressWarnings("deprecation")
- public <K, V> KTable<K, V> table(final String topic,
- final ConsumedInternal<K, V> consumed,
- final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
- final String name = newProcessorName(KTableImpl.SOURCE_NAME);
-
- final KTable<K, V> kTable = createKTable(consumed,
- topic,
- storeSupplier.name(),
- true,
- source,
- name);
-
- internalTopologyBuilder.addStateStore(storeSupplier, name);
- internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
-
- return kTable;
- }
-
@SuppressWarnings("unchecked")
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
index 25bbaab..a253638 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.processor;
+import org.apache.kafka.streams.Topology;
+
/**
* A processor supplier that can create one or more {@link Processor} instances.
*
- * It is used in {@link TopologyBuilder} for adding new processor operators, whose generated
+ * It is used in {@link Topology} for adding new processor operators, whose generated
* topology can then be replicated (thus creating one or more {@link Processor} instances)
* and distributed to multiple stream threads.
*
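
As a reference for the javadoc above, a minimal ProcessorSupplier sketch; the class name and the upper-casing logic are illustrative:

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;

    // Each stream task obtains its own Processor instance from get(), so
    // per-record state kept inside the processor is never shared across threads.
    public class UpperCaseProcessorSupplier implements ProcessorSupplier<String, String> {
        @Override
        public Processor<String, String> get() {
            return new AbstractProcessor<String, String>() {
                @Override
                public void process(final String key, final String value) {
                    context().forward(key, value == null ? null : value.toUpperCase());
                }
            };
        }
    }
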
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
index 1e622e5..1fa5e3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.streams.Topology;
/**
* Determine how records are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
@@ -36,16 +37,16 @@ import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
* determine to which partition each record should be written.
* <p>
* To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
+ * when {@link Topology#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
* for that topic.
* <p>
* All StreamPartitioner implementations should be stateless pure functions so they can be shared across topic and sink nodes.
*
* @param <K> the type of keys
* @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ * @see Topology#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
* org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
+ * @see Topology#addSink(String, String, StreamPartitioner, String...)
*/
public interface StreamPartitioner<K, V> {
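
To make the contract above concrete, a minimal StreamPartitioner sketch (class name and routing logic are illustrative); it would be attached via the Topology#addSink overloads referenced in the @see tags, using the pre-2.4 three-argument signature:

    import org.apache.kafka.streams.processor.StreamPartitioner;

    public class FirstCharPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Integer partition(final String key, final String value, final int numPartitions) {
            // Stateless and deterministic, so one instance can be shared across sink nodes.
            if (key == null || key.isEmpty()) {
                return null;   // fall back to the producer's default partitioning
            }
            return (key.charAt(0) & 0x7fffffff) % numPartitions;
        }
    }
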
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
deleted file mode 100644
index dab7bd7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ /dev/null
@@ -1,958 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.TopologyException;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.SinkNode;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.SubscriptionUpdates;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-/**
- * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
- * and sinks. A {@link SourceNode source} is a node in the graph that consumes records from one or more Kafka topics and forwards them to
- * its child nodes. A {@link Processor processor} is a node in the graph that receives input records from upstream nodes,
- * processes those records, and optionally forwards new records to one or more of its children. Finally, a {@link SinkNode sink}
- * is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
- *
- * @deprecated use {@link Topology} instead
- */
-@SuppressWarnings("unchecked")
-@Deprecated
-public class TopologyBuilder {
-
- /**
- * NOTE this member is not needed by developers working with the processor APIs, but is only used
- * for internal functionality.
- */
- public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
-
- private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
- if (resetPolicy == null) {
- return null;
- }
- return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
- }
-
- /**
- * NOTE this class is not needed by developers working with the processor APIs, but is only used
- * for internal functionality.
- */
- public static class TopicsInfo {
- public Set<String> sinkTopics;
- public Set<String> sourceTopics;
- public Map<String, InternalTopicConfig> stateChangelogTopics;
- public Map<String, InternalTopicConfig> repartitionSourceTopics;
-
- public TopicsInfo(final Set<String> sinkTopics,
- final Set<String> sourceTopics,
- final Map<String, InternalTopicConfig> repartitionSourceTopics,
- final Map<String, InternalTopicConfig> stateChangelogTopics) {
- this.sinkTopics = sinkTopics;
- this.sourceTopics = sourceTopics;
- this.stateChangelogTopics = stateChangelogTopics;
- this.repartitionSourceTopics = repartitionSourceTopics;
- }
-
- @Override
- public boolean equals(final Object o) {
- if (o instanceof TopicsInfo) {
- final TopicsInfo other = (TopicsInfo) o;
- return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- final long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
- return (int) (n % 0xFFFFFFFFL);
- }
-
- @Override
- public String toString() {
- return "TopicsInfo{" +
- "sinkTopics=" + sinkTopics +
- ", sourceTopics=" + sourceTopics +
- ", repartitionSourceTopics=" + repartitionSourceTopics +
- ", stateChangelogTopics=" + stateChangelogTopics +
- '}';
- }
- }
-
- /**
- * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}.
- */
- public enum AutoOffsetReset {
- EARLIEST, LATEST
- }
-
- /**
- * Create a new builder.
- */
- public TopologyBuilder() {}
-
- /** This method is not part of the public API and should never be used by a developer. */
- public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
- internalTopologyBuilder.setApplicationId(applicationId);
- return this;
- }
-
- /**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- *
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final String name,
- final String... topics) {
- try {
- internalTopologyBuilder.addSource(null, name, null, null, null, topics);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
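
The Topology replacement keeps the same shape but throws TopologyException directly instead of wrapping it. A minimal sketch with illustrative node and topic names:

    import org.apache.kafka.streams.Topology;

    public class SourceMigrationExample {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            // Default deserializers and timestamp extractor from the config are used,
            // exactly as with the removed builder methods.
            topology.addSource("Source", "input-topic");
            topology.addSource(Topology.AutoOffsetReset.LATEST, "LateSource", "other-topic");
        }
    }
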
-
- /**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- *
- * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final String name,
- final String... topics) {
- try {
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
- final String name,
- final String... topics) {
- try {
- internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- *
- * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found;
- * acceptable values are earliest or latest
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final String name,
- final String... topics) {
- try {
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes from topics matching the given pattern
- * and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- *
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final String name,
- final Pattern topicPattern) {
- try {
- internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes from topics matching the given pattern
- * and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- *
- * @param offsetReset the auto offset reset policy value for this source if no committed offsets are found; acceptable values are earliest or latest.
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final String name,
- final Pattern topicPattern) {
- try {
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
-
- /**
- * Add a new source that consumes from topics matching the given pattern
- * and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- *
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
- final String name,
- final Pattern topicPattern) {
- try {
- internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes from topics matching the given pattern
- * and forwards the records to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- *
- * @param offsetReset the auto offset reset policy value for this source if no committed offsets are found;
- * acceptable values are earliest or latest.
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor,
- final String name,
- final Pattern topicPattern) {
- try {
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
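
The pattern-based overloads carry over to Topology unchanged apart from the exception type; a minimal sketch with an illustrative pattern:

    import java.util.regex.Pattern;
    import org.apache.kafka.streams.Topology;

    public class PatternSourceExample {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            // All topics matching the pattern share one source node (and its deserializers).
            topology.addSource(Topology.AutoOffsetReset.EARLIEST, "PatternSource",
                Pattern.compile("sensor-.*"));
        }
    }
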
-
- /**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
- * The source will use the specified key and value deserializers.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- *
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
- * @param keyDeserializer key deserializer used to read this source, if not specified the default
- * key deserializer defined in the configs will be used
- * @param valDeserializer value deserializer used to read this source,
- * if not specified the default value deserializer defined in the configs will be used
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if the processor is already added or if the topics have already been registered by another source
- */
- public synchronized final TopologyBuilder addSource(final String name,
- final Deserializer keyDeserializer,
- final Deserializer valDeserializer,
- final String... topics) {
- try {
- internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
- * The source will use the specified key and value deserializers.
- *
- * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are found;
- * acceptable values are earliest or latest.
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param keyDeserializer key deserializer used to read this source, if not specified the default
- * key deserializer defined in the configs will be used
- * @param valDeserializer value deserializer used to read this source,
- * if not specified the default value deserializer defined in the configs will be used
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if the processor is already added or if the topics have already been registered by another source
- */
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final String name,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valDeserializer,
- final String... topics) {
- try {
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
- * from all partitions of the provided input topic. There will be exactly one instance of this
- * {@link StateStore} per Kafka Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving
- * from the partitions of the input topic.
- * <p>
- * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will
- * receive all records forwarded from the {@link SourceNode}. This
- * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- *
- * @param storeSupplier user defined state store supplier
- * @param sourceName name of the {@link SourceNode} that will be automatically added
- * @param keyDeserializer the {@link Deserializer} to deserialize keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link ProcessorSupplier}
- * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
- final String sourceName,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- try {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
- * from all partitions of the provided input topic. There will be exactly one instance of this
- * {@link StateStore} per Kafka Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving
- * from the partitions of the input topic.
- * <p>
- * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will
- * receive all records forwarded from the {@link SourceNode}. This
- * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
- *
- * @param storeSupplier user defined state store supplier
- * @param sourceName name of the {@link SourceNode} that will be automatically added
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param keyDeserializer the {@link Deserializer} to deserialize keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link ProcessorSupplier}
- * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
- final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- try {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
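
With StateStoreSupplier gone, the Topology replacement takes a StoreBuilder instead (cf. the new MockStoreBuilder test helper in this commit). A minimal sketch with illustrative store, topic, and node names, and a hypothetical state-update processor:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class GlobalStoreMigrationExample {
        public static void main(final String[] args) {
            final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("global-store"),
                        Serdes.String(),
                        Serdes.String())
                    .withLoggingDisabled();   // global stores are restored from the source topic, not a changelog
            final Topology topology = new Topology();
            topology.addGlobalStore(
                storeBuilder,
                "global-source",
                Serdes.String().deserializer(),
                Serdes.String().deserializer(),
                "global-topic",
                "global-processor",
                () -> new AbstractProcessor<String, String>() {
                    private KeyValueStore<String, String> store;

                    @SuppressWarnings("unchecked")
                    @Override
                    public void init(final ProcessorContext context) {
                        super.init(context);
                        store = (KeyValueStore<String, String>) context.getStateStore("global-store");
                    }

                    @Override
                    public void process(final String key, final String value) {
                        store.put(key, value);   // keep the global store in sync with its source topic
                    }
                });
        }
    }
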
-
- /**
- * Add a new source that consumes from topics matching the given pattern
- * and forwards the records to child processor and/or sink nodes.
- * The source will use the specified key and value deserializers. The provided
- * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
- * topics that share the same key-value data format.
- * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
- *
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
- * @param keyDeserializer key deserializer used to read this source, if not specified the default
- * key deserializer defined in the configs will be used
- * @param valDeserializer value deserializer used to read this source,
- * if not specified the default value deserializer defined in the configs will be used
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if the processor is already added or if the topics have already been registered by name
- */
- public synchronized final TopologyBuilder addSource(final String name,
- final Deserializer keyDeserializer,
- final Deserializer valDeserializer,
- final Pattern topicPattern) {
- try {
- internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes from topics matching the given pattern
- * and forwards the records to child processor and/or sink nodes.
- * The source will use the specified key and value deserializers. The provided
- * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
- * topics that share the same key-value data format.
- *
- * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are found;
- * acceptable values are earliest or latest
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param timestampExtractor the stateless timestamp extractor used for this source,
- * if not specified the default extractor defined in the configs will be used
- * @param keyDeserializer key deserializer used to read this source, if not specified the default
- * key deserializer defined in the configs will be used
- * @param valDeserializer value deserializer used to read this source,
- * if not specified the default value deserializer defined in the configs will be used
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if the processor is already added or if the topics have already been registered by name
- */
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final String name,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valDeserializer,
- final Pattern topicPattern) {
- try {
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new source that consumes from topics matching the given pattern
- * and forwards the records to child processor and/or sink nodes.
- * The source will use the specified key and value deserializers. The provided
- * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
- * topics that share the same key-value data format.
- *
- * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are found;
- * acceptable values are earliest or latest
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
- * @param keyDeserializer key deserializer used to read this source, if not specified the default
- * key deserializer defined in the configs will be used
- * @param valDeserializer value deserializer used to read this source,
- * if not specified the default value deserializer defined in the configs will be used
- * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if the processor is already added or if the topics have already been registered by name
- */
- public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final String name,
- final Deserializer keyDeserializer,
- final Deserializer valDeserializer,
- final Pattern topicPattern) {
- try {
- internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
- * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- *
- * @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its records
- * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
- * and write to its topic
- * @return this builder instance so methods can be chained together; never null
- * @see #addSink(String, String, StreamPartitioner, String...)
- * @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
- */
- public synchronized final TopologyBuilder addSink(final String name,
- final String topic,
- final String... predecessorNames) {
- try {
- internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
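
The sink methods carry over to Topology one-for-one; a minimal sketch wiring an illustrative source to a sink:

    import org.apache.kafka.streams.Topology;

    public class SinkMigrationExample {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            topology.addSource("Source", "input-topic");
            // The sink consumes whatever "Source" emits and writes it to the output topic.
            topology.addSink("Sink", "output-topic", "Source");
        }
    }
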
-
- /**
- * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic, using
- * the supplied partitioner.
- * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
- * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
- * <p>
- * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
- * the named Kafka topic's partitions. Such control is often useful with topologies that use
- * {@link #addStateStore(StateStoreSupplier, String...) state stores}
- * in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
- * records among partitions using Kafka's default partitioning logic.
- *
- * @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its records
- * @param partitioner the function that should be used to determine the partition for each record processed by the sink
- * @param predecessorNames the name of one or more source or processor nodes whose output records this sink should consume
- * and write to its topic
- * @return this builder instance so methods can be chained together; never null
- * @see #addSink(String, String, String...)
- * @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
- */
- public synchronized final TopologyBuilder addSink(final String name,
- final String topic,
- final StreamPartitioner partitioner,
- final String... predecessorNames) {
- try {
- internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
- * The sink will use the specified key and value serializers.
- *
- * @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its records
- * @param keySerializer the {@link Serializer key serializer} used when writing records; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param predecessorNames the names of one or more source or processor nodes whose output records this sink should consume
- * and write to its topic
- * @return this builder instance so methods can be chained together; never null
- * @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamPartitioner, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
- */
- public synchronized final TopologyBuilder addSink(final String name,
- final String topic,
- final Serializer keySerializer,
- final Serializer valSerializer,
- final String... predecessorNames) {
- try {
- internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
- * The sink will use the specified key and value serializers, and the supplied partitioner.
- *
- * @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its records
- * @param keySerializer the {@link Serializer key serializer} used when writing records to the topic; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param valSerializer the {@link Serializer value serializer} used when writing records to the topic; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
- * @param partitioner the function that should be used to determine the partition for each record processed by the sink
- * @param predecessorNames the names of one or more source or processor nodes whose output records this sink should consume
- * and write to its topic
- * @return this builder instance so methods can be chained together; never null
- * @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamPartitioner, String...)
- * @see #addSink(String, String, Serializer, Serializer, String...)
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if a predecessor has not been added yet, or if this sink's name equals a predecessor's name
- */
- public synchronized final <K, V> TopologyBuilder addSink(final String name,
- final String topic,
- final Serializer<K> keySerializer,
- final Serializer<V> valSerializer,
- final StreamPartitioner<? super K, ? super V> partitioner,
- final String... predecessorNames) {
- try {
- internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Add a new processor node that receives and processes records output by one or more predecessor source or processor nodes.
- * Any new record output by this processor will be forwarded to its child processor or sink nodes.
- * @param name the unique name of the processor node
- * @param supplier the supplier used to obtain this node's {@link Processor} instance
- * @param predecessorNames the names of one or more source or processor nodes whose output records this processor should receive
- * and process
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if a predecessor has not been added yet, or if this processor's name equals a predecessor's name
- */
- public synchronized final TopologyBuilder addProcessor(final String name,
- final ProcessorSupplier supplier,
- final String... predecessorNames) {
- try {
- internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Adds a state store.
- *
- * @param supplier the supplier used to obtain the {@link StateStore} instance
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if a state store with the same name has already been added
- */
- public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
- final String... processorNames) {
- try {
- internalTopologyBuilder.addStateStore(supplier, processorNames);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- return this;
- }
-
- /**
- * Connects the processor and the state stores
- *
- * @param processorName the name of the processor
- * @param stateStoreNames the names of state stores that the processor uses
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
- final String... stateStoreNames) {
- if (stateStoreNames != null && stateStoreNames.length > 0) {
- try {
- internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
- } catch (final TopologyException e) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
- }
- }
- return this;
- }
-
- /**
- * This is used only for KStreamBuilder: when adding a KTable from a source topic,
- * we need to add the topic as the KTable's materialized state store's changelog.
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- */
- protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
- final String topic) {
- internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
- return this;
- }
-
- /**
- * Connects a list of processors.
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @param processorNames the names of the processors
- * @return this builder instance so methods can be chained together; never null
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if fewer than two processors are specified, or if one of the processors has not been added yet
- */
- public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
- internalTopologyBuilder.connectProcessors(processorNames);
- return this;
- }
-
- /**
- * Adds an internal topic
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @param topicName the name of the topic
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
- internalTopologyBuilder.addInternalTopic(topicName);
- return this;
- }
-
- /**
- * Requires that the streams of the specified source nodes be copartitioned.
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @param sourceNodes a set of source node names
- * @return this builder instance so methods can be chained together; never null
- */
- public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
- internalTopologyBuilder.copartitionSources(sourceNodes);
- return this;
- }
-
- /**
- * Returns the map of node groups keyed by the topic group id.
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return groups of node names
- */
- public synchronized Map<Integer, Set<String>> nodeGroups() {
- return internalTopologyBuilder.nodeGroups();
- }
-
- /**
- * Build the topology for the specified topic group. This is called automatically when passing this builder into the
- * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
- */
- public synchronized ProcessorTopology build(final Integer topicGroupId) {
- return internalTopologyBuilder.build(topicGroupId);
- }
-
- /**
- * Builds the topology for any global state stores
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return ProcessorTopology
- */
- public synchronized ProcessorTopology buildGlobalStateTopology() {
- return internalTopologyBuilder.buildGlobalStateTopology();
- }
-
- /**
- * Get any global {@link StateStore}s that are part of the
- * topology
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return map containing all global {@link StateStore}s
- */
- public Map<String, StateStore> globalStateStores() {
- return internalTopologyBuilder.globalStateStores();
- }
-
- /**
- * Returns the map of topic groups keyed by the group id.
- * A topic group is a group of topics in the same task.
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return groups of topic names
- */
- public synchronized Map<Integer, TopicsInfo> topicGroups() {
- final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroupsWithNewTopicsInfo = internalTopologyBuilder.topicGroups();
- final Map<Integer, TopicsInfo> topicGroupsWithDeprecatedTopicInfo = new HashMap<>();
-
- for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroupsWithNewTopicsInfo.entrySet()) {
- final InternalTopologyBuilder.TopicsInfo newTopicsInfo = entry.getValue();
-
- topicGroupsWithDeprecatedTopicInfo.put(entry.getKey(), new TopicsInfo(
- newTopicsInfo.sinkTopics,
- newTopicsInfo.sourceTopics,
- newTopicsInfo.repartitionSourceTopics,
- newTopicsInfo.stateChangelogTopics));
- }
-
- return topicGroupsWithDeprecatedTopicInfo;
- }
-
- /**
- * Get the Pattern matching all topics that are required to start reading from the earliest available offset
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return the Pattern matching all topics that read from the earliest offset; never null
- */
- public synchronized Pattern earliestResetTopicsPattern() {
- return internalTopologyBuilder.earliestResetTopicsPattern();
- }
-
- /**
- * Get the Pattern matching all topics that are required to start reading from the latest available offset
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return the Pattern matching all topics that read from the latest offset; never null
- */
- public synchronized Pattern latestResetTopicsPattern() {
- return internalTopologyBuilder.latestResetTopicsPattern();
- }
-
- /**
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return a mapping from state store names to a set of source topics.
- */
- public Map<String, List<String>> stateStoreNameToSourceTopics() {
- return internalTopologyBuilder.stateStoreNameToSourceTopics();
- }
-
- /**
- * Returns the copartition groups.
- * A copartition group is a group of source topics that are required to be copartitioned.
- *
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- *
- * @return groups of topic names
- */
- public synchronized Collection<Set<String>> copartitionGroups() {
- return internalTopologyBuilder.copartitionGroups();
- }
-
- /**
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- */
- public SubscriptionUpdates subscriptionUpdates() {
- SubscriptionUpdates clonedSubscriptionUpdates = new SubscriptionUpdates();
- clonedSubscriptionUpdates.updateTopics(internalTopologyBuilder.subscriptionUpdates().getUpdates());
- return clonedSubscriptionUpdates;
- }
-
- /**
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- */
- public synchronized Pattern sourceTopicPattern() {
- return internalTopologyBuilder.sourceTopicPattern();
- }
-
- /**
- * NOTE this function is not needed by developers working with the processor APIs, but is only used
- * for the high-level DSL parsing functionality.
- */
- public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
- final String threadId) {
- internalTopologyBuilder.updateSubscribedTopics(new HashSet<>(subscriptionUpdates.getUpdates()), "stream-thread [" + threadId + "] ");
- }
-
-}
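
With the deprecated builder removed, the equivalent wiring goes through the public Topology API, which chains the same way but lets TopologyException propagate directly instead of wrapping it. A minimal migration sketch (topic and node names here are illustrative, not taken from this patch):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class MigrationSketch {
        public static void main(final String[] args) {
            // Topology#addSource/#addSink mirror the removed TopologyBuilder methods,
            // but throw TopologyException directly on invalid wiring.
            final Topology topology = new Topology()
                .addSource("source", "input-topic")
                .addSink("sink", "output-topic",
                         Serdes.String().serializer(), Serdes.String().serializer(),
                         "source");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "migration-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.close();
        }
    }
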
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index bfe8cda..70437e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
-import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,35 +174,6 @@ public class InternalTopologyBuilder {
}
}
- private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
- @SuppressWarnings("deprecation")
- private final org.apache.kafka.streams.processor.StateStoreSupplier supplier;
-
- @SuppressWarnings("deprecation")
- StateStoreSupplierFactory(final org.apache.kafka.streams.processor.StateStoreSupplier<?> supplier) {
- super(supplier.name(),
- supplier.loggingEnabled(),
- supplier instanceof WindowStoreSupplier,
- supplier.logConfig());
- this.supplier = supplier;
-
- }
-
- @Override
- public StateStore build() {
- return supplier.get();
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public long retentionPeriod() {
- if (!isWindowStore()) {
- throw new IllegalStateException("retentionPeriod is not supported when not a window store");
- }
- return ((WindowStoreSupplier) supplier).retentionPeriod();
- }
- }
-
private static class StoreBuilderFactory extends AbstractStateStoreFactory {
private final StoreBuilder builder;
@@ -432,7 +402,19 @@ public class InternalTopologyBuilder {
for (final String sourceTopicName : sourceTopicNames) {
if (topicPattern.matcher(sourceTopicName).matches()) {
- throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+ throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+ }
+ }
+
+ for (final Pattern otherPattern : earliestResetPatterns) {
+ if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
+ throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
+ }
+ }
+
+ for (final Pattern otherPattern : latestResetPatterns) {
+ if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
+ throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
}
}
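
The checks added above are a conservative construction-time approximation: two reset patterns are treated as overlapping whenever one pattern string contains the other, which flags identical and nested regexes without attempting full regex-intersection analysis. A standalone illustration of the containment test (example patterns assumed; this is not code from the patch):

    import java.util.regex.Pattern;

    public class OverlapCheckSketch {
        public static void main(final String[] args) {
            final Pattern earliest = Pattern.compile("topic-[A-D]_1");
            final Pattern latest = Pattern.compile("topic-[A-D]_1");
            // The same containment test the builder now applies against both reset-pattern sets.
            final boolean overlaps = earliest.pattern().contains(latest.pattern())
                || latest.pattern().contains(earliest.pattern());
            System.out.println(overlaps); // true -> TopologyException at registration time
        }
    }
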
@@ -498,24 +480,6 @@ public class InternalTopologyBuilder {
nodeGrouper.unite(name, predecessorNames);
}
- @SuppressWarnings("deprecation")
- public final void addStateStore(final org.apache.kafka.streams.processor.StateStoreSupplier supplier,
- final String... processorNames) {
- Objects.requireNonNull(supplier, "supplier can't be null");
- if (stateFactories.containsKey(supplier.name())) {
- throw new TopologyException("StateStore " + supplier.name() + " is already added.");
- }
-
- stateFactories.put(supplier.name(), new StateStoreSupplierFactory(supplier));
-
- if (processorNames != null) {
- for (final String processorName : processorNames) {
- Objects.requireNonNull(processorName, "processor name must not be null");
- connectProcessorAndStateStore(processorName, supplier.name());
- }
- }
- }
-
public final void addStateStore(final StoreBuilder storeBuilder,
final String... processorNames) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
@@ -533,36 +497,6 @@ public class InternalTopologyBuilder {
}
}
- @SuppressWarnings("deprecation")
- public final void addGlobalStore(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
- final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- Objects.requireNonNull(storeSupplier, "store supplier must not be null");
- final String name = storeSupplier.name();
- validateGlobalStoreArguments(sourceName,
- topic,
- processorName,
- stateUpdateSupplier,
- name,
- storeSupplier.loggingEnabled());
- validateTopicNotAlreadyRegistered(topic);
- addGlobalStore(sourceName,
- timestampExtractor,
- keyDeserializer,
- valueDeserializer,
- topic,
- processorName,
- stateUpdateSupplier,
- name,
- storeSupplier.get());
- }
-
-
public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
@@ -1145,47 +1079,18 @@ public class InternalTopologyBuilder {
}
public synchronized Pattern earliestResetTopicsPattern() {
- return resetTopicsPattern(earliestResetTopics, earliestResetPatterns, latestResetTopics, latestResetPatterns);
+ return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
}
public synchronized Pattern latestResetTopicsPattern() {
- return resetTopicsPattern(latestResetTopics, latestResetPatterns, earliestResetTopics, earliestResetPatterns);
+ return resetTopicsPattern(latestResetTopics, latestResetPatterns);
}
private Pattern resetTopicsPattern(final Set<String> resetTopics,
- final Set<Pattern> resetPatterns,
- final Set<String> otherResetTopics,
- final Set<Pattern> otherResetPatterns) {
+ final Set<Pattern> resetPatterns) {
final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics);
- final Pattern pattern = buildPatternForOffsetResetTopics(topics, resetPatterns);
-
- ensureNoRegexOverlap(pattern, otherResetPatterns, otherResetTopics);
-
- return pattern;
- }
-
- // TODO: we should check regex overlap at topology construction time and then throw TopologyException
- // instead of at runtime. See KAFKA-5660
- private void ensureNoRegexOverlap(final Pattern builtPattern,
- final Set<Pattern> otherPatterns,
- final Set<String> otherTopics) {
- for (final Pattern otherPattern : otherPatterns) {
- if (builtPattern.pattern().contains(otherPattern.pattern())) {
- throw new TopologyException(
- String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
- otherPattern.pattern(),
- builtPattern.pattern()));
- }
- }
- for (final String otherTopic : otherTopics) {
- if (builtPattern.matcher(otherTopic).matches()) {
- throw new TopologyException(
- String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
- builtPattern.pattern(),
- otherTopic));
- }
- }
+ return buildPatternForOffsetResetTopics(topics, resetPatterns);
}
private static Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics,
@@ -1885,6 +1790,9 @@ public class InternalTopologyBuilder {
updateSubscriptions(subscriptionUpdates, logPrefix);
}
+
+ // following functions are for test only
+
public synchronized Set<String> getSourceTopicNames() {
return sourceTopicNames;
}
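
Because overlap is now rejected when a pattern source is registered, earliestResetTopicsPattern() and latestResetTopicsPattern() no longer need the opposite side's topics and patterns, and the runtime ensureNoRegexOverlap() check is deleted. A rough standalone approximation of how literal topics and regexes collapse into one matching Pattern (a sketch only, not the verbatim buildPatternForOffsetResetTopics implementation):

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ResetPatternSketch {
        public static void main(final String[] args) {
            final List<String> topics = Arrays.asList("orders", "payments");
            final List<Pattern> patterns = Arrays.asList(Pattern.compile("topic-\\d+"));

            // OR together the quoted literal topics and the raw regexes.
            final String combined = Stream.concat(
                    topics.stream().map(Pattern::quote),
                    patterns.stream().map(Pattern::pattern))
                .collect(Collectors.joining("|"));

            final Pattern resetPattern = Pattern.compile(combined);
            System.out.println(resetPattern.matcher("topic-42").matches()); // true
            System.out.println(resetPattern.matcher("orders").matches());   // true
        }
    }
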
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 1f00c04..d7a9b33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -755,38 +755,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
}
- /**
- * Used to capture subscribed topics via Patterns discovered during the
- * partition assignment process.
- *
- * // TODO: this is a duplicate of the InternalTopologyBuilder#SubscriptionUpdates
- * and is maintained only for compatibility with the deprecated TopologyBuilder API
- */
- public static class SubscriptionUpdates {
-
- private final Set<String> updatedTopicSubscriptions = new HashSet<>();
-
- public void updateTopics(final Collection<String> topicNames) {
- updatedTopicSubscriptions.clear();
- updatedTopicSubscriptions.addAll(topicNames);
- }
-
- public Collection<String> getUpdates() {
- return Collections.unmodifiableSet(updatedTopicSubscriptions);
- }
-
- public boolean hasUpdates() {
- return !updatedTopicSubscriptions.isEmpty();
- }
-
- @Override
- public String toString() {
- return "SubscriptionUpdates{" +
- "updatedTopicSubscriptions=" + updatedTopicSubscriptions +
- '}';
- }
- }
-
static class CopartitionedTopicsValidator {
private final String logPrefix;
@@ -794,7 +762,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
this.logPrefix = logPrefix;
}
- @SuppressWarnings("deprecation")
void validate(final Set<String> copartitionGroup,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
final Cluster metadata) {
@@ -805,7 +772,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final Integer partitions = metadata.partitionCountForTopic(topic);
if (partitions == null) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic not found: %s", logPrefix, topic));
+ throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopic not found: %s", logPrefix, topic));
}
if (numPartitions == UNKNOWN) {
@@ -813,7 +780,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
} else if (numPartitions != partitions) {
final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
+ throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
}
} else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
numPartitions = NOT_AVAILABLE;
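
The validator's failure mode switches from the removed TopologyBuilderException to the unified TopologyException; the rule itself is unchanged. A minimal standalone sketch of the co-partitioning invariant it enforces (simplified: the real validation also resolves repartition topics whose partition counts are not yet known):

    import java.util.Map;
    import org.apache.kafka.streams.errors.TopologyException;

    public class CopartitionSketch {
        // Every topic in a copartition group must exist and have the same partition count.
        static void validate(final Map<String, Integer> partitionCountByTopic) {
            Integer expected = null;
            for (final Map.Entry<String, Integer> entry : partitionCountByTopic.entrySet()) {
                if (entry.getValue() == null) {
                    throw new TopologyException("Topic not found: " + entry.getKey());
                }
                if (expected == null) {
                    expected = entry.getValue();
                } else if (!expected.equals(entry.getValue())) {
                    throw new TopologyException("Topics not co-partitioned: " + partitionCountByTopic.keySet());
                }
            }
        }
    }
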
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
index 39b9d03..fdcd2e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
@@ -25,19 +25,19 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-abstract class AbstractStoreBuilder<K, V, T extends StateStore> implements StoreBuilder<T> {
- private final String name;
+public abstract class AbstractStoreBuilder<K, V, T extends StateStore> implements StoreBuilder<T> {
private Map<String, String> logConfig = new HashMap<>();
+ protected final String name;
final Serde<K> keySerde;
final Serde<V> valueSerde;
final Time time;
boolean enableCaching;
boolean enableLogging = true;
- AbstractStoreBuilder(final String name,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final Time time) {
+ public AbstractStoreBuilder(final String name,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Time time) {
Objects.requireNonNull(name, "name can't be null");
Objects.requireNonNull(time, "time can't be null");
this.name = name;
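
Widening AbstractStoreBuilder to public, with a public constructor and a protected name field, is what allows builders outside this package, such as the new MockStoreBuilder in the tests, to extend it. A hypothetical subclass showing the shape (the class name and body are placeholders, not the real MockStoreBuilder):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;

    // Hypothetical builder: AbstractStoreBuilder supplies the name/serde/logging/caching
    // plumbing; subclasses only implement build().
    public class ExampleStoreBuilder
            extends AbstractStoreBuilder<String, String, KeyValueStore<String, String>> {

        public ExampleStoreBuilder(final String storeName) {
            super(storeName, Serdes.String(), Serdes.String(), Time.SYSTEM);
        }

        @Override
        public KeyValueStore<String, String> build() {
            // A real builder would construct the inner store and wrap it with the
            // configured logging/caching layers; elided in this structural sketch.
            throw new UnsupportedOperationException("sketch only");
        }
    }
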
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
deleted file mode 100644
index 10d0fe2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.Map;
-
-@Deprecated
-abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements org.apache.kafka.streams.processor.StateStoreSupplier<T> {
- protected final String name;
- protected final Serde<K> keySerde;
- protected final Serde<V> valueSerde;
- protected final Time time;
- protected final boolean logged;
- protected final Map<String, String> logConfig;
-
- AbstractStoreSupplier(final String name,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final Time time,
- final boolean logged,
- final Map<String, String> logConfig) {
- this.time = time;
- this.name = name;
- this.valueSerde = valueSerde;
- this.keySerde = keySerde;
- this.logged = logged;
- this.logConfig = logConfig;
- }
-
- public String name() {
- return name;
- }
-
- public Map<String, String> logConfig() {
- return logConfig;
- }
-
- public boolean loggingEnabled() {
- return logged;
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
deleted file mode 100644
index 3bc56c2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-@Deprecated
-public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
-
- private final KeyValueStoreBuilder<K, V> builder;
-
- public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig, boolean cached) {
- this(name, keySerde, valueSerde, Time.SYSTEM, logged, logConfig, cached);
- }
-
- public RocksDBKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig, boolean cached) {
- super(name, keySerde, valueSerde, time, logged, logConfig);
- builder = new KeyValueStoreBuilder<>(new RocksDbKeyValueBytesStoreSupplier(name),
- keySerde,
- valueSerde,
- time);
- if (cached) {
- builder.withCachingEnabled();
- }
- // logged by default so we only need to worry about when it is disabled.
- if (!logged) {
- builder.withLoggingDisabled();
- }
- }
-
- public KeyValueStore get() {
- return builder.build();
- }
-
-
-}
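
Callers of the removed supplier migrate to the public Stores factory methods, which produce a StoreBuilder carrying the same knobs. For example (store name illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class StoresSketch {
        public static void main(final String[] args) {
            // Equivalent of the old RocksDBKeyValueStoreSupplier(name, serdes, logged, logConfig, cached):
            final StoreBuilder<KeyValueStore<String, String>> builder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("my-store"),   // RocksDB-backed
                        Serdes.String(),
                        Serdes.String())
                    .withCachingEnabled();                            // was `cached = true`
            // Logging is on by default; call .withLoggingDisabled() for `logged = false`.
            System.out.println(builder.name());
        }
    }
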
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
deleted file mode 100644
index fa976a8..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams;
-
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-
-import java.util.Properties;
-
-/**
- * This class allows the instantiation of a {@link TopologyTestDriver} using a
- * {@link InternalTopologyBuilder} by exposing a protected constructor.
- *
- * It should be used only for testing, and should be removed once the deprecated
- * classes {@link org.apache.kafka.streams.kstream.KStreamBuilder} and
- * {@link org.apache.kafka.streams.processor.TopologyBuilder} are removed.
- */
-public class TopologyTestDriverWrapper extends TopologyTestDriver {
-
- public TopologyTestDriverWrapper(final InternalTopologyBuilder builder,
- final Properties config) {
- super(builder, config);
- }
-}
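
Tests that relied on this wrapper can instead drive a plain Topology through the public TopologyTestDriver constructor. A minimal usage sketch (topic names illustrative):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    public class DriverSketch {
        public static void main(final String[] args) {
            final Topology topology = new Topology()
                .addSource("source", "input")
                .addSink("sink", "output", "source");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "driver-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Public constructor takes the Topology directly; no internal builder needed.
            final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input", "key", "value"));
            driver.close();
        }
    }
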
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
index f106766..60f0e6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
@@ -24,6 +24,10 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
*/
public class TopologyWrapper extends Topology {
+ public static InternalTopologyBuilder getInternalTopologyBuilder(final Topology topology) {
+ return topology.internalTopologyBuilder;
+ }
+
public InternalTopologyBuilder getInternalBuilder() {
return internalTopologyBuilder;
}
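
The new static accessor lets a test reach the internal builder of any Topology, including one produced by StreamsBuilder#build(), without constructing a TopologyWrapper instance. A usage sketch (test-scope code, since TopologyWrapper lives under src/test; topic names illustrative):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyWrapper;
    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

    public class WrapperSketch {
        public static void main(final String[] args) {
            final StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.stream("input").to("output");

            // Reach into the internal builder of a Topology built elsewhere.
            final Topology topology = streamsBuilder.build();
            final InternalTopologyBuilder internal =
                TopologyWrapper.getInternalTopologyBuilder(topology);
            System.out.println(internal.getSourceTopicNames());
        }
    }
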
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
similarity index 95%
rename from streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
rename to streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 0f7df6b..a9f6fa8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
@@ -64,7 +63,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@Category({IntegrationTest.class})
-public class KStreamsFineGrainedAutoResetIntegrationTest {
+public class FineGrainedAutoResetIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
@@ -246,16 +245,13 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
}
@Test
- public void shouldThrowExceptionOverlappingPattern() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ public void shouldThrowExceptionOverlappingPattern() {
+ final StreamsBuilder builder = new StreamsBuilder();
//NOTE this would realistically get caught when building the topology; the test is for completeness
- builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
- builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
+ builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
- // TODO: we should check regex overlap at topology construction time and then throw TopologyException
- // instead of at runtime. See KAFKA-5660
try {
- builder.earliestResetTopicsPattern();
+ builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.LATEST));
fail("Should have thrown TopologyException");
} catch (final TopologyException expected) {
// do nothing
@@ -263,7 +259,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
}
@Test
- public void shouldThrowExceptionOverlappingTopic() throws Exception {
+ public void shouldThrowExceptionOverlappingTopic() {
final StreamsBuilder builder = new StreamsBuilder();
//NOTE this would realistically get caught when building the topology; the test is for completeness
builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e5160e1..770a3ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -37,9 +37,9 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -58,11 +58,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
/**
* End-to-end integration test based on using regex and named topics for creating sources, using
@@ -229,7 +230,7 @@ public class RegexSourceIntegrationTest {
public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
- final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("testStateStore"), Serdes.String(), Serdes.String());
+ final StoreBuilder storeBuilder = new MockStoreBuilder("testStateStore", false);
final long thirtySecondTimeout = 30 * 1000;
final TopologyWrapper topology = new TopologyWrapper();
@@ -258,7 +259,6 @@ public class RegexSourceIntegrationTest {
}
}
-
@Test
public void testShouldReadFromRegexAndNamedTopics() throws Exception {
@@ -375,30 +375,31 @@ public class RegexSourceIntegrationTest {
}
- // TODO should be updated to expected = TopologyBuilderException after KAFKA-3708
- @Test(expected = AssertionError.class)
+ @Test
public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
-
- final String fooMessage = "fooMessage";
final String fMessage = "fMessage";
-
-
+ final String fooMessage = "fooMessage";
final Serde<String> stringSerde = Serdes.String();
-
final StreamsBuilder builder = new StreamsBuilder();
-
- // overlapping patterns here, no messages should be sent as TopologyBuilderException
+ // overlapping patterns here: no messages should be sent, as TopologyException
// will be thrown when the processor topology is built.
-
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
-
pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+ final AtomicBoolean expectError = new AtomicBoolean(false);
+
streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ streams.setStateListener(new KafkaStreams.StateListener() {
+ @Override
+ public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
+ if (newState == KafkaStreams.State.ERROR)
+ expectError.set(true);
+ }
+ });
streams.start();
final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
@@ -407,9 +408,14 @@ public class RegexSourceIntegrationTest {
IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+ try {
+ IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+ throw new IllegalStateException("This should not happen: an assertion error should have been thrown before this.");
+ } catch (final AssertionError e) {
+ // this is fine
+ }
- IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
- fail("Should not get here");
+ assertThat(expectError.get(), is(true));
}
private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
deleted file mode 100644
index 27f0833..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.kstream.internals.KTableImpl;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.test.ConsumerRecordFactory;
-import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("deprecation")
-public class KStreamBuilderTest {
-
- private static final String APP_ID = "app-id";
-
- private final KStreamBuilder builder = new KStreamBuilder();
- private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- private TopologyTestDriverWrapper driver;
- private final Properties props = new Properties();
-
- @Before
- public void setup() {
- builder.setApplicationId(APP_ID);
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-builder-test");
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- }
-
- @After
- public void cleanup() {
- props.clear();
- if (driver != null) {
- driver.close();
- }
- driver = null;
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testFrom() {
- builder.stream("topic-1", "topic-2");
-
- builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
- }
-
- @Test
- public void testNewName() {
- assertEquals("X-0000000000", builder.newName("X-"));
- assertEquals("Y-0000000001", builder.newName("Y-"));
- assertEquals("Z-0000000002", builder.newName("Z-"));
-
- final KStreamBuilder newBuilder = new KStreamBuilder();
-
- assertEquals("X-0000000000", newBuilder.newName("X-"));
- assertEquals("Y-0000000001", newBuilder.newName("Y-"));
- assertEquals("Z-0000000002", newBuilder.newName("Z-"));
- }
-
-
- @Test
- public void shouldProcessFromSinkTopic() {
- final KStream<String, String> source = builder.stream("topic-source");
- source.to("topic-sink");
-
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
-
- source.process(processorSupplier);
-
- driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
- driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
-
- // no exception was thrown
- assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed);
- }
-
- @Test
- public void shouldProcessViaThroughTopic() {
- final KStream<String, String> source = builder.stream("topic-source");
- final KStream<String, String> through = source.through("topic-sink");
-
- final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
- final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
-
- source.process(sourceProcessorSupplier);
- through.process(throughProcessorSupplier);
-
- driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
- driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
-
- assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed);
- assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed);
- }
-
- @Test
- public void testNewStoreName() {
- assertEquals("X-STATE-STORE-0000000000", builder.newStoreName("X-"));
- assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
- assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
-
- KStreamBuilder newBuilder = new KStreamBuilder();
-
- assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-"));
- assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-"));
- assertEquals("Z-STATE-STORE-0000000002", newBuilder.newStoreName("Z-"));
- }
-
- @Test
- public void testMerge() {
- final String topic1 = "topic-1";
- final String topic2 = "topic-2";
-
- final KStream<String, String> source1 = builder.stream(topic1);
- final KStream<String, String> source2 = builder.stream(topic2);
- final KStream<String, String> merged = builder.merge(source1, source2);
-
- final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
- merged.process(processorSupplier);
-
- driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props);
-
- driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
- driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
- driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
- driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
-
- assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed);
- }
-
- @Test
- public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
- final String topic1 = "topic-1";
- final String topic2 = "topic-2";
- final String topic3 = "topic-3";
- final KStream<String, String> source1 = builder.stream(topic1);
- final KStream<String, String> source2 = builder.stream(topic2);
- final KStream<String, String> source3 = builder.stream(topic3);
- final KStream<String, String> processedSource1 =
- source1.mapValues(new ValueMapper<String, String>() {
- @Override
- public String apply(final String value) {
- return value;
- }
- }).filter(new Predicate<String, String>() {
- @Override
- public boolean test(final String key, final String value) {
- return true;
- }
- });
- final KStream<String, String> processedSource2 = source2.filter(new Predicate<String, String>() {
- @Override
- public boolean test(final String key, final String value) {
- return true;
- }
- });
-
- final KStream<String, String> merged = processedSource1.merge(processedSource2).merge(source3);
- merged.groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-table"));
- final Map<String, List<String>> actual = builder.stateStoreNameToSourceTopics();
- assertEquals(Utils.mkList("topic-1", "topic-2", "topic-3"), actual.get("my-table"));
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void shouldThrowExceptionWhenNoTopicPresent() {
- builder.stream();
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowExceptionWhenTopicNamesAreNull() {
- builder.stream(Serdes.String(), Serdes.String(), null, null);
- }
-
- @Test
- public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() {
- KTable table1 = builder.table("topic1", "table1");
- KTable table2 = builder.table("topic2", (String) null);
-
- final ProcessorTopology topology = builder.build(null);
-
- assertEquals(2, topology.stateStores().size());
- assertEquals("table1", topology.stateStores().get(0).name());
-
- final String internalStoreName = topology.stateStores().get(1).name();
- assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME));
- assertEquals(2, topology.storeToChangelogTopic().size());
- assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
- assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));
- assertEquals(table1.queryableStoreName(), "table1");
- assertNull(table2.queryableStoreName());
- }
-
- @Test
- public void shouldBuildSimpleGlobalTableTopology() {
- builder.globalTable("table", "globalTable");
-
- final ProcessorTopology topology = builder.buildGlobalStateTopology();
- final List<StateStore> stateStores = topology.globalStateStores();
-
- assertEquals(1, stateStores.size());
- assertEquals("globalTable", stateStores.get(0).name());
- }
-
- private void doBuildGlobalTopologyWithAllGlobalTables() {
- final ProcessorTopology topology = builder.buildGlobalStateTopology();
-
- final List<StateStore> stateStores = topology.globalStateStores();
- final Set<String> sourceTopics = topology.sourceTopics();
-
- assertEquals(Utils.mkSet("table", "table2"), sourceTopics);
- assertEquals(2, stateStores.size());
- }
-
- @Test
- public void shouldBuildGlobalTopologyWithAllGlobalTables() {
- builder.globalTable("table", "globalTable");
- builder.globalTable("table2", "globalTable2");
-
- doBuildGlobalTopologyWithAllGlobalTables();
- }
-
- @Test
- public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() {
- builder.globalTable("table");
- builder.globalTable("table2");
-
- doBuildGlobalTopologyWithAllGlobalTables();
- }
-
- @Test
- public void shouldAddGlobalTablesToEachGroup() {
- final String one = "globalTable";
- final String two = "globalTable2";
- final GlobalKTable<String, String> globalTable = builder.globalTable("table", one);
- final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", two);
-
- builder.table("not-global", "not-global");
-
- final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(final String key, final String value) {
- return value;
- }
- };
-
- final KStream<String, String> stream = builder.stream("t1");
- stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
- final KStream<String, String> stream2 = builder.stream("t2");
- stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);
-
- final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
- for (Integer groupId : nodeGroups.keySet()) {
- final ProcessorTopology topology = builder.build(groupId);
- final List<StateStore> stateStores = topology.globalStateStores();
- final Set<String> names = new HashSet<>();
- for (StateStore stateStore : stateStores) {
- names.add(stateStore.name());
- }
-
- assertEquals(2, stateStores.size());
- assertTrue(names.contains(one));
- assertTrue(names.contains(two));
- }
- }
-
- @Test
- public void shouldMapStateStoresToCorrectSourceTopics() {
- final KStream<String, String> playEvents = builder.stream("events");
-
- final KTable<String, String> table = builder.table("table-topic", "table-store");
- assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
-
- final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
- mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count"));
- assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
- assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
- }
-
- @Test
- public void shouldAddTopicToEarliestAutoOffsetResetList() {
- final String topicName = "topic-1";
-
- builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);
-
- assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
- assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
- }
-
- @Test
- public void shouldAddTopicToLatestAutoOffsetResetList() {
- final String topicName = "topic-1";
-
- builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicName);
-
- assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
- assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
- }
-
- @Test
- public void shouldAddTableToEarliestAutoOffsetResetList() {
- final String topicName = "topic-1";
- final String storeName = "test-store";
-
- builder.table(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName, storeName);
-
- assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
- assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
- }
-
- @Test
- public void shouldAddTableToLatestAutoOffsetResetList() {
- final String topicName = "topic-1";
- final String storeName = "test-store";
-
- builder.table(TopologyBuilder.AutoOffsetReset.LATEST, topicName, storeName);
-
- assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches());
- assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
- }
-
- @Test
- public void shouldNotAddTableToOffsetResetLists() {
- final String topicName = "topic-1";
- final String storeName = "test-store";
- final Serde<String> stringSerde = Serdes.String();
-
- builder.table(stringSerde, stringSerde, topicName, storeName);
-
- assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
- assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches());
- }
-
- @Test
- public void shouldNotAddRegexTopicsToOffsetResetLists() {
- final Pattern topicPattern = Pattern.compile("topic-\\d");
- final String topic = "topic-5";
-
- builder.stream(topicPattern);
-
- assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches());
- assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches());
-
- }
-
- @Test
- public void shouldAddRegexTopicToEarliestAutoOffsetResetList() {
- final Pattern topicPattern = Pattern.compile("topic-\\d+");
- final String topicTwo = "topic-500000";
-
- builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicPattern);
-
- assertTrue(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
- assertFalse(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
- }
-
- @Test
- public void shouldAddRegexTopicToLatestAutoOffsetResetList() {
- final Pattern topicPattern = Pattern.compile("topic-\\d+");
- final String topicTwo = "topic-1000000";
-
- builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicPattern);
-
- assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
- assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
- }
-
- @Test
- public void kStreamTimestampExtractorShouldBeNull() {
- builder.stream("topic");
- final ProcessorTopology processorTopology = builder.build(null);
- assertNull(processorTopology.source("topic").getTimestampExtractor());
- }
-
- @Test
- public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() {
- builder.stream(new MockTimestampExtractor(), null, null, "topic");
- final ProcessorTopology processorTopology = builder.build(null);
- for (final SourceNode sourceNode: processorTopology.sources()) {
- assertThat(sourceNode.getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
- }
-
- @Test
- public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() {
- builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-
- @Test
- public void shouldAddTimestampExtractorToTablePerSource() {
- builder.table("topic", "store");
- final ProcessorTopology processorTopology = builder.build(null);
- assertNull(processorTopology.source("topic").getTimestampExtractor());
- }
-
- @Test
- public void kTableTimestampExtractorShouldBeNull() {
- builder.table("topic", "store");
- final ProcessorTopology processorTopology = builder.build(null);
- assertNull(processorTopology.source("topic").getTimestampExtractor());
- }
-
- @Test
- public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() {
- builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-}
\ No newline at end of file
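For code that exercised the per-source offset-reset behavior of the removed KStreamBuilder, the public DSL carries the reset policy through Consumed instead of a dedicated builder overload; a minimal sketch (class, topic, and store names are illustrative, not from this commit):

    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Materialized;

    public class OffsetResetMigrationSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // the reset policy now travels with Consumed
            builder.stream("topic-1", Consumed.with(Topology.AutoOffsetReset.EARLIEST));
            // tables take the same Consumed; the store name moves into Materialized
            builder.table("table-topic",
                          Consumed.with(Topology.AutoOffsetReset.LATEST),
                          Materialized.as("test-store"));
            builder.build();
        }
    }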
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index c6ee70f..463afb8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -24,15 +24,14 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -223,12 +222,11 @@ public class KStreamImplTest {
}
@Test
-    // TODO: this test should be refactored when we remove KStreamBuilder so that the created Topology contains internal topics as well
public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
- final KStreamBuilder builder = new KStreamBuilder();
- KStream<String, String> kStream = builder.stream(Serdes.String(), Serdes.String(), "topic-1");
- ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
- long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> kStream = builder.stream("topic-1", stringConsumed);
+ final ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
+ final long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
final KStream<String, String> stream = kStream
.map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
@Override
@@ -244,10 +242,11 @@ public class KStreamImplTest {
Serdes.String()))
.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
- ProcessorTopology processorTopology = builder.setApplicationId("X").build(null);
- SourceNode originalSourceNode = processorTopology.source("topic-1");
+ final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
+
+ final SourceNode originalSourceNode = topology.source("topic-1");
- for (SourceNode sourceNode: processorTopology.sources()) {
+ for (SourceNode sourceNode: topology.sources()) {
if (sourceNode.name().equals(originalSourceNode.name())) {
assertEquals(sourceNode.getTimestampExtractor(), null);
} else {
@@ -427,10 +426,9 @@ public class KStreamImplTest {
null);
}
- @SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerOnPrintIfPrintedIsNull() {
- testStream.print((Printed) null);
+ testStream.print(null);
}
@Test(expected = NullPointerException.class)
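Where a test still needs the resulting ProcessorTopology, as in the rewritten timestamp-extractor test above, the pattern is to build the public Topology and unwrap it through the TopologyWrapper test helper; a sketch, assuming test scope where TopologyWrapper and ProcessorTopology are visible:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.TopologyWrapper;
    import org.apache.kafka.streams.processor.internals.ProcessorTopology;

    public class UnwrapTopologySketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("topic-1");
            // the application id must be set before build() so internal topic names can be resolved
            final ProcessorTopology topology =
                TopologyWrapper.getInternalTopologyBuilder(builder.build())
                               .setApplicationId("X")
                               .build();
            System.out.println(topology.source("topic-1"));
        }
    }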
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
deleted file mode 100644
index 93b233b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
-import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.apache.kafka.common.utils.Utils.mkList;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("deprecation")
-public class TopologyBuilderTest {
-
- @Test
- public void shouldAddSourceWithOffsetReset() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- final String earliestTopic = "earliestTopic";
- final String latestTopic = "latestTopic";
-
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", earliestTopic);
- builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", latestTopic);
-
- assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
- assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
-
- }
-
- @Test
- public void shouldAddSourcePatternWithOffsetReset() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- final String earliestTopicPattern = "earliest.*Topic";
- final String latestTopicPattern = "latest.*Topic";
-
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", Pattern.compile(earliestTopicPattern));
- builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", Pattern.compile(latestTopicPattern));
-
- assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
- assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
- }
-
- @Test
- public void shouldAddSourceWithoutOffsetReset() {
- final TopologyBuilder builder = new TopologyBuilder();
- final Serde<String> stringSerde = Serdes.String();
- final Pattern expectedPattern = Pattern.compile("test-topic");
-
- builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), "test-topic");
-
- assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
- assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
- assertEquals(builder.latestResetTopicsPattern().pattern(), "");
- }
-
- @Test
- public void shouldAddPatternSourceWithoutOffsetReset() {
- final TopologyBuilder builder = new TopologyBuilder();
- final Serde<String> stringSerde = Serdes.String();
- final Pattern expectedPattern = Pattern.compile("test-.*");
-
- builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*"));
-
- assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
- assertEquals(builder.earliestResetTopicsPattern().pattern(), "");
- assertEquals(builder.latestResetTopicsPattern().pattern(), "");
- }
-
- @Test
- public void shouldNotAllowOffsetResetSourceWithoutTopics() {
- final TopologyBuilder builder = new TopologyBuilder();
- final Serde<String> stringSerde = Serdes.String();
-
- try {
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
- fail("Should throw TopologyBuilderException with no topics");
- } catch (TopologyBuilderException tpe) {
- //no-op
- }
- }
-
- @Test
- public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
- final TopologyBuilder builder = new TopologyBuilder();
- final Serde<String> stringSerde = Serdes.String();
-
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
- try {
- builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
- fail("Should throw TopologyBuilderException for duplicate source name");
- } catch (TopologyBuilderException tpe) {
- //no-op
- }
- }
-
-
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddSourceWithSameName() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addSource("source", "topic-2");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddSourceWithSameTopic() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addSource("source-2", "topic-1");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddProcessorWithSameName() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddProcessorWithWrongParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddProcessorWithSelfParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddSinkWithSameName() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "topic-1");
- builder.addSink("sink", "topic-2", "source");
- builder.addSink("sink", "topic-3", "source");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddSinkWithWrongParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSink("sink", "topic-2", "source");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddSinkWithSelfParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSink("sink", "topic-2", "sink");
- }
-
- @Test
- public void testAddSinkConnectedWithParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "source-topic");
- builder.addSink("sink", "dest-topic", "source");
-
- Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
- Set<String> nodeGroup = nodeGroups.get(0);
-
- assertTrue(nodeGroup.contains("sink"));
- assertTrue(nodeGroup.contains("source"));
-
- }
-
- @Test
- public void testAddSinkConnectedWithMultipleParent() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source", "source-topic");
- builder.addSource("sourceII", "source-topicII");
- builder.addSink("sink", "dest-topic", "source", "sourceII");
-
- Map<Integer, Set<String>> nodeGroups = builder.nodeGroups();
- Set<String> nodeGroup = nodeGroups.get(0);
-
- assertTrue(nodeGroup.contains("sink"));
- assertTrue(nodeGroup.contains("source"));
- assertTrue(nodeGroup.contains("sourceII"));
-
- }
-
- @Test
- public void testSourceTopics() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId("X");
- builder.addSource("source-1", "topic-1");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
- builder.addInternalTopic("topic-3");
-
- Pattern expectedPattern = Pattern.compile("X-topic-3|topic-1|topic-2");
-
- assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
- }
-
- @Test
- public void testPatternSourceTopic() {
- final TopologyBuilder builder = new TopologyBuilder();
- Pattern expectedPattern = Pattern.compile("topic-\\d");
- builder.addSource("source-1", expectedPattern);
- assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
- }
-
- @Test
- public void testAddMoreThanOnePatternSourceNode() {
- final TopologyBuilder builder = new TopologyBuilder();
- Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d");
- builder.addSource("source-1", Pattern.compile("topics[A-Z]"));
- builder.addSource("source-2", Pattern.compile(".*-\\d"));
- assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
- }
-
- @Test
- public void testSubscribeTopicNameAndPattern() {
- final TopologyBuilder builder = new TopologyBuilder();
- Pattern expectedPattern = Pattern.compile("topic-bar|topic-foo|.*-\\d");
- builder.addSource("source-1", "topic-foo", "topic-bar");
- builder.addSource("source-2", Pattern.compile(".*-\\d"));
- assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testPatternMatchesAlreadyProvidedTopicSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source-1", "foo");
- builder.addSource("source-2", Pattern.compile("f.*"));
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testNamedTopicMatchesAlreadyProvidedPattern() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source-1", Pattern.compile("f.*"));
- builder.addSource("source-2", "foo");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddStateStoreWithNonExistingProcessor() {
- final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processor");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddStateStoreWithSource() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1");
- builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddStateStoreWithSink() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSink("sink-1", "topic-1");
- builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
- }
-
- @Test(expected = TopologyBuilderException.class)
- public void testAddStateStoreWithDuplicates() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addStateStore(new MockStateStoreSupplier("store", false));
- builder.addStateStore(new MockStateStoreSupplier("store", false));
- }
-
- @Test
- public void testAddStateStore() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
- builder.addStateStore(supplier);
- builder.setApplicationId("X");
- builder.addSource("source-1", "topic-1");
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-
- assertEquals(0, builder.build(null).stateStores().size());
-
- builder.connectProcessorAndStateStores("processor-1", "store-1");
-
- List<StateStore> suppliers = builder.build(null).stateStores();
- assertEquals(1, suppliers.size());
- assertEquals(supplier.name(), suppliers.get(0).name());
- }
-
- @Test
- public void testTopicGroups() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId("X");
- builder.addInternalTopic("topic-1x");
- builder.addSource("source-1", "topic-1", "topic-1x");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
- builder.addSource("source-4", "topic-4");
- builder.addSource("source-5", "topic-5");
-
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-
- builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
- builder.copartitionSources(mkList("source-1", "source-2"));
-
- builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
- Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-
- Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
- expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
- expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
-
- assertEquals(3, topicGroups.size());
- assertEquals(expectedTopicGroups, topicGroups);
-
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
-
- assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
- }
-
- @Test
- public void testTopicGroupsByStateStore() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId("X");
- builder.addSource("source-1", "topic-1", "topic-1x");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
- builder.addSource("source-4", "topic-4");
- builder.addSource("source-5", "topic-5");
-
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
- builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
- builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2");
-
- builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
- builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
- builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4");
-
- builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
- StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false);
- builder.addStateStore(supplier);
- builder.connectProcessorAndStateStores("processor-5", "store-3");
-
- Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
-
- Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
- final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
- final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
- final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
- expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
- Collections.<String, InternalTopicConfig>emptyMap(),
- Collections.singletonMap(store1, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store1, Collections.<String, String>emptyMap()))));
- expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
- Collections.<String, InternalTopicConfig>emptyMap(),
- Collections.singletonMap(store2, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store2, Collections.<String, String>emptyMap()))));
- expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
- Collections.<String, InternalTopicConfig>emptyMap(),
- Collections.singletonMap(store3, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store3, Collections.<String, String>emptyMap()))));
-
- assertEquals(3, topicGroups.size());
- assertEquals(expectedTopicGroups, topicGroups);
- }
-
- @Test
- public void testBuild() {
- final TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source-1", "topic-1", "topic-1x");
- builder.addSource("source-2", "topic-2");
- builder.addSource("source-3", "topic-3");
- builder.addSource("source-4", "topic-4");
- builder.addSource("source-5", "topic-5");
-
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
- builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
- builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
- builder.setApplicationId("X");
- ProcessorTopology topology0 = builder.build(0);
- ProcessorTopology topology1 = builder.build(1);
- ProcessorTopology topology2 = builder.build(2);
-
- assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
- assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
- assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingSink() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSink(null, "topic");
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTopicWhenAddingSink() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSink("name", null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingProcessor() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addProcessor(null, new ProcessorSupplier() {
- @Override
- public Processor get() {
- return null;
- }
- });
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullProcessorSupplier() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addProcessor("name", null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource(null, Pattern.compile(".*"));
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.connectProcessorAndStateStores(null, "store");
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAddNullInternalTopic() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addInternalTopic(null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotSetApplicationIdToNull() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId(null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldNotAddNullStateStoreSupplier() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addStateStore(null);
- }
-
- private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
- Set<String> nodeNames = new HashSet<>();
- for (ProcessorNode node : nodes) {
- nodeNames.add(node.name());
- }
- return nodeNames;
- }
-
- @Test
- public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source", "topic");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
- final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
- assertEquals(1, stateStoreNameToSourceTopic.size());
- assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
- }
-
- @Test
- public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source", "topic");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
- final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
- assertEquals(1, stateStoreNameToSourceTopic.size());
- assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
- }
-
- @Test
- public void shouldCorrectlyMapStateStoreToInternalTopics() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId("appId");
- builder.addInternalTopic("internal-topic");
- builder.addSource("source", "internal-topic");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- builder.addStateStore(new MockStateStoreSupplier("store", false), "processor");
- final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
- assertEquals(1, stateStoreNameToSourceTopic.size());
- assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
- }
-
- @Test
- public void shouldAddInternalTopicConfigForNonWindowStores() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId("appId");
- builder.addSource("source", "topic");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- builder.addStateStore(new MockStateStoreSupplier("store", true), "processor");
- final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
- final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
- final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
- final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
- assertEquals(1, properties.size());
- assertEquals("appId-store-changelog", topicConfig.name());
- }
-
- @Test
- public void shouldAddInternalTopicConfigForRepartitionTopics() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.setApplicationId("appId");
- builder.addInternalTopic("foo");
- builder.addSource("source", "foo");
- final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
- final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
- final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
- assertEquals(5, properties.size());
- assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
- assertEquals("appId-foo", topicConfig.name());
- }
-
- @Test
- public void shouldThrowOnUnassignedStateStoreAccess() {
- final String sourceNodeName = "source";
- final String goodNodeName = "goodGuy";
- final String badNodeName = "badGuy";
-
- final Properties config = new Properties();
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource(sourceNodeName, "topic")
- .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
- .addStateStore(new MockStateStoreSupplier(LocalMockProcessorSupplier.STORE_NAME, false), goodNodeName)
- .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
- try {
- final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, config);
- driver.pipeInput(new ConsumerRecord<>("topic", 0, 0L, new byte[] {}, new byte[] {}));
- fail("Should have thrown StreamsException");
- } catch (final StreamsException e) {
- final String error = e.toString();
- final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
-
- assertThat(error, equalTo(expectedMessage));
- }
- }
-
- private static class LocalMockProcessorSupplier implements ProcessorSupplier {
- final static String STORE_NAME = "store";
-
- @Override
- public Processor get() {
- return new Processor() {
- @Override
- public void init(ProcessorContext context) {
- context.getStateStore(STORE_NAME);
- }
-
- @Override
- public void process(Object key, Object value) {
- }
-
- @Override
- public void close() {
- }
- };
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source-1", "topic-foo");
- builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
- builder.addSource("source-3", Pattern.compile("topic-\\d"));
-
- StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
- Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
- updatedTopicsField.setAccessible(true);
-
- Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
-
- updatedTopics.add("topic-B");
- updatedTopics.add("topic-3");
- updatedTopics.add("topic-A");
-
- builder.updateSubscriptions(subscriptionUpdates, null);
- builder.setApplicationId("test-id");
-
- Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
- assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
- assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
- assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
- assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
-
- }
-
- @Test
- public void shouldAddTimestampExtractorPerSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource(new MockTimestampExtractor(), "source", "topic");
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-
- @Test
- public void shouldAddTimestampExtractorWithOffsetResetPerSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource(null, new MockTimestampExtractor(), "source", "topic");
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-
- @Test
- public void shouldAddTimestampExtractorWithPatternPerSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- final Pattern pattern = Pattern.compile("t.*");
- builder.addSource(new MockTimestampExtractor(), "source", pattern);
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-
- @Test
- public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- final Pattern pattern = Pattern.compile("t.*");
- builder.addSource(null, new MockTimestampExtractor(), "source", pattern);
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-
- @Test
- public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-
- @Test
- public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() {
- final TopologyBuilder builder = new TopologyBuilder();
- final Pattern pattern = Pattern.compile("t.*");
- builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
- final ProcessorTopology processorTopology = builder.build(null);
- assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
-
- final TopologyBuilder topologyBuilder = new TopologyBuilder()
- .addSource("ingest", Pattern.compile("topic-\\d+"))
- .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
- .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
-
- final StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
- final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
- updatedTopicsField.setAccessible(true);
-
- final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
-
- updatedTopics.add("topic-2");
- updatedTopics.add("topic-3");
- updatedTopics.add("topic-A");
-
- topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread");
- topologyBuilder.setApplicationId("test-app");
-
- Map<String, List<String>> stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics();
- List<String> topics = stateStoreAndTopics.get("testStateStore");
-
- assertTrue("Expected to contain two topics", topics.size() == 2);
-
- assertTrue(topics.contains("topic-2"));
- assertTrue(topics.contains("topic-3"));
- assertFalse(topics.contains("topic-A"));
- }
-
- @SuppressWarnings("unchecked")
- @Test(expected = TopologyBuilderException.class)
- public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
- final String sameNameForSourceAndProcessor = "sameName";
- new TopologyBuilder()
- .addGlobalStore(new MockStateStoreSupplier("anyName", false, false),
- sameNameForSourceAndProcessor,
- null,
- null,
- "anyTopicName",
- sameNameForSourceAndProcessor,
- new MockProcessorSupplier());
- }
-}
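Most of the deleted coverage above survives against the public Topology API. For example, the offset-reset assertions translate roughly as follows (a sketch; the internal builder is reached through the TopologyWrapper test helper, and class and topic names are illustrative):

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.TopologyWrapper;
    import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

    public class TopologyOffsetResetSketch {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            topology.addSource(Topology.AutoOffsetReset.EARLIEST, "source", "earliestTopic");
            topology.addSource(Topology.AutoOffsetReset.LATEST, "source2", "latestTopic");

            final InternalTopologyBuilder internal =
                TopologyWrapper.getInternalTopologyBuilder(topology);
            // both lines print "true": the reset lists are still tracked by the internal builder
            System.out.println(internal.earliestResetTopicsPattern().matcher("earliestTopic").matches());
            System.out.println(internal.latestResetTopicsPattern().matcher("latestTopic").matches());
        }
    }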
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index bbc59fa..b8221d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.junit.Before;
import org.junit.Test;
@@ -46,14 +46,14 @@ public class CopartitionedTopicsValidatorTest {
partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null));
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
validator.validate(Collections.singleton("topic"),
Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
cluster);
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() {
partitions.remove(new TopicPartition("second", 0));
validator.validate(Utils.mkSet("first", "second"),
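Callers that previously caught TopologyBuilderException should now catch TopologyException, which the public API throws for the same structural violations; a small sketch (class name illustrative):

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.TopologyException;

    public class TopologyExceptionSketch {
        public static void main(final String[] args) {
            final Topology topology = new Topology();
            topology.addSource("source", "topic-1");
            try {
                topology.addSource("source", "topic-2"); // duplicate node name
            } catch (final TopologyException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }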
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index c73593e..d8e66b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,22 +19,18 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.lang.reflect.Field;
@@ -45,13 +41,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
@@ -64,7 +58,7 @@ public class InternalTopologyBuilderTest {
private final Serde<String> stringSerde = Serdes.String();
private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
- private final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.ByteArray(), Serdes.ByteArray());
+ private final StoreBuilder storeBuilder = new MockStoreBuilder("store", false);
@Test
public void shouldAddSourceWithOffsetReset() {
@@ -355,14 +349,14 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1", "processor-2");
+ builder.addStateStore(new MockStoreBuilder("store-1", false), "processor-1", "processor-2");
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-3", "processor-4");
+ builder.addStateStore(new MockStoreBuilder("store-2", false), "processor-3", "processor-4");
builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-3"), Serdes.ByteArray(), Serdes.ByteArray()));
+ builder.addStateStore(new MockStoreBuilder("store-3", false));
builder.connectProcessorAndStateStores("processor-5", "store-3");
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
@@ -553,56 +547,6 @@ public class InternalTopologyBuilderTest {
assertTrue(topicConfig instanceof RepartitionTopicConfig);
}
- @Test
- public void shouldThrowOnUnassignedStateStoreAccess() {
- final String sourceNodeName = "source";
- final String goodNodeName = "goodGuy";
- final String badNodeName = "badGuy";
-
- final Properties config = new Properties();
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
- config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
-
- builder.addSource(null, sourceNodeName, null, null, null, "topic");
- builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
- builder.addStateStore(
- Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(LocalMockProcessorSupplier.STORE_NAME), Serdes.String(), Serdes.String()),
- goodNodeName);
- builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
- try {
- new TopologyTestDriverWrapper(builder, config);
- fail("Should have throw StreamsException");
- } catch (final StreamsException e) {
- final String error = e.toString();
- final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
-
- assertThat(error, equalTo(expectedMessage));
- }
- }
-
- private static class LocalMockProcessorSupplier implements ProcessorSupplier {
- final static String STORE_NAME = "store";
-
- @Override
- public Processor get() {
- return new Processor() {
- @Override
- public void init(final ProcessorContext context) {
- context.getStateStore(STORE_NAME);
- }
-
- @Override
- public void process(final Object key, final Object value) { }
-
- @Override
- public void close() {
- }
- };
- }
- }
-
@SuppressWarnings("unchecked")
@Test
public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception {
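MockStoreBuilder itself is a small test helper; roughly, it adapts a no-op mock store to the StoreBuilder interface. The sketch below is illustrative only (the real class lives in org.apache.kafka.test and may differ in detail):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.StoreBuilder;

    // illustrative stand-in for org.apache.kafka.test.MockStoreBuilder
    public class MockStoreBuilderSketch implements StoreBuilder<StateStore> {
        private final String name;
        private final boolean persistent;

        public MockStoreBuilderSketch(final String name, final boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }

        @Override
        public StateStore build() {
            return new StateStore() {   // a no-op store that only tracks its lifecycle
                private boolean open;

                @Override
                public void init(final ProcessorContext context, final StateStore root) {
                    open = true;        // a real mock would also register itself with the context
                }

                @Override public void flush() { }
                @Override public void close() { open = false; }
                @Override public String name() { return name; }
                @Override public boolean persistent() { return persistent; }
                @Override public boolean isOpen() { return open; }
            };
        }

        @Override public StoreBuilder<StateStore> withCachingEnabled() { return this; }
        @Override public StoreBuilder<StateStore> withLoggingEnabled(final Map<String, String> config) { return this; }
        @Override public StoreBuilder<StateStore> withLoggingDisabled() { return this; }
        @Override public Map<String, String> logConfig() { return Collections.emptyMap(); }
        @Override public boolean loggingEnabled() { return false; }
        @Override public String name() { return name; }
    }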
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 9f2b242..e247647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -24,7 +24,8 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -67,7 +68,7 @@ public class ProcessorTopologyTest {
private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER, 0L);
- private TopologyTestDriverWrapper driver;
+ private TopologyTestDriver driver;
private final Properties props = new Properties();
@Before
@@ -122,7 +123,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingSimpleTopology() {
int partition = 10;
- driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
+ driver = new TopologyTestDriver(createSimpleTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
assertNoOutputRecord(OUTPUT_TOPIC_2);
@@ -143,7 +144,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingMultiplexingTopology() {
- driver = new TopologyTestDriverWrapper(createMultiplexingTopology(), props);
+ driver = new TopologyTestDriver(createMultiplexingTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -165,7 +166,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingMultiplexByNameTopology() {
- driver = new TopologyTestDriverWrapper(createMultiplexByNameTopology(), props);
+ driver = new TopologyTestDriver(createMultiplexByNameTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
@@ -188,7 +189,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingStatefulTopology() {
String storeName = "entries";
- driver = new TopologyTestDriverWrapper(createStatefulTopology(storeName), props);
+ driver = new TopologyTestDriver(createStatefulTopology(storeName), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -211,7 +212,7 @@ public class ProcessorTopologyTest {
topology.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()).withLoggingDisabled(),
global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor(storeName)));
- driver = new TopologyTestDriverWrapper(topology.getInternalBuilder(), props);
+ driver = new TopologyTestDriver(topology, props);
final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName);
driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
@@ -222,7 +223,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingSimpleMultiSourceTopology() {
final int partition = 10;
- driver = new TopologyTestDriverWrapper(createSimpleMultiSourceTopology(partition), props);
+ driver = new TopologyTestDriver(createSimpleMultiSourceTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
@@ -235,7 +236,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingForwardToSourceTopology() {
- driver = new TopologyTestDriverWrapper(createForwardToSourceTopology(), props);
+ driver = new TopologyTestDriver(createForwardToSourceTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -246,7 +247,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingInternalRepartitioningTopology() {
- driver = new TopologyTestDriverWrapper(createInternalRepartitioningTopology(), props);
+ driver = new TopologyTestDriver(createInternalRepartitioningTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3"));
@@ -257,7 +258,7 @@ public class ProcessorTopologyTest {
@Test
public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
- driver = new TopologyTestDriverWrapper(createInternalRepartitioningWithValueTimestampTopology(), props);
+ driver = new TopologyTestDriver(createInternalRepartitioningWithValueTimestampTopology(), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1@1000"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2@2000"));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3@3000"));
@@ -316,7 +317,7 @@ public class ProcessorTopologyTest {
@Test
public void shouldConsiderTimeStamps() {
final int partition = 10;
- driver = new TopologyTestDriverWrapper(createSimpleTopology(partition), props);
+ driver = new TopologyTestDriver(createSimpleTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -328,7 +329,7 @@ public class ProcessorTopologyTest {
@Test
public void shouldConsiderModifiedTimeStamps() {
final int partition = 10;
- driver = new TopologyTestDriverWrapper(createTimestampTopology(partition), props);
+ driver = new TopologyTestDriver(createTimestampTopology(partition), props);
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key3", "value3", 30L));
@@ -376,95 +377,87 @@ public class ProcessorTopologyTest {
};
}
- private InternalTopologyBuilder createSimpleTopology(final int partition) {
- return ((TopologyWrapper) topology
+ private Topology createSimpleTopology(final int partition) {
+ return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new ForwardingProcessor()), "source")
- .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"))
- .getInternalBuilder();
+ .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
}
- private InternalTopologyBuilder createTimestampTopology(final int partition) {
- return ((TopologyWrapper) topology
+ private Topology createTimestampTopology(final int partition) {
+ return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new TimestampProcessor()), "source")
- .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"))
- .getInternalBuilder();
+ .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
}
- private InternalTopologyBuilder createMultiplexingTopology() {
- return ((TopologyWrapper) topology
+ private Topology createMultiplexingTopology() {
+ return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
.addSink("sink1", OUTPUT_TOPIC_1, "processor")
- .addSink("sink2", OUTPUT_TOPIC_2, "processor"))
- .getInternalBuilder();
+ .addSink("sink2", OUTPUT_TOPIC_2, "processor");
}
- private InternalTopologyBuilder createMultiplexByNameTopology() {
- return ((TopologyWrapper) topology
+ private Topology createMultiplexByNameTopology() {
+ return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
.addSink("sink0", OUTPUT_TOPIC_1, "processor")
- .addSink("sink1", OUTPUT_TOPIC_2, "processor"))
- .getInternalBuilder();
+ .addSink("sink1", OUTPUT_TOPIC_2, "processor");
}
- private InternalTopologyBuilder createStatefulTopology(final String storeName) {
- return ((TopologyWrapper) topology
+ private Topology createStatefulTopology(final String storeName) {
+ return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()), "processor")
- .addSink("counts", OUTPUT_TOPIC_1, "processor"))
- .getInternalBuilder();
+ .addSink("counts", OUTPUT_TOPIC_1, "processor");
}
- private InternalTopologyBuilder createInternalRepartitioningTopology() {
- final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
- .addSource("source", INPUT_TOPIC_1)
+ private Topology createInternalRepartitioningTopology() {
+ topology.addSource("source", INPUT_TOPIC_1)
.addSink("sink0", THROUGH_TOPIC_1, "source")
.addSource("source1", THROUGH_TOPIC_1)
- .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
- .getInternalBuilder();
+ .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+ // use wrapper to get the internal topology builder to add internal topic
+ final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
- return internalTopologyBuilder;
+ return topology;
}
- private InternalTopologyBuilder createInternalRepartitioningWithValueTimestampTopology() {
- final InternalTopologyBuilder internalTopologyBuilder = ((TopologyWrapper) topology
- .addSource("source", INPUT_TOPIC_1)
- .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
- .addSink("sink0", THROUGH_TOPIC_1, "processor")
- .addSource("source1", THROUGH_TOPIC_1)
- .addSink("sink1", OUTPUT_TOPIC_1, "source1"))
- .getInternalBuilder();
+ private Topology createInternalRepartitioningWithValueTimestampTopology() {
+ topology.addSource("source", INPUT_TOPIC_1)
+ .addProcessor("processor", define(new ValueTimestampProcessor()), "source")
+ .addSink("sink0", THROUGH_TOPIC_1, "processor")
+ .addSource("source1", THROUGH_TOPIC_1)
+ .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+ // use wrapper to get the internal topology builder to add internal topic
+ final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
internalTopologyBuilder.addInternalTopic(THROUGH_TOPIC_1);
- return internalTopologyBuilder;
+ return topology;
}
- private InternalTopologyBuilder createForwardToSourceTopology() {
- return ((TopologyWrapper) topology.addSource("source-1", INPUT_TOPIC_1)
+ private Topology createForwardToSourceTopology() {
+ return topology.addSource("source-1", INPUT_TOPIC_1)
.addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
.addSource("source-2", OUTPUT_TOPIC_1)
- .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"))
- .getInternalBuilder();
+ .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
}
- private InternalTopologyBuilder createSimpleMultiSourceTopology(int partition) {
- return ((TopologyWrapper) topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ private Topology createSimpleMultiSourceTopology(int partition) {
+ return topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
.addProcessor("processor-1", define(new ForwardingProcessor()), "source-1")
.addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1")
.addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
.addProcessor("processor-2", define(new ForwardingProcessor()), "source-2")
- .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"))
- .getInternalBuilder();
+ .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
}
-
/**
* A processor that simply forwards all messages to all children.
*/
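
Note on the hunks above: each create*Topology() helper in ProcessorTopologyTest now
returns the public Topology type instead of casting through TopologyWrapper to pull
out an InternalTopologyBuilder. Only the two repartitioning helpers still reach into
internals, via the static TopologyWrapper.getInternalTopologyBuilder(topology)
accessor, to register THROUGH_TOPIC_1 as an internal topic. A hedged sketch of how a
test can consume such a helper directly with TopologyTestDriver (the properties and
application id below are illustrative, not taken from this diff):

    final Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    // the helper hands back a Topology, so no unwrapping is needed
    final TopologyTestDriver driver =
            new TopologyTestDriver(createForwardToSourceTopology(), props);
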
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 9090012..486b35e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -36,13 +36,12 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockStateStore;
-import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -90,7 +89,7 @@ public class StandbyTaskTest {
private final Set<TopicPartition> topicPartitions = Collections.emptySet();
private final ProcessorTopology topology = ProcessorTopology.withLocalStores(
- Utils.mkList(new MockStateStoreSupplier(storeName1, false).get(), new MockStateStoreSupplier(storeName2, true).get()),
+ Utils.mkList(new MockStoreBuilder(storeName1, false).build(), new MockStoreBuilder(storeName2, true).build()),
new HashMap<String, String>() {
{
put(storeName1, storeChangelogTopicName1);
@@ -100,7 +99,7 @@ public class StandbyTaskTest {
private final TopicPartition globalTopicPartition = new TopicPartition(globalStoreName, 0);
private final Set<TopicPartition> ktablePartitions = Utils.mkSet(globalTopicPartition);
private final ProcessorTopology ktableTopology = ProcessorTopology.withLocalStores(
- Collections.<StateStore>singletonList(new MockStateStoreSupplier(globalTopicPartition.topic(), true, false).get()),
+ Collections.singletonList(new MockStoreBuilder(globalTopicPartition.topic(), true).withLoggingDisabled().build()),
new HashMap<String, String>() {
{
put(globalStoreName, globalTopicPartition.topic());
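
The StandbyTaskTest hunks above show the supplier-to-builder mapping for the mock
store: MockStateStoreSupplier#get() becomes MockStoreBuilder#build(), and the old
third constructor argument (loggingEnabled = false) becomes the fluent
withLoggingDisabled() toggle from the StoreBuilder interface. A minimal sketch of
the two equivalent forms:

    // logging enabled (the two-arg form defaults to logging on)
    final StateStore withLogging = new MockStoreBuilder(storeName1, false).build();
    // logging disabled (previously MockStateStoreSupplier's third argument)
    final StateStore withoutLogging = new MockStoreBuilder(storeName2, true)
            .withLoggingDisabled()
            .build();
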
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 0048e73..9812158 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -41,10 +40,10 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStoreBuilder;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -329,10 +328,10 @@ public class StreamsPartitionAssignorTest {
public void testAssignWithPartialTopology() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor1");
+ builder.addStateStore(new MockStoreBuilder("store1", false), "processor1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor2");
+ builder.addStateStore(new MockStoreBuilder("store2", false), "processor2");
List<String> topics = Utils.mkList("topic1", "topic2");
Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
@@ -470,11 +469,11 @@ public class StreamsPartitionAssignorTest {
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store1"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-1");
+ builder.addStateStore(new MockStoreBuilder("store1", false), "processor-1");
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store2"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
- builder.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store3"), Serdes.ByteArray(), Serdes.ByteArray()), "processor-2");
+ builder.addStateStore(new MockStoreBuilder("store2", false), "processor-2");
+ builder.addStateStore(new MockStoreBuilder("store3", false), "processor-2");
List<String> topics = Utils.mkList("topic1", "topic2");
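
In StreamsPartitionAssignorTest the real Stores.keyValueStoreBuilder(...) stores are
swapped for MockStoreBuilder, presumably because the assignor only reads topology
metadata (store names and their changelog topics) and never materializes a store.
Both are StoreBuilder implementations, so either form satisfies addStateStore:

    builder.addStateStore(new MockStoreBuilder("store1", false), "processor1");
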
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index a061ff2..317ffc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -49,9 +49,8 @@ public class CompositeReadOnlyKeyValueStoreTest {
private final String storeNameA = "my-storeA";
private StateStoreProviderStub stubProviderTwo;
private KeyValueStore<String, String> stubOneUnderlying;
+ private KeyValueStore<String, String> otherUnderlyingStore;
private CompositeReadOnlyKeyValueStore<String, String> theStore;
- private KeyValueStore<String, String>
- otherUnderlyingStore;
@Before
public void before() {
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index bcb9856..033b68d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -59,49 +58,6 @@ public class KStreamTestDriver extends ExternalResource {
private ProcessorTopology globalTopology;
private final LogContext logContext = new LogContext("testCache ");
- @Deprecated
- public void setUp(final KStreamBuilder builder) {
- setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
- }
-
- @Deprecated
- public void setUp(final KStreamBuilder builder, final File stateDir) {
- setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
- }
-
- @Deprecated
- public void setUp(final KStreamBuilder builder, final File stateDir, final long cacheSize) {
- setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
- }
-
- @Deprecated
- public void setUp(final KStreamBuilder builder,
- final File stateDir,
- final Serde<?> keySerde,
- final Serde<?> valSerde) {
- setUp(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
- }
-
- @Deprecated
- public void setUp(final KStreamBuilder builder,
- final File stateDir,
- final Serde<?> keySerde,
- final Serde<?> valSerde,
- final long cacheSize) {
- builder.setApplicationId("TestDriver");
- topology = builder.build(null);
- globalTopology = builder.buildGlobalStateTopology();
- final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
- context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
- context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
- // init global topology first as it will add stores to the
- // store map that are required for joins etc.
- if (globalTopology != null) {
- initTopology(globalTopology, globalTopology.globalStateStores());
- }
- initTopology(topology, topology.stateStores());
- }
-
public void setUp(final StreamsBuilder builder) {
setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
}
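
With the KStreamBuilder overloads removed, KStreamTestDriver is set up exclusively
through StreamsBuilder. A hedged usage sketch (topic names are illustrative, and it
assumes the driver's existing process(topic, key, value) helper):

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    @Test
    public void shouldForwardRecords() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");
        driver.setUp(builder, TestUtils.tempDirectory());
        driver.process("input-topic", "key", "value");
    }
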
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
deleted file mode 100644
index 042bb63..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Collections;
-import java.util.Map;
-
-@Deprecated
-public class MockStateStoreSupplier implements StateStoreSupplier {
- private String name;
- private boolean persistent;
- private boolean loggingEnabled;
-
- public MockStateStoreSupplier(final String name,
- final boolean persistent) {
- this(name, persistent, true);
- }
-
- public MockStateStoreSupplier(final String name,
- final boolean persistent,
- final boolean loggingEnabled) {
- this.name = name;
- this.persistent = persistent;
- this.loggingEnabled = loggingEnabled;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public StateStore get() {
- return new MockStateStore(name, persistent);
- }
-
- @Override
- public Map<String, String> logConfig() {
- return Collections.emptyMap();
- }
-
- @Override
- public boolean loggingEnabled() {
- return loggingEnabled;
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
similarity index 57%
rename from streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
rename to streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
index 3495352..41aa239 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStoreBuilder.java
@@ -14,18 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.state.internals;
+package org.apache.kafka.test;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
-/**
- * A windowed state store supplier that extends the {@link org.apache.kafka.streams.processor.StateStoreSupplier} interface.
- *
- * @param <T> State store type
- */
-@Deprecated
-public interface WindowStoreSupplier<T extends StateStore> extends org.apache.kafka.streams.processor.StateStoreSupplier<T> {
+public class MockStoreBuilder extends AbstractStoreBuilder<Integer, byte[], StateStore> {
+
+ private final boolean persistent;
- // window retention period in milli-second
- long retentionPeriod();
+ public MockStoreBuilder(final String storeName, final boolean persistent) {
+ super(storeName, Serdes.Integer(), Serdes.ByteArray(), new MockTime());
+
+ this.persistent = persistent;
+ }
+
+ @Override
+ public StateStore build() {
+ return new MockStateStore(name, persistent);
+ }
}
+
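
Design note on the new MockStoreBuilder: extending AbstractStoreBuilder keeps the
mock compatible with every addStateStore(StoreBuilder, ...) call site, while build()
still returns the same MockStateStore the old supplier produced. The
Serdes.Integer()/Serdes.ByteArray()/MockTime arguments exist only to satisfy the
abstract base's constructor; MockStateStore ignores them. A hedged usage sketch:

    final StateStore store = new MockStoreBuilder("my-store", /* persistent */ true)
            .withLoggingDisabled()
            .build();
    // the built store reports the name and persistence flag it was given
    assertEquals("my-store", store.name());
    assertTrue(store.persistent());
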
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.