Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:11 UTC
[8/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3602 from mjsax/kafka-5671-add-streamsbuilder
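In practice, the migration this patch rolls out across the docs, examples, and tests looks like the following before/after sketch (topic names are borrowed from the PipeDemo change in this diff; the Properties setup is elided):

    // before: deprecated as of this commit
    // import org.apache.kafka.streams.kstream.KStreamBuilder;
    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("streams-file-input").to("streams-pipe-output");
    KafkaStreams streams = new KafkaStreams(builder, props);

    // after: use StreamsBuilder and pass the built Topology to KafkaStreams
    // import org.apache.kafka.streams.StreamsBuilder;
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("streams-file-input").to("streams-pipe-output");
    KafkaStreams streams = new KafkaStreams(builder.build(), props);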
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da220557
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da220557
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da220557
Branch: refs/heads/trunk
Commit: da2205578be3228ce40eb5e59f6bbcb0c8da0aba
Parents: 1844bf2
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Jul 31 15:28:59 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jul 31 15:28:59 2017 -0700
----------------------------------------------------------------------
docs/streams/developer-guide.html | 18 +-
docs/streams/index.html | 6 +-
.../examples/pageview/PageViewTypedDemo.java | 8 +-
.../examples/pageview/PageViewUntypedDemo.java | 8 +-
.../kafka/streams/examples/pipe/PipeDemo.java | 6 +-
.../examples/wordcount/WordCountDemo.java | 6 +-
.../kafka/streams/KafkaClientSupplier.java | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 24 +-
.../apache/kafka/streams/StreamsBuilder.java | 1211 ++++++++++++++++++
.../org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../java/org/apache/kafka/streams/Topology.java | 6 +
.../kafka/streams/kstream/GlobalKTable.java | 5 +-
.../apache/kafka/streams/kstream/KStream.java | 18 +-
.../kafka/streams/kstream/KStreamBuilder.java | 209 +--
.../apache/kafka/streams/kstream/KTable.java | 54 +-
.../kstream/internals/AbstractStream.java | 15 +-
.../internals/InternalStreamsBuilder.java | 203 +++
.../kstream/internals/KGroupedStreamImpl.java | 30 +-
.../kstream/internals/KGroupedTableImpl.java | 25 +-
.../streams/kstream/internals/KStreamImpl.java | 461 +++----
.../kstream/internals/KStreamJoinWindow.java | 6 +-
.../streams/kstream/internals/KTableImpl.java | 188 +--
.../streams/processor/PartitionGrouper.java | 2 +-
.../streams/processor/TopologyBuilder.java | 63 +-
.../internals/StreamPartitionAssignor.java | 7 +-
.../apache/kafka/streams/KafkaStreamsTest.java | 57 +-
.../kafka/streams/StreamsBuilderTest.java | 123 ++
.../org/apache/kafka/streams/TopologyTest.java | 1 +
.../streams/integration/EosIntegrationTest.java | 14 +-
.../integration/FanoutIntegrationTest.java | 6 +-
.../GlobalKTableIntegrationTest.java | 8 +-
.../InternalTopicIntegrationTest.java | 14 +-
.../integration/JoinIntegrationTest.java | 8 +-
.../KStreamAggregationDedupIntegrationTest.java | 13 +-
.../KStreamAggregationIntegrationTest.java | 8 +-
.../KStreamKTableJoinIntegrationTest.java | 6 +-
.../integration/KStreamRepartitionJoinTest.java | 9 +-
...eamsFineGrainedAutoResetIntegrationTest.java | 25 +-
.../KTableKTableJoinIntegrationTest.java | 6 +-
.../QueryableStateIntegrationTest.java | 32 +-
.../integration/RegexSourceIntegrationTest.java | 37 +-
.../integration/ResetIntegrationTest.java | 15 +-
.../streams/kstream/KStreamBuilderTest.java | 9 +-
.../kstream/internals/AbstractStreamTest.java | 16 +-
.../internals/GlobalKTableJoinsTest.java | 4 +-
.../internals/InternalStreamsBuilderTest.java | 370 ++++++
.../internals/KGroupedStreamImplTest.java | 24 +-
.../internals/KGroupedTableImplTest.java | 4 +-
.../kstream/internals/KStreamBranchTest.java | 7 +-
.../kstream/internals/KStreamFilterTest.java | 8 +-
.../kstream/internals/KStreamFlatMapTest.java | 10 +-
.../internals/KStreamFlatMapValuesTest.java | 4 +-
.../kstream/internals/KStreamForeachTest.java | 13 +-
.../kstream/internals/KStreamImplTest.java | 29 +-
.../internals/KStreamKStreamJoinTest.java | 22 +-
.../internals/KStreamKStreamLeftJoinTest.java | 10 +-
.../internals/KStreamKTableJoinTest.java | 6 +-
.../internals/KStreamKTableLeftJoinTest.java | 6 +-
.../kstream/internals/KStreamMapTest.java | 6 +-
.../kstream/internals/KStreamMapValuesTest.java | 4 +-
.../kstream/internals/KStreamPeekTest.java | 7 +-
.../kstream/internals/KStreamPrintTest.java | 13 +-
.../kstream/internals/KStreamSelectKeyTest.java | 6 +-
.../kstream/internals/KStreamTransformTest.java | 6 +-
.../internals/KStreamTransformValuesTest.java | 4 +-
.../internals/KStreamWindowAggregateTest.java | 6 +-
.../kstream/internals/KTableAggregateTest.java | 21 +-
.../kstream/internals/KTableFilterTest.java | 46 +-
.../kstream/internals/KTableForeachTest.java | 18 +-
.../kstream/internals/KTableImplTest.java | 50 +-
.../kstream/internals/KTableKTableJoinTest.java | 38 +-
.../internals/KTableKTableLeftJoinTest.java | 16 +-
.../internals/KTableKTableOuterJoinTest.java | 14 +-
.../kstream/internals/KTableMapKeysTest.java | 5 +-
.../kstream/internals/KTableMapValuesTest.java | 22 +-
.../kstream/internals/KTableSourceTest.java | 14 +-
.../kafka/streams/perf/SimpleBenchmark.java | 26 +-
.../kafka/streams/perf/YahooBenchmark.java | 10 +-
.../internals/GlobalStreamThreadTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 15 +-
.../internals/StreamPartitionAssignorTest.java | 28 +-
.../processor/internals/StreamThreadTest.java | 185 +--
.../internals/StreamsMetadataStateTest.java | 19 +-
.../streams/tests/BrokerCompatibilityTest.java | 6 +-
.../kafka/streams/tests/EosTestClient.java | 6 +-
.../streams/tests/ShutdownDeadlockTest.java | 6 +-
.../kafka/streams/tests/SmokeTestClient.java | 6 +-
.../apache/kafka/test/KStreamTestDriver.java | 56 +
.../kafka/test/ProcessorTopologyTestDriver.java | 8 +-
89 files changed, 3119 insertions(+), 1057 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 76c2dc7..f24b17f 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -294,7 +294,7 @@
</p>
<pre class="brush: java;">
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
@@ -571,7 +571,7 @@
<pre class="brush: java;">
StreamsConfig config = ...;
- KStreamBuilder builder = ...;
+ StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;
// Define the processing topology (here: WordCount)
@@ -583,7 +583,7 @@
groupedByWord.count("CountsKeyValueStore");
// Start an instance of the topology
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
</pre>
@@ -627,7 +627,7 @@
<pre class="brush: java;">
StreamsConfig config = ...;
- KStreamBuilder builder = ...;
+ StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;
// Define the processing topology (here: WordCount)
@@ -850,7 +850,7 @@
// ... further settings may follow here ...
StreamsConfig config = new StreamsConfig(props);
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "word-count-input");
@@ -863,7 +863,7 @@
groupedByWord.count("word-count");
// Start an instance of the topology
- KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+ KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
// Then, create and start the actual RPC service for remote access to this
@@ -1082,8 +1082,8 @@
<pre class="brush: java;">
import org.apache.kafka.streams.KafkaStreams;
+ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.Topology;
// Use the builders to define the actual processing topology, e.g. to specify
@@ -1094,8 +1094,8 @@
//
// OR
//
- KStreamBuilder builder = ...; // when using the Kafka Streams DSL
- Topology topology = builder.topology();
+ StreamsBuilder builder = ...; // when using the Kafka Streams DSL
+ Topology topology = builder.build();
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/docs/streams/index.html
----------------------------------------------------------------------
diff --git a/docs/streams/index.html b/docs/streams/index.html
index bcaa831..5a6da8e 100644
--- a/docs/streams/index.html
+++ b/docs/streams/index.html
@@ -75,9 +75,9 @@
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
- import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Arrays;
@@ -100,7 +100,7 @@
.count("Counts");
wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
@@ -112,9 +112,9 @@
<pre class="brush: java;">
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
- import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 74afa8d..0783c65 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -23,14 +23,14 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.StreamsConfig;
import java.util.HashMap;
import java.util.Map;
@@ -89,7 +89,7 @@ public class PageViewTypedDemo {
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
// TODO: the following can be removed with a serialization factory
Map<String, Object> serdeProps = new HashMap<>();
@@ -190,7 +190,7 @@ public class PageViewTypedDemo {
// write to the result topic
regionCount.to(wPageViewByRegionSerde, regionCountSerde, "streams-pageviewstats-typed-output");
- KafkaStreams streams = new KafkaStreams(builder, props);
+ KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// usually the stream application would be running forever,
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 1a90f68..e930985 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -24,12 +24,12 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -65,7 +65,7 @@ public class PageViewUntypedDemo {
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
@@ -121,7 +121,7 @@ public class PageViewUntypedDemo {
// write to the result topic
regionCount.to(jsonSerde, jsonSerde, "streams-pageviewstats-untyped-output");
- KafkaStreams streams = new KafkaStreams(builder, props);
+ KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// usually the stream application would be running forever,
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 1d672b2..0831e3b 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.examples.pipe;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
@@ -49,11 +49,11 @@ public class PipeDemo {
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-file-input").to("streams-pipe-output");
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 616fc48..e3cf60c 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -60,7 +60,7 @@ public class WordCountDemo {
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-wordcount-input");
@@ -82,7 +82,7 @@ public class WordCountDemo {
// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
- final KafkaStreams streams = new KafkaStreams(builder, props);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index f733c25..260d58a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -26,7 +26,7 @@ import java.util.Map;
/**
* {@code KafkaClientSupplier} can be used to provide custom Kafka clients to a {@link KafkaStreams} instance.
*
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.processor.TopologyBuilder, StreamsConfig, KafkaClientSupplier)
+ * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig, KafkaClientSupplier)
*/
public interface KafkaClientSupplier {
/**
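The updated @see now points at the Topology-based constructor. A custom supplier is wired in along these lines (MyClientSupplier stands in for any implementation of this interface and is not part of the commit):

    Topology topology = builder.build();                          // from a StreamsBuilder, or assembled directly
    KafkaClientSupplier clientSupplier = new MyClientSupplier();  // illustrative custom implementation
    KafkaStreams streams = new KafkaStreams(topology, new StreamsConfig(props), clientSupplier);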
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d7c608a..ec09730 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -33,13 +33,11 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -89,8 +87,9 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
* sends output to zero, one, or more output topics.
* <p>
- * The computational logic can be specified either by using the {@link TopologyBuilder} to define a DAG topology of
- * {@link Processor}s or by using the {@link KStreamBuilder} which provides the high-level DSL to define transformations.
+ * The computational logic can be specified either by using the {@link Topology} to define a DAG topology of
+ * {@link Processor}s or by using the {@link StreamsBuilder} which provides the high-level DSL to define
+ * transformations.
* <p>
* One {@code KafkaStreams} instance can contain one or more threads specified in the configs for the processing work.
* <p>
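The reworded class javadoc distinguishes the two entry points; roughly (topic and node names are placeholders, not taken from this commit):

    // Processor API: assemble the DAG on a Topology directly
    Topology topology = new Topology();
    topology.addSource("Source", "input-topic");
    // ...addProcessor(...) / addSink(...) calls would follow...

    // DSL: describe transformations with StreamsBuilder, then build() the Topology
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic").to("output-topic");
    Topology dslTopology = builder.build();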
@@ -121,8 +120,8 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
* streams.start();
* }</pre>
*
- * @see KStreamBuilder
- * @see Topology
+ * @see org.apache.kafka.streams.StreamsBuilder
+ * @see org.apache.kafka.streams.Topology
*/
@InterfaceStability.Evolving
public class KafkaStreams {
@@ -145,7 +144,6 @@ public class KafkaStreams {
private final String logPrefix;
private final StreamsMetadataState streamsMetadataState;
private final StreamsConfig config;
- private StateRestoreListener stateRestoreListener;
private final StateDirectory stateDirectory;
// container states
@@ -406,7 +404,7 @@ public class KafkaStreams {
* @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
*/
@Deprecated
- public KafkaStreams(final TopologyBuilder builder,
+ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
final Properties props) {
this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
}
@@ -415,7 +413,7 @@ public class KafkaStreams {
* @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
*/
@Deprecated
- public KafkaStreams(final TopologyBuilder builder,
+ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
final StreamsConfig config) {
this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
}
@@ -424,7 +422,7 @@ public class KafkaStreams {
* @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead
*/
@Deprecated
- public KafkaStreams(final TopologyBuilder builder,
+ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) {
this(builder.internalTopologyBuilder, config, clientSupplier);
@@ -739,7 +737,7 @@ public class KafkaStreams {
/**
* Produce a string representation containing useful information about this {@code KafkaStream} instance such as
* thread IDs, task IDs, and a representation of the topology DAG including {@link StateStore}s (cf.
- * {@link TopologyBuilder} and {@link KStreamBuilder}).
+ * {@link Topology} and {@link StreamsBuilder}).
*
* @return A string representation of the Kafka Streams instance.
*/
@@ -751,7 +749,7 @@ public class KafkaStreams {
/**
* Produce a string representation containing useful information about this {@code KafkaStream} instance such as
* thread IDs, task IDs, and a representation of the topology DAG including {@link StateStore}s (cf.
- * {@link TopologyBuilder} and {@link KStreamBuilder}).
+ * {@link Topology} and {@link StreamsBuilder}).
*
* @param indent the top-level indent for each line
* @return A string representation of the Kafka Streams instance.
@@ -882,7 +880,7 @@ public class KafkaStreams {
* If a {@link StreamPartitioner custom partitioner} has been
* {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig},
* {@link KStream#through(StreamPartitioner, String)}, or {@link KTable#through(StreamPartitioner, String, String)},
- * or if the original {@link KTable}'s input {@link KStreamBuilder#table(String, String) topic} is partitioned
+ * or if the original {@link KTable}'s input {@link StreamsBuilder#table(String, String) topic} is partitioned
* differently, please use {@link #metadataForKey(String, Object, StreamPartitioner)}.
* <p>
* Note: