Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/11 20:20:42 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

abbccdda commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r468834973



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -528,13 +529,56 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                              final String topic,
                                                              final Consumed<K, V> consumed,
                                                              final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link org.apache.kafka.streams.processor.api.ProcessorSupplier} will be used to create an
+     * {@link org.apache.kafka.streams.processor.api.Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link org.apache.kafka.streams.processor.api.Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link org.apache.kafka.streams.processor.api.Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link org.apache.kafka.streams.processor.api.Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                             final String topic,

Review comment:
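       format: the continuation parameters should be re-aligned with the first parameter, since the `<KIn, VIn>` type-parameter list moved the opening parenthesis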
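The NOTE in the new overload's javadoc is easier to see with a toy model. The global store treats its source topic as the changelog, so restoration copies raw topic records straight into the store and skips the processor. If the processor wrote transformed values during normal processing, a restored instance would end up with different contents. The sketch below is a hypothetical in-memory model of that behaviour; `GlobalStoreModel`, `process`, and `restore` are illustrative names, not Kafka Streams APIs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

// Hypothetical model of a global store; not the Kafka Streams implementation.
public class GlobalStoreModel {

    // Normal processing: every record passes through the user's update processor,
    // modelled here as a value transformation applied before the put.
    static Map<String, String> process(final List<Map.Entry<String, String>> topic,
                                       final UnaryOperator<String> processorTransform) {
        final Map<String, String> store = new TreeMap<>();
        for (final Map.Entry<String, String> record : topic) {
            store.put(record.getKey(), processorTransform.apply(record.getValue()));
        }
        return store;
    }

    // Restoration: the source topic doubles as the changelog, so raw records
    // are copied into the store directly and the processor is bypassed.
    static Map<String, String> restore(final List<Map.Entry<String, String>> topic) {
        final Map<String, String> store = new TreeMap<>();
        for (final Map.Entry<String, String> record : topic) {
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    static List<Map.Entry<String, String>> sampleTopic() {
        return Arrays.<Map.Entry<String, String>>asList(
                new SimpleEntry<>("k1", "a"),
                new SimpleEntry<>("k2", "b"));
    }

    // True when a store built by processing matches the store rebuilt by restore.
    static boolean consistentAfterRestore(final UnaryOperator<String> processorTransform) {
        final List<Map.Entry<String, String>> topic = sampleTopic();
        return process(topic, processorTransform).equals(restore(topic));
    }

    public static void main(final String[] args) {
        // Pass-through processor: processing and restore agree.
        System.out.println(consistentAfterRestore(v -> v));
        // Transforming processor: the restored store no longer matches.
        System.out.println(consistentAfterRestore(String::toUpperCase));
    }
}
```

A pass-through update processor keeps both paths in agreement, while the upper-casing one does not, which is the situation the javadoc warns against.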

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -532,14 +539,14 @@ public final void addStateStore(final StoreBuilder<?> storeBuilder,
         nodeGroups = null;
     }
 
-    public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       Why do we drop KOut/VOut?
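One plausible answer: a global-store update processor only writes into its store and never forwards records downstream, so its output key/value types carry no information. As far as I know, the released API pins them as `ProcessorSupplier<KIn, VIn, Void, Void>`, but treat that as an assumption to check against the merged code. The sketch below uses hypothetical stand-in types (`Context`, `Processor`, `StoreUpdater`, `runGlobalStoreUpdates`, none of them Kafka's) to show how fixing the outputs to `Void` leaves only `<KIn, VIn>` as free type parameters, while the compiler still rejects any attempt to forward a real record.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

public class VoidOutputSketch {

    // Hypothetical stand-in for a downstream-forwarding context.
    interface Context<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    // Hypothetical stand-in for a processor with separate input and output types.
    interface Processor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value, Context<KOut, VOut> context);
    }

    // A store updater: it writes into its store and never forwards.
    static final class StoreUpdater implements Processor<String, Long, Void, Void> {
        final Map<String, Long> store = new TreeMap<>();

        @Override
        public void process(final String key, final Long value, final Context<Void, Void> context) {
            store.put(key, value);
            // context.forward(key, value); // would not compile: this context only accepts Void
        }
    }

    // Hypothetical builder-style method: only <KIn, VIn> are free; outputs are pinned to Void.
    static <KIn, VIn> void runGlobalStoreUpdates(
            final List<Map.Entry<KIn, VIn>> topic,
            final Supplier<Processor<KIn, VIn, Void, Void>> stateUpdateSupplier) {
        final Processor<KIn, VIn, Void, Void> processor = stateUpdateSupplier.get();
        final Context<Void, Void> noDownstream = (k, v) -> {
            throw new IllegalStateException("a global store processor has no children");
        };
        for (final Map.Entry<KIn, VIn> record : topic) {
            processor.process(record.getKey(), record.getValue(), noDownstream);
        }
    }

    static Map<String, Long> demo() {
        final StoreUpdater updater = new StoreUpdater();
        final List<Map.Entry<String, Long>> topic = Arrays.<Map.Entry<String, Long>>asList(
                new SimpleEntry<>("a", 1L),
                new SimpleEntry<>("b", 2L),
                new SimpleEntry<>("a", 3L));
        runGlobalStoreUpdates(topic, () -> updater);
        return updater.store;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // {a=3, b=2}
    }
}
```

Pinning the outputs to `Void` documents in the type itself that nothing flows downstream from the global store's processor.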

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}

Review comment:
       Could we reverse the import to use `org.apache.kafka.streams.processor.api.ProcessorSupplier` as default instead?
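The suggestion concerns which of the two same-named `ProcessorSupplier` types gets the import. Java does not allow two single-type imports with the same simple name, so one of them has to be written fully qualified. Importing the new `org.apache.kafka.streams.processor.api.ProcessorSupplier` would leave only the deprecated overload spelling out its type in full. The sketch below shows the same mechanics with the JDK's own name clash, `java.util.Date` versus `java.sql.Date`, as a stand-in for the two Kafka interfaces. It only illustrates the import style, not Kafka code.

```java
// The preferred (current) type gets the simple-name import...
import java.util.Date;

public class ImportCollisionSketch {

    // ...so the current API reads cleanly with the short name.
    static String describe(final Date date) {
        return "util:" + date.getTime();
    }

    // The legacy overload spells out its clashing type in full,
    // standing in for the deprecated old-API ProcessorSupplier overload.
    @Deprecated
    static String describe(final java.sql.Date date) {
        return "sql:" + date.getTime();
    }

    static String demo() {
        return describe(new Date(0L)) + " " + describe(new java.sql.Date(0L));
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // util:0 sql:0
    }
}
```

Once the deprecated overload is removed, its fully qualified name goes with it and no import needs to change.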

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -54,7 +54,19 @@
     private Sensor createSensor;
 
     public ProcessorNode(final String name) {
-        this(name, null, null);
+        this(name, (Processor<KIn, VIn, KOut, VOut>) null, null);
+    }
+
+    public ProcessorNode(final String name,
+                         final Processor<KIn, VIn, KOut, VOut> processor,
+                         final Set<String> stateStores) {
+
+        this.name = name;
+        this.processor = processor;
+        this.children = new ArrayList<>();
+        this.childByName = new HashMap<>();
+        this.stateStores = stateStores;
+        this.time = new SystemTime();
     }
 
     public ProcessorNode(final String name,

Review comment:
       Could we deprecate the old constructor?
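A sketch of what deprecating the old constructor could look like. It uses hypothetical stand-ins (`Node`, `OldProcessor`, `NewProcessor`, `adapt`) rather than the real `ProcessorNode` and `ProcessorAdapter`. The old-API constructor is marked `@Deprecated` and delegates to the new one through an adapter, so the field assignments live in a single constructor. It also shows why the diff needs the cast in `this(name, (Processor<KIn, VIn, KOut, VOut>) null, null)`: when two three-argument constructors differ only in unrelated interface types for the second parameter, a bare `null` matches both and the call is ambiguous. The cast selects the new-API overload.

```java
import java.util.Collections;
import java.util.Set;

public class DeprecatedConstructorSketch {

    // Hypothetical stand-ins for the old and new processor interfaces.
    interface OldProcessor<K, V> {
        void process(K key, V value);
    }

    interface NewProcessor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value);
    }

    // Hypothetical adapter, playing the role of something like ProcessorAdapter.adapt.
    static <K, V, KO, VO> NewProcessor<K, V, KO, VO> adapt(final OldProcessor<K, V> old) {
        return old == null ? null : old::process;
    }

    static final class Node<KIn, VIn, KOut, VOut> {
        final String name;
        final NewProcessor<KIn, VIn, KOut, VOut> processor;
        final Set<String> stateStores;

        Node(final String name) {
            // A bare `null` would match both 3-arg constructors; the cast picks the new one.
            this(name, (NewProcessor<KIn, VIn, KOut, VOut>) null, null);
        }

        // The single "real" constructor: all field assignments live here.
        Node(final String name,
             final NewProcessor<KIn, VIn, KOut, VOut> processor,
             final Set<String> stateStores) {
            this.name = name;
            this.processor = processor;
            this.stateStores = stateStores;
        }

        // Old-API entry point kept for compatibility; it now just adapts and delegates.
        @Deprecated
        Node(final String name,
             final OldProcessor<KIn, VIn> processor,
             final Set<String> stateStores) {
            this(name, adapt(processor), stateStores);
        }
    }

    static String demo() {
        final StringBuilder seen = new StringBuilder();
        final OldProcessor<String, Integer> old = (k, v) -> seen.append(k).append('=').append(v);
        final Node<String, Integer, Void, Void> legacy = new Node<>("legacy", old, Collections.emptySet());
        legacy.processor.process("x", 1); // reaches the old processor through the adapter
        final Node<String, Integer, Void, Void> bare = new Node<>("bare");
        return seen + " " + (bare.processor == null);
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // x=1 true
    }
}
```

Delegating from the deprecated constructor keeps the two code paths from drifting apart until the old API is removed.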

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
+    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       format




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org