You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/01/31 21:53:18 UTC
[kafka] branch trunk updated: KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b822206 KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
b822206 is described below
commit b8222065e0cc09f455dc5b2aa43c1d68cf0d4a1f
Author: Panuwat Anawatmongkhon <be...@gmail.com>
AuthorDate: Thu Feb 1 04:53:15 2018 +0700
KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
- implements KIP-233
Author: Panuwat Anawatmongkhon <pa...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../org/apache/kafka/streams/StreamsBuilder.java | 34 ++++++++++++++++------
.../kstream/internals/InternalStreamsBuilder.java | 16 ++++++++++
2 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 27105c6..f08098e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -454,6 +454,28 @@ public class StreamsBuilder {
}
/**
+ * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead
+ */
+ @SuppressWarnings("unchecked")
+ @Deprecated
+ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
+ final String topic,
+ final String sourceName,
+ final Consumed consumed,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+ Objects.requireNonNull(consumed, "consumed can't be null");
+ internalStreamsBuilder.addGlobalStore(storeBuilder,
+ sourceName,
+ topic,
+ new ConsumedInternal<>(consumed),
+ processorName,
+ stateUpdateSupplier);
+ return this;
+ }
+
+ /**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the provided input topic.
* There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
@@ -467,10 +489,8 @@ public class StreamsBuilder {
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
*
* @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
- * @param sourceName name of the {@link SourceNode} that will be automatically added
* @param topic the topic to source the data from
* @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
- * @param processorName the name of the {@link ProcessorSupplier}
* @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
@@ -478,18 +498,14 @@ public class StreamsBuilder {
@SuppressWarnings("unchecked")
public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
final String topic,
- final String sourceName,
final Consumed consumed,
- final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(storeBuilder,
- sourceName,
- topic,
- new ConsumedInternal<>(consumed),
- processorName,
- stateUpdateSupplier);
+ topic,
+ new ConsumedInternal<>(consumed),
+ stateUpdateSupplier);
return this;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 2a8a89e..0b028e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -194,4 +194,20 @@ public class InternalStreamsBuilder implements InternalNameProvider {
processorName,
stateUpdateSupplier);
}
+
+ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
+ final String topic,
+ final ConsumedInternal consumed,
+ final ProcessorSupplier stateUpdateSupplier) {
+ // explicitly disable logging for global stores
+ storeBuilder.withLoggingDisabled();
+ final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+ final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
+ addGlobalStore(storeBuilder,
+ sourceName,
+ topic,
+ consumed,
+ processorName,
+ stateUpdateSupplier);
+ }
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.