You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:09 UTC
[6/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate
KStreamBuilder
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 46769eb..af05170 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -16,13 +16,14 @@
*/
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
@@ -43,16 +44,26 @@ import java.util.regex.Pattern;
/**
* {@code KStreamBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
*
- * @see TopologyBuilder
+ * @see org.apache.kafka.streams.processor.TopologyBuilder
* @see KStream
* @see KTable
* @see GlobalKTable
+ * @deprecated Use {@link org.apache.kafka.streams.StreamsBuilder StreamsBuilder} instead
*/
-@InterfaceStability.Evolving
+@Deprecated
public class KStreamBuilder extends TopologyBuilder {
private final AtomicInteger index = new AtomicInteger(0);
+ private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
+
+ private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
+ if (resetPolicy == null) {
+ return null;
+ }
+ return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
+ }
+
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -92,7 +103,6 @@ public class KStreamBuilder extends TopologyBuilder {
return stream(offsetReset, null, null, null, topics);
}
-
/**
* Create a {@link KStream} from the specified topic pattern.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -155,7 +165,6 @@ public class KStreamBuilder extends TopologyBuilder {
return stream(null, null, keySerde, valSerde, topics);
}
-
/**
* Create a {@link KStream} from the specified topics.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
@@ -182,7 +191,6 @@ public class KStreamBuilder extends TopologyBuilder {
return stream(offsetReset, null, keySerde, valSerde, topics);
}
-
/**
* Create a {@link KStream} from the specified topics.
* The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
@@ -209,7 +217,6 @@ public class KStreamBuilder extends TopologyBuilder {
return stream(null, timestampExtractor, keySerde, valSerde, topics);
}
-
/**
* Create a {@link KStream} from the specified topics.
* <p>
@@ -235,14 +242,18 @@ public class KStreamBuilder extends TopologyBuilder {
final Serde<K> keySerde,
final Serde<V> valSerde,
final String... topics) {
- final String name = newName(KStreamImpl.SOURCE_NAME);
+ try {
+ final String name = newName(KStreamImpl.SOURCE_NAME);
- addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
+ keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
- return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+ return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
+ } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+ }
}
-
/**
* Create a {@link KStream} from the specified topic pattern.
* The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
@@ -322,7 +333,6 @@ public class KStreamBuilder extends TopologyBuilder {
return stream(null, timestampExtractor, keySerde, valSerde, topicPattern);
}
-
/**
* Create a {@link KStream} from the specified topic pattern.
* <p>
@@ -349,14 +359,18 @@ public class KStreamBuilder extends TopologyBuilder {
final Serde<K> keySerde,
final Serde<V> valSerde,
final Pattern topicPattern) {
- final String name = newName(KStreamImpl.SOURCE_NAME);
+ try {
+ final String name = newName(KStreamImpl.SOURCE_NAME);
- addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor,
+ keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
- return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+ return new KStreamImpl<>(internalStreamsBuilder, name, Collections.singleton(name), false);
+ } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+ }
}
-
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
@@ -375,14 +389,14 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param topic the topic name; cannot be {@code null}
+ * @param topic the topic name; cannot be {@code null}
* @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(String)} ()}.
* @return a {@link KTable} for the specified topic
*/
@@ -409,14 +423,14 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param topic the topic name; cannot be {@code null}
+ * @param topic the topic name; cannot be {@code null}
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} for the specified topic
*/
@@ -425,7 +439,6 @@ public class KStreamBuilder extends TopologyBuilder {
return table(null, null, null, null, topic, storeSupplier);
}
-
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
@@ -463,17 +476,17 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}.
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, String)} ()}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -500,7 +513,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -534,7 +547,7 @@ public class KStreamBuilder extends TopologyBuilder {
* <p>
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
* offsets are available
- * @param topic the topic name; if {@code null} an internal store name will be automatically given.
+ * @param topic the topic name; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -542,7 +555,6 @@ public class KStreamBuilder extends TopologyBuilder {
return table(offsetReset, null, null, null, topic, (String) null);
}
-
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy and default key and value deserializers
@@ -561,7 +573,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -580,7 +592,6 @@ public class KStreamBuilder extends TopologyBuilder {
return table(null, timestampExtractor, null, null, topic, storeName);
}
-
/**
* Create a {@link KTable} for the specified topic.
* The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
@@ -598,7 +609,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -618,7 +629,6 @@ public class KStreamBuilder extends TopologyBuilder {
return table(offsetReset, timestampExtractor, null, null, topic, storeName);
}
-
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
@@ -637,18 +647,18 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
* @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(Serde, Serde, String)} ()}.
* @return a {@link KTable} for the specified topic
*/
@@ -677,18 +687,18 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
* @param storeSupplier user defined state store supplier. Cannot be {@code null}.
* @return a {@link KTable} for the specified topic
*/
@@ -732,23 +742,29 @@ public class KStreamBuilder extends TopologyBuilder {
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier,
final boolean isQueryable) {
- final String source = newName(KStreamImpl.SOURCE_NAME);
- final String name = newName(KTableImpl.SOURCE_NAME);
- final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+ try {
+ final String source = newName(KStreamImpl.SOURCE_NAME);
+ final String name = newName(KTableImpl.SOURCE_NAME);
+ final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
- addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), source, timestampExtractor,
+ keySerde == null ? null : keySerde.deserializer(),
valSerde == null ? null : valSerde.deserializer(),
topic);
- addProcessor(name, processorSupplier, source);
+ internalTopologyBuilder.addProcessor(name, processorSupplier, source);
- final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
- keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
+ final KTableImpl<K, ?, V> kTable = new KTableImpl<>(internalStreamsBuilder, name, processorSupplier,
+ keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
- addStateStore(storeSupplier, name);
- connectSourceStoreAndTopic(storeSupplier.name(), topic);
+ addStateStore(storeSupplier, name);
+ connectSourceStoreAndTopic(storeSupplier.name(), topic);
- return kTable;
+ return kTable;
+ } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+ }
}
+
/**
* Create a {@link KTable} for the specified topic.
* Input {@link KeyValue records} with {@code null} key will be dropped.
@@ -765,21 +781,21 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
* For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
* query the value of the key on a parallel running instance of your Kafka Streams application.
*
- * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
- * offsets are available
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
- * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
+ * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+ * offsets are available
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
+ * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)} ()} ()}.
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -790,8 +806,6 @@ public class KStreamBuilder extends TopologyBuilder {
return table(offsetReset, null, keySerde, valSerde, topic, queryableStoreName);
}
-
-
/**
* Create a {@link KTable} for the specified topic.
* The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
@@ -809,7 +823,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -834,8 +848,6 @@ public class KStreamBuilder extends TopologyBuilder {
return table(null, timestampExtractor, keySerde, valSerde, topic, storeName);
}
-
-
/**
* Create a {@link KTable} for the specified topic.
* Input {@link KeyValue} pairs with {@code null} key will be dropped.
@@ -852,7 +864,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -902,11 +914,11 @@ public class KStreamBuilder extends TopologyBuilder {
* <p>
* @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
* offsets are available
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name; cannot be {@code null}
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name; cannot be {@code null}
* @return a {@link KTable} for the specified topic
*/
public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
@@ -915,6 +927,7 @@ public class KStreamBuilder extends TopologyBuilder {
final String topic) {
return table(offsetReset, null, keySerde, valSerde, topic, (String) null);
}
+
/**
* Create a {@link KTable} for the specified topic.
* Input {@link KeyValue records} with {@code null} key will be dropped.
@@ -931,7 +944,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
* }</pre>
@@ -972,7 +985,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
@@ -1005,10 +1018,9 @@ public class KStreamBuilder extends TopologyBuilder {
* @return a {@link GlobalKTable} for the specified topic
*/
public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
- return globalTable(null, null, null, topic, (String) null);
+ return globalTable(null, null, null, topic, null);
}
-
/**
* Create a {@link GlobalKTable} for the specified topic.
* The default {@link TimestampExtractor} and default key and value deserializers as specified in
@@ -1024,7 +1036,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
@@ -1067,7 +1079,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
@@ -1103,7 +1115,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
@@ -1139,7 +1151,7 @@ public class KStreamBuilder extends TopologyBuilder {
* {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
* <pre>{@code
* KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
* String key = "some-key";
* Long valueForKey = localStore.get(key);
* }</pre>
@@ -1162,17 +1174,20 @@ public class KStreamBuilder extends TopologyBuilder {
final TimestampExtractor timestampExtractor,
final String topic,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
- Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
- final String sourceName = newName(KStreamImpl.SOURCE_NAME);
- final String processorName = newName(KTableImpl.SOURCE_NAME);
- final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
-
-
- final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
- final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
-
- addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
- return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+ try {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ final String sourceName = newName(KStreamImpl.SOURCE_NAME);
+ final String processorName = newName(KTableImpl.SOURCE_NAME);
+ final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
+
+ final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+ final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+ return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+ } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+ }
}
/**
@@ -1211,7 +1226,11 @@ public class KStreamBuilder extends TopologyBuilder {
* @return a {@link KStream} containing all records of the given streams
*/
public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
- return KStreamImpl.merge(this, streams);
+ try {
+ return KStreamImpl.merge(internalStreamsBuilder, streams);
+ } catch (final org.apache.kafka.streams.errors.TopologyException e) {
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b7dd43e..06a0eee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
@@ -34,7 +35,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
* Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
* <p>
- * A {@code KTable} is either {@link KStreamBuilder#table(String, String) defined from a single Kafka topic} that is
+ * A {@code KTable} is either {@link StreamsBuilder#table(String, String) defined from a single Kafka topic} that is
* consumed message by message or the result of a {@code KTable} transformation.
* An aggregation of a {@link KStream} also yields a {@code KTable}.
* <p>
@@ -62,7 +63,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
* @see KStream
* @see KGroupedTable
* @see GlobalKTable
- * @see KStreamBuilder#table(String, String)
+ * @see StreamsBuilder#table(String, String)
*/
@InterfaceStability.Evolving
public interface KTable<K, V> {
@@ -630,10 +631,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
@@ -651,10 +652,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
* The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
*
* @param topic the topic name
@@ -671,10 +672,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(String) #to(someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link KStreamBuilder#table(String)})
+ * {@link StreamsBuilder#table(String)})
*
* @param topic the topic name
* @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable}
@@ -689,10 +690,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link KStreamBuilder#table(String)})
+ * {@link StreamsBuilder#table(String)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
@@ -710,10 +711,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
@@ -734,10 +735,10 @@ public interface KTable<K, V> {
* started).
* <p>
* This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
@@ -758,10 +759,10 @@ public interface KTable<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -785,10 +786,10 @@ public interface KTable<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -811,10 +812,10 @@ public interface KTable<K, V> {
* used—otherwise producer's {@link DefaultPartitioner} is used.
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
- * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+ * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an interna; store name (cf.
- * {@link KStreamBuilder#table(String)})
+ * {@link StreamsBuilder#table(String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -834,10 +835,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -866,10 +867,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+ * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with the given store name (cf.
- * {@link KStreamBuilder#table(String, String)})
+ * {@link StreamsBuilder#table(String, String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -897,10 +898,10 @@ public interface KTable<K, V> {
* <p>
* This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
* #to(keySerde, valueSerde, partitioner, someTopicName)} and
- * {@link KStreamBuilder#table(String) KStreamBuilder#table(someTopicName)}.
+ * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}.
* <p>
* The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf.
- * {@link KStreamBuilder#table(String)})
+ * {@link StreamsBuilder#table(String)})
*
* @param keySerde key serde used to send key-value pairs,
* if not specified the default key serde defined in the configuration will be used
@@ -918,7 +919,6 @@ public interface KTable<K, V> {
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic);
-
/**
* Materialize this changelog stream to a topic using default serializers and deserializers and producer's
* {@link DefaultPartitioner}.
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 81d10ef..b5de562 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
@@ -33,24 +32,24 @@ import java.util.Set;
public abstract class AbstractStream<K> {
- protected final KStreamBuilder topology;
+ protected final InternalStreamsBuilder builder;
protected final String name;
- protected final Set<String> sourceNodes;
+ final Set<String> sourceNodes;
// This copy-constructor will allow to extend KStream
// and KTable APIs with new methods without impacting the public interface.
public AbstractStream(AbstractStream<K> stream) {
- this.topology = stream.topology;
+ this.builder = stream.builder;
this.name = stream.name;
this.sourceNodes = stream.sourceNodes;
}
- AbstractStream(final KStreamBuilder topology, String name, final Set<String> sourceNodes) {
+ AbstractStream(final InternalStreamsBuilder builder, String name, final Set<String> sourceNodes) {
if (sourceNodes == null || sourceNodes.isEmpty()) {
throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty");
}
- this.topology = topology;
+ this.builder = builder;
this.name = name;
this.sourceNodes = sourceNodes;
}
@@ -61,13 +60,13 @@ public abstract class AbstractStream<K> {
allSourceNodes.addAll(sourceNodes);
allSourceNodes.addAll(other.sourceNodes);
- topology.copartitionSources(allSourceNodes);
+ builder.internalTopologyBuilder.copartitionSources(allSourceNodes);
return allSourceNodes;
}
String getOrCreateName(final String queryableStoreName, final String prefix) {
- final String returnName = queryableStoreName != null ? queryableStoreName : topology.newStoreName(prefix);
+ final String returnName = queryableStoreName != null ? queryableStoreName : builder.newStoreName(prefix);
Topic.validate(returnName);
return returnName;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
new file mode 100644
index 0000000..bcb68f0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+public class InternalStreamsBuilder {
+
+ final InternalTopologyBuilder internalTopologyBuilder;
+
+ private final AtomicInteger index = new AtomicInteger(0);
+
+ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) {
+ this.internalTopologyBuilder = internalTopologyBuilder;
+ }
+
+ public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String... topics) {
+ final String name = newName(KStreamImpl.SOURCE_NAME);
+
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
+
+ return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+ }
+
+ public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final Pattern topicPattern) {
+ final String name = newName(KStreamImpl.SOURCE_NAME);
+
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+
+ return new KStreamImpl<>(this, name, Collections.singleton(name), false);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final String queryableStoreName) {
+ final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+ final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+ keySerde,
+ valSerde,
+ false,
+ Collections.<String, String>emptyMap(),
+ true);
+ return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
+ }
+
+ public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
+ }
+
+ private <K, V> KTable<K, V> doTable(final Topology.AutoOffsetReset offsetReset,
+ final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final TimestampExtractor timestampExtractor,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final boolean isQueryable) {
+ final String source = newName(KStreamImpl.SOURCE_NAME);
+ final String name = newName(KTableImpl.SOURCE_NAME);
+ final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
+
+ internalTopologyBuilder.addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
+ valSerde == null ? null : valSerde.deserializer(),
+ topic);
+ internalTopologyBuilder.addProcessor(name, processorSupplier, source);
+
+ final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
+ keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
+
+ internalTopologyBuilder.addStateStore(storeSupplier, name);
+ internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
+
+ return kTable;
+ }
+
+ public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final TimestampExtractor timestampExtractor,
+ final String topic,
+ final String queryableStoreName) {
+ final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
+ return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+ keySerde,
+ valSerde,
+ false,
+ Collections.<String, String>emptyMap(),
+ true));
+ }
+
+ public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
+ final Serde<V> valSerde,
+ final TimestampExtractor timestampExtractor,
+ final String topic,
+ final StateStoreSupplier<KeyValueStore> storeSupplier) {
+ Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+ final String sourceName = newName(KStreamImpl.SOURCE_NAME);
+ final String processorName = newName(KTableImpl.SOURCE_NAME);
+ final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
+
+
+ final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+ final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+ return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
+ }
+
+ public <K, V> KStream<K, V> merge(final KStream<K, V>... streams) {
+ return KStreamImpl.merge(this, streams);
+ }
+
+ String newName(final String prefix) {
+ return prefix + String.format("%010d", index.getAndIncrement());
+ }
+
+ String newStoreName(final String prefix) {
+ return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
+ }
+
+ public synchronized void addStateStore(final StateStoreSupplier supplier,
+ final String... processorNames) {
+ internalTopologyBuilder.addStateStore(supplier, processorNames);
+ }
+
+ public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final String sourceName,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+ valueDeserializer, topic, processorName, stateUpdateSupplier);
+ }
+
+ public synchronized void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+ valueDeserializer, topic, processorName, stateUpdateSupplier);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index a1b40a3..5fd5f6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
@@ -31,8 +30,8 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
import java.util.Collections;
import java.util.Objects;
@@ -48,13 +47,13 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
private final boolean repartitionRequired;
private boolean isQueryable = true;
- KGroupedStreamImpl(final KStreamBuilder topology,
+ KGroupedStreamImpl(final InternalStreamsBuilder builder,
final String name,
final Set<String> sourceNodes,
final Serde<K> keySerde,
final Serde<V> valSerde,
final boolean repartitionRequired) {
- super(topology, name, sourceNodes);
+ super(builder, name, sourceNodes);
this.keySerde = keySerde;
this.valSerde = valSerde;
this.repartitionRequired = repartitionRequired;
@@ -391,20 +390,21 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
final String functionName,
final StateStoreSupplier storeSupplier) {
- final String aggFunctionName = topology.newName(functionName);
+ final String aggFunctionName = builder.newName(functionName);
final String sourceName = repartitionIfRequired(storeSupplier.name());
- topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
- topology.addStateStore(storeSupplier, aggFunctionName);
-
- return new KTableImpl<>(topology,
- aggFunctionName,
- aggregateSupplier,
- sourceName.equals(this.name) ? sourceNodes
- : Collections.singleton(sourceName),
- storeSupplier.name(),
- isQueryable);
+ builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
+ builder.internalTopologyBuilder.addStateStore(storeSupplier, aggFunctionName);
+
+ return new KTableImpl<>(
+ builder,
+ aggFunctionName,
+ aggregateSupplier,
+ sourceName.equals(this.name) ? sourceNodes
+ : Collections.singleton(sourceName),
+ storeSupplier.name(),
+ isQueryable);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 4455848..aefaad8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Initializer;
@@ -50,12 +49,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
protected final Serde<? extends V> valSerde;
private boolean isQueryable = true;
- public KGroupedTableImpl(final KStreamBuilder topology,
+ public KGroupedTableImpl(final InternalStreamsBuilder builder,
final String name,
final String sourceName,
final Serde<? extends K> keySerde,
final Serde<? extends V> valSerde) {
- super(topology, name, Collections.singleton(sourceName));
+ super(builder, name, Collections.singleton(sourceName));
this.keySerde = keySerde;
this.valSerde = valSerde;
this.isQueryable = true;
@@ -82,7 +81,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
final Aggregator<? super K, ? super V, T> adder,
final Aggregator<? super K, ? super V, T> subtractor,
final Serde<T> aggValueSerde) {
- return aggregate(initializer, adder, subtractor, aggValueSerde, (String) null);
+ return aggregate(initializer, adder, subtractor, aggValueSerde, null);
}
@Override
@@ -117,9 +116,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
final String functionName,
final StateStoreSupplier<KeyValueStore> storeSupplier) {
- String sinkName = topology.newName(KStreamImpl.SINK_NAME);
- String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
- String funcName = topology.newName(functionName);
+ String sinkName = builder.newName(KStreamImpl.SINK_NAME);
+ String sourceName = builder.newName(KStreamImpl.SOURCE_NAME);
+ String funcName = builder.newName(functionName);
String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
@@ -132,18 +131,18 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
// send the aggregate key-value pairs to the intermediate topic for partitioning
- topology.addInternalTopic(topic);
- topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
+ builder.internalTopologyBuilder.addInternalTopic(topic);
+ builder.internalTopologyBuilder.addSink(sinkName, topic, keySerializer, changedValueSerializer, null, this.name);
// read the intermediate topic with RecordMetadataTimestampExtractor
- topology.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, changedValueDeserializer, topic);
+ builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, changedValueDeserializer, topic);
// aggregate the values with the aggregator and local store
- topology.addProcessor(funcName, aggregateSupplier, sourceName);
- topology.addStateStore(storeSupplier, funcName);
+ builder.internalTopologyBuilder.addProcessor(funcName, aggregateSupplier, sourceName);
+ builder.internalTopologyBuilder.addStateStore(storeSupplier, funcName);
// return the KTable representation with the intermediate topic as the sources
- return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
+ return new KTableImpl<>(builder, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
}
@Override