You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/01/31 21:53:18 UTC

[kafka] branch trunk updated: KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b822206  KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
b822206 is described below

commit b8222065e0cc09f455dc5b2aa43c1d68cf0d4a1f
Author: Panuwat Anawatmongkhon <be...@gmail.com>
AuthorDate: Thu Feb 1 04:53:15 2018 +0700

    KAFKA-6138 Simplify StreamsBuilder#addGlobalStore (#4430)
    
    - implements KIP-233
    
    Author: Panuwat Anawatmongkhon <pa...@gmail.com>
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   | 34 ++++++++++++++++------
 .../kstream/internals/InternalStreamsBuilder.java  | 16 ++++++++++
 2 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 27105c6..f08098e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -454,6 +454,28 @@ public class StreamsBuilder {
     }
 
     /**
+     * @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead
+     */
+    @SuppressWarnings("unchecked")
+    @Deprecated
+    public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
+                                                      final String topic,
+                                                      final String sourceName,
+                                                      final Consumed consumed,
+                                                      final String processorName,
+                                                      final ProcessorSupplier stateUpdateSupplier) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(storeBuilder,
+                                              sourceName,
+                                              topic,
+                                              new ConsumedInternal<>(consumed),
+                                              processorName,
+                                              stateUpdateSupplier);
+        return this;
+    }
+
+    /**
      * Adds a global {@link StateStore} to the topology.
      * The {@link StateStore} sources its data from all partitions of the provided input topic.
      * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
@@ -467,10 +489,8 @@ public class StreamsBuilder {
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
      * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
      * @param topic                 the topic to source the data from
      * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
-     * @param processorName         the name of the {@link ProcessorSupplier}
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
@@ -478,18 +498,14 @@ public class StreamsBuilder {
     @SuppressWarnings("unchecked")
     public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilder,
                                                       final String topic,
-                                                      final String sourceName,
                                                       final Consumed consumed,
-                                                      final String processorName,
                                                       final ProcessorSupplier stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         internalStreamsBuilder.addGlobalStore(storeBuilder,
-                                              sourceName,
-                                              topic,
-                                              new ConsumedInternal<>(consumed),
-                                              processorName,
-                                              stateUpdateSupplier);
+                topic,
+                new ConsumedInternal<>(consumed),
+                stateUpdateSupplier);
         return this;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 2a8a89e..0b028e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -194,4 +194,20 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                processorName,
                                                stateUpdateSupplier);
     }
+    
+    public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder,
+                                            final String topic,
+                                            final ConsumedInternal consumed,
+                                            final ProcessorSupplier stateUpdateSupplier) {
+        // explicitly disable logging for global stores
+        storeBuilder.withLoggingDisabled();
+        final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final String processorName = newProcessorName(KTableImpl.SOURCE_NAME);
+        addGlobalStore(storeBuilder,
+                       sourceName,
+                       topic,
+                       consumed,
+                       processorName,
+                       stateUpdateSupplier);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.