Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/10 03:19:11 UTC

[GitHub] [kafka] vvcephei opened a new pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

vvcephei opened a new pull request #9148:
URL: https://github.com/apache/kafka/pull/9148


   From KIP-478, implement the new StreamsBuilder#addGlobalStore() overload
   that takes a stateUpdateSupplier supplying a fully typed Processor<KIn, VIn, Void, Void>.
   
   Where necessary, use the adapters to make the old APIs defer to the new ones
   and to keep the scope of this change set limited.
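   A hedged, self-contained sketch of that deferral pattern (the interfaces and the adapt() helper
   below are simplified stand-ins, not the real Kafka Streams classes; the actual adapter used in
   the PR is the internal ProcessorAdapter):

   ```java
   import java.util.function.Supplier;

   public final class SupplierAdapterSketch {

       // Stand-in for the deprecated processor interface.
       interface OldProcessor<K, V> { void process(K key, V value); }

       // Stand-in for the new, fully typed processor interface.
       interface NewProcessor<KIn, VIn> { void process(KIn key, VIn value); }

       // Stand-in for ProcessorAdapter.adapt(...): same behavior, new type.
       static <K, V> NewProcessor<K, V> adapt(final OldProcessor<K, V> old) {
           return old::process;
       }

       public static void main(final String[] args) {
           // What a caller of the deprecated overload passes in:
           final Supplier<OldProcessor<String, String>> oldSupplier =
               () -> (key, value) -> System.out.println(key + " -> " + value);

           // What the internal builder receives: adaptation is deferred until
           // get() is called, so every call still yields a fresh processor.
           final Supplier<NewProcessor<String, String>> adapted =
               () -> adapt(oldSupplier.get());

           adapted.get().process("k", "v");
       }
   }
   ```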
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r472327085



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -667,7 +674,7 @@ public void validateCopartition() {
     private void validateGlobalStoreArguments(final String sourceName,
                                               final String topic,
                                               final String processorName,
-                                              final ProcessorSupplier<?, ?> stateUpdateSupplier,
+                                              final ProcessorSupplier<?, ?, ?, ?> stateUpdateSupplier,

Review comment:
       Sure; all we do is verify that it's not null, but it doesn't hurt.







[GitHub] [kafka] vvcephei commented on pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#issuecomment-677844729


   Thanks @abbccdda !





[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r468873464



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -54,7 +54,19 @@
     private Sensor createSensor;
 
     public ProcessorNode(final String name) {
-        this(name, null, null);
+        this(name, (Processor<KIn, VIn, KOut, VOut>) null, null);
+    }
+
+    public ProcessorNode(final String name,
+                         final Processor<KIn, VIn, KOut, VOut> processor,
+                         final Set<String> stateStores) {
+
+        this.name = name;
+        this.processor = processor;
+        this.children = new ArrayList<>();
+        this.childByName = new HashMap<>();
+        this.stateStores = stateStores;
+        this.time = new SystemTime();
     }
 
     public ProcessorNode(final String name,

Review comment:
       It will actually get removed later on in the KIP-478 implementation. What will happen is that the DSL and the PAPI will adapt the deprecated Processors to the new ones, and we can completely remove all mentions of the deprecated Processors in the internals.
   
   But for now, if we deprecate this constructor, it'll just obligate us to suppress the deprecation warning in a bunch of other places.







[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r472368613



##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

Review comment:
       This one isn't so straightforward. Although the supplied processors can be adapted, `theCapturedProcessor()` and `capturedProcessors(expectedNumberOfProcessors)` return `MockProcessor` specifically, so we'd need a whole new adapter to convert a MockApiProcessor into a MockProcessor. I'd rather leave it alone for now. These delegating classes will go away in a couple more PRs anyway.







[GitHub] [kafka] vvcephei merged pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9148:
URL: https://github.com/apache/kafka/pull/9148


   





[GitHub] [kafka] vvcephei commented on pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#issuecomment-676482308


   Retest this please





[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r467671774



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -499,7 +500,7 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
             topic,
             new ConsumedInternal<>(consumed),
             processorName,
-            stateUpdateSupplier
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())

Review comment:
       Adapt the old API to the new one.

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -721,7 +722,7 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
             valueDeserializer,
             topic,
             processorName,
-            stateUpdateSupplier
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())

Review comment:
       Also here, adapting the old API to the new one.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -201,12 +200,12 @@ public synchronized void addStateStore(final StoreBuilder<?> builder) {
         addGraphNode(root, new StateStoreNode<>(builder));
     }
 
-    public synchronized <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
-                                                   final String sourceName,
-                                                   final String topic,
-                                                   final ConsumedInternal<K, V> consumed,
-                                                   final String processorName,
-                                                   final ProcessorSupplier<K, V> stateUpdateSupplier) {
+    public synchronized <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                       final String sourceName,
+                                                       final String topic,
+                                                       final ConsumedInternal<KIn, VIn> consumed,
+                                                       final String processorName,
+                                                       final org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {

Review comment:
       For the internal builder, just directly change to the new API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
##########
@@ -89,14 +90,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
 
         if (isGlobalKTable) {
-            topologyBuilder.addGlobalStore(storeBuilder,
-                                           sourceName,
-                                           consumedInternal().timestampExtractor(),
-                                           consumedInternal().keyDeserializer(),
-                                           consumedInternal().valueDeserializer(),
-                                           topicName,
-                                           processorParameters.processorName(),
-                                           processorParameters.processorSupplier());
+            topologyBuilder.addGlobalStore(
+                storeBuilder,
+                sourceName,
+                consumedInternal().timestampExtractor(),
+                consumedInternal().keyDeserializer(),
+                consumedInternal().valueDeserializer(),
+                topicName,
+                processorParameters.processorName(),
+                () -> ProcessorAdapter.adapt(processorParameters.processorSupplier().get())

Review comment:
       For now, just adapt. Later this whole node will get converted to the new API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -192,16 +192,23 @@ private boolean isWindowStore() {
     }
 
     private static class ProcessorNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
-        private final ProcessorSupplier<KIn, VIn> supplier;
+        private final org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier;
         private final Set<String> stateStoreNames = new HashSet<>();
 
         ProcessorNodeFactory(final String name,
                              final String[] predecessors,
-                             final ProcessorSupplier<KIn, VIn> supplier) {
+                             final org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> supplier) {

Review comment:
       We need to create a factory for the global node, so I'm adding the constructor for the new API, and also converting the internals here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -96,7 +96,7 @@ public String newStoreName(final String prefix) {
             null,
             GLOBAL_STORE_TOPIC_NAME,
             "processorName",
-            new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME));
+            () -> ProcessorAdapter.adapt(new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get()));

Review comment:
       Adapting for now. Once the Source is converted, this line will revert to the original.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
##########
@@ -270,14 +271,15 @@ public void testPatternSourceTopicsWithGlobalTopics() {
         builder.addSource(null, "source-1", null, null, null, Pattern.compile("topic-1"));
         builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-2"));
         builder.addGlobalStore(
-                new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(),
-                "globalSource",
-                null,
-                null,
-                null,
-                "globalTopic",
-                "global-processor",
-                new MockProcessorSupplier<>());
+            new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(),
+            "globalSource",
+            null,
+            null,
+            null,
+            "globalTopic",
+            "global-processor",
+            new MockApiProcessorSupplier<>()

Review comment:
       Using the new API. Some extra lines changed when I cleaned up the indentation formatting.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
##########
@@ -36,9 +36,8 @@
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockApiProcessorSupplier;

Review comment:
       I went ahead and just converted the mock processor to the new API. I gave it a temporary name. Eventually, the old one will be unused, and we can delete the old one and rename the new one back to `MockProcessorSupplier`.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

Review comment:
       As noted earlier, I'm copying this class from `MockProcessor` to convert it to the new API (to avoid disturbing too much code at once).

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
##########
@@ -150,13 +149,21 @@ public void setup() {
         ((InternalProcessorContext) context).transitionToActive(task, null, null);
         EasyMock.expect(task.recordCollector()).andStubReturn(recordCollector);
 
-        context.setCurrentNode(new ProcessorNode<String, Long, Object, Object>("fake", null,
-            new HashSet<>(asList(
-                "LocalKeyValueStore",
-                "LocalTimestampedKeyValueStore",
-                "LocalWindowStore",
-                "LocalTimestampedWindowStore",
-                "LocalSessionStore"))));
+        context.setCurrentNode(
+            new ProcessorNode<>(
+                "fake",
+                (org.apache.kafka.streams.processor.api.Processor<String, Long, Object, Object>) null,

Review comment:
       This is all that changed. I had to cast the null to resolve the right overload.
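   To make the overload-resolution point concrete: when two overloads both take a reference type,
   a bare null argument is ambiguous, and casting the null selects the intended overload. A minimal,
   self-contained illustration (the method names here are made up, not taken from the PR):

   ```java
   public final class OverloadCastSketch {

       static void configure(final Runnable task) {
           System.out.println("Runnable overload");
       }

       static void configure(final String name) {
           System.out.println("String overload");
       }

       public static void main(final String[] args) {
           // configure(null);          // does not compile: the call is ambiguous
           configure((Runnable) null);  // the cast selects the Runnable overload
       }
   }
   ```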

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -54,7 +54,19 @@
     private Sensor createSensor;
 
     public ProcessorNode(final String name) {
-        this(name, null, null);
+        this(name, (Processor<KIn, VIn, KOut, VOut>) null, null);
+    }
+
+    public ProcessorNode(final String name,
+                         final Processor<KIn, VIn, KOut, VOut> processor,
+                         final Set<String> stateStores) {
+
+        this.name = name;
+        this.processor = processor;
+        this.children = new ArrayList<>();
+        this.childByName = new HashMap<>();
+        this.stateStores = stateStores;
+        this.time = new SystemTime();

Review comment:
       Adding the constructor for the new API. The old constructor still exists and still functions to adapt the old API. Eventually, the old constructor will become unused.

##########
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
##########
@@ -176,12 +176,33 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
    *
    * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
    */
+  @deprecated(
+    "Use #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
+    "2.7.0"
+  )
   def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
                            topic: String,
                            consumed: Consumed[K, V],
                            stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
     inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
 
+  /**
+   * Adds a global `StateStore` to the topology. Global stores should not be added to `Processor`, `Transformer`,
+   * or `ValueTransformer` (in contrast to regular stores).
+   * <p>
+   * It is not required to connect a global store to `Processor`, `Transformer`, or `ValueTransformer`;
+   * those have read-only access to all global stores by default.
+   *
+   * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
+   */
+  def addGlobalStore[K, V](

Review comment:
       Keeping the Scala API in sync with the Java one.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

Review comment:
       As noted earlier, I'm copying this class from `MockProcessorSupplier` to convert it to the new API (to avoid disturbing too much code at once).







[GitHub] [kafka] abbccdda commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r468834973



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -528,13 +529,56 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                              final String topic,
                                                              final Consumed<K, V> consumed,
                                                              final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}} will be used to create an
+     * {@link org.apache.kafka.streams.processor.api.Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link org.apache.kafka.streams.processor.api.Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link org.apache.kafka.streams.processor.api.Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link org.apache.kafka.streams.processor.api.Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                             final String topic,

Review comment:
       format

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -532,14 +539,14 @@ public final void addStateStore(final StoreBuilder<?> storeBuilder,
         nodeGroups = null;
     }
 
-    public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       Why do we drop KOut/VOut?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -528,13 +529,56 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                              final String topic,
                                                              final Consumed<K, V> consumed,
                                                              final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}} will be used to create an
+     * {@link org.apache.kafka.streams.processor.api.Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link org.apache.kafka.streams.processor.api.Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link org.apache.kafka.streams.processor.api.Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link org.apache.kafka.streams.processor.api.Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}

Review comment:
       Could we reverse the import to use `org.apache.kafka.streams.processor.api.ProcessorSupplier` as default instead?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -54,7 +54,19 @@
     private Sensor createSensor;
 
     public ProcessorNode(final String name) {
-        this(name, null, null);
+        this(name, (Processor<KIn, VIn, KOut, VOut>) null, null);
+    }
+
+    public ProcessorNode(final String name,
+                         final Processor<KIn, VIn, KOut, VOut> processor,
+                         final Set<String> stateStores) {
+
+        this.name = name;
+        this.processor = processor;
+        this.children = new ArrayList<>();
+        this.childByName = new HashMap<>();
+        this.stateStores = stateStores;
+        this.time = new SystemTime();
     }
 
     public ProcessorNode(final String name,

Review comment:
       Could we deprecate the old constructor?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -532,14 +539,14 @@ public final void addStateStore(final StoreBuilder<?> storeBuilder,
         nodeGroups = null;
     }
 
-    public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       format







[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r472348482



##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

Review comment:
       Ah, yeah, this is a good idea. I'll have to migrate all the field references to method references so they can be delegated, but I wanted to do that anyway.







[GitHub] [kafka] abbccdda commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r468967992



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -528,13 +529,56 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                              final String topic,
                                                              final Consumed<K, V> consumed,
                                                              final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}} will be used to create an
+     * {@link org.apache.kafka.streams.processor.api.Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link org.apache.kafka.streams.processor.api.Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link org.apache.kafka.streams.processor.api.Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link org.apache.kafka.streams.processor.api.Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}

Review comment:
       Yeah, that's probably easier when we eventually clean up the deprecated stuff 👍







[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r468872295



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -532,14 +539,14 @@ public final void addStateStore(final StoreBuilder<?> storeBuilder,
         nodeGroups = null;
     }
 
-    public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,
+    public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       I didn't realize it in the last PR, but the output types for the `stateUpdate` processor are always `Void, Void` (i.e., you can't forward anything, since the only function of that processor is to update the store itself). So we don't need those parameters; I've just expanded them to `Void, Void` below.
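   To make the `Void, Void` point concrete, a minimal sketch follows. The interface below is a
   simplified stand-in with the same type-parameter shape as the new
   `org.apache.kafka.streams.processor.api.Processor`, not the real class, and the in-memory map
   stands in for a real StateStore:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public final class GlobalStoreUpdaterSketch {

       // Simplified stand-in with the KIn/VIn/KOut/VOut shape of the new API.
       interface TypedProcessor<KIn, VIn, KOut, VOut> {
           void process(KIn key, VIn value);
       }

       // A global-store state-update processor can never forward downstream, so its
       // output types collapse to Void: there is nothing you could forward with a
       // Void key and a Void value.
       static final class GlobalStoreUpdater<KIn, VIn>
               implements TypedProcessor<KIn, VIn, Void, Void> {

           private final Map<KIn, VIn> store = new HashMap<>(); // stand-in for the StateStore

           @Override
           public void process(final KIn key, final VIn value) {
               store.put(key, value); // only job: keep the store up to date
           }
       }

       public static void main(final String[] args) {
           final GlobalStoreUpdater<String, String> updater = new GlobalStoreUpdater<>();
           updater.process("key", "value");
       }
   }
   ```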







[GitHub] [kafka] abbccdda commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r469028867



##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

Review comment:
       I'm wondering whether it makes more sense to let `MockProcessor` encapsulate a delegate `MockApiProcessor`, so that we could also use the existing tests to verify the correctness of the migration.
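   A rough sketch of that delegation idea, with made-up class names rather than the actual test
   helpers: the old-API mock keeps no state of its own and forwards every call to a new-API mock,
   so assertions in the existing tests exercise the migrated code path as well.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public final class DelegatingMockSketch {

       // Stand-in for a new-API mock processor that captures what it processed.
       static final class ApiMock<KIn, VIn, KOut, VOut> {
           final List<String> processed = new ArrayList<>();

           void process(final KIn key, final VIn value) {
               processed.add(key + ":" + value);
           }
       }

       // Stand-in for the old-API mock: every call is delegated, so assertions
       // written against this class also verify the new-API mock.
       static final class LegacyMock<K, V> {
           private final ApiMock<K, V, Void, Void> delegate = new ApiMock<>();

           void process(final K key, final V value) {
               delegate.process(key, value);
           }

           List<String> processed() {
               return delegate.processed;
           }
       }

       public static void main(final String[] args) {
           final LegacyMock<String, Integer> mock = new LegacyMock<>();
           mock.process("count", 1);
           System.out.println(mock.processed()); // prints [count:1]
       }
   }
   ```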

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -667,7 +674,7 @@ public void validateCopartition() {
     private void validateGlobalStoreArguments(final String sourceName,
                                               final String topic,
                                               final String processorName,
-                                              final ProcessorSupplier<?, ?> stateUpdateSupplier,
+                                              final ProcessorSupplier<?, ?, ?, ?> stateUpdateSupplier,

Review comment:
       Could we use `Void, Void` instead?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -512,27 +513,70 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
      * of the input topic.
      * <p>
-     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
      * records forwarded from the {@link SourceNode}. NOTE: you should not use the {@code Processor} to insert transformed records into
      * the global state store. This store uses the source topic as changelog and during restore will insert records directly
      * from the source.
      * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * <p>
-     * It is not required to connect a global store to {@link Processor Processors}, {@link Transformer Transformers},
+     * It is not required to connect a global store to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
      * or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
      *
      * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
      * @param topic                 the topic to source the data from
      * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                              final String topic,
                                                              final Consumed<K, V> consumed,
-                                                             final ProcessorSupplier<K, V> stateUpdateSupplier) {
+                                                             final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier}} will be used to create an
+     * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,

Review comment:
       Do we want to add test coverage for this function?

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+    private final long scheduleInterval;
+    private final PunctuationType punctuationType;
+    private final List<MockApiProcessor<KIn, VIn, KOut, VOut>> processors = new ArrayList<>();
+
+    public MockApiProcessorSupplier() {
+        this(-1L);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval) {
+        this(scheduleInterval, PunctuationType.STREAM_TIME);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval, final PunctuationType punctuationType) {
+        this.scheduleInterval = scheduleInterval;
+        this.punctuationType = punctuationType;
+    }
+
+    @Override
+    public Processor<KIn, VIn, KOut, VOut> get() {
+        final MockApiProcessor<KIn, VIn, KOut, VOut> processor = new MockApiProcessor<>(punctuationType, scheduleInterval);
+        processors.add(processor);
+        return processor;
+    }
+
+    // get the captured processor assuming that only one processor gets returned from this supplier
+    public MockApiProcessor<KIn, VIn, KOut, VOut> theCapturedProcessor() {
+        return capturedProcessors(1).get(0);
+    }
+
+    public int capturedProcessorsCount() {
+        return processors.size();
+    }
+
+        // get the captured processors with the expected number

Review comment:
       comment format.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
+
+    private final long scheduleInterval;
+    private final PunctuationType punctuationType;
+    private final List<MockApiProcessor<KIn, VIn, KOut, VOut>> processors = new ArrayList<>();
+
+    public MockApiProcessorSupplier() {
+        this(-1L);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval) {
+        this(scheduleInterval, PunctuationType.STREAM_TIME);
+    }
+
+    public MockApiProcessorSupplier(final long scheduleInterval, final PunctuationType punctuationType) {
+        this.scheduleInterval = scheduleInterval;
+        this.punctuationType = punctuationType;
+    }
+
+    @Override
+    public Processor<KIn, VIn, KOut, VOut> get() {
+        final MockApiProcessor<KIn, VIn, KOut, VOut> processor = new MockApiProcessor<>(punctuationType, scheduleInterval);
+        processors.add(processor);
+        return processor;
+    }
+
+    // get the captured processor assuming that only one processor gets returned from this supplier
+    public MockApiProcessor<KIn, VIn, KOut, VOut> theCapturedProcessor() {
+        return capturedProcessors(1).get(0);
+    }
+
+    public int capturedProcessorsCount() {

Review comment:
       We could port this function when it is actually needed.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

Review comment:
       Same question about using a delegate for the ProcessorSupplier, but it's minor to me.







[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r468871225



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -528,13 +529,56 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
                                                              final String topic,
                                                              final Consumed<K, V> consumed,
                                                              final ProcessorSupplier<K, V> stateUpdateSupplier) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            storeBuilder,
+            topic,
+            new ConsumedInternal<>(consumed),
+            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}} will be used to create an
+     * {@link org.apache.kafka.streams.processor.api.Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * NOTE: you should not use the {@link org.apache.kafka.streams.processor.api.Processor} to insert transformed records into
+     * the global state store. This store uses the source topic as changelog and during restore will insert records directly
+     * from the source.
+     * This {@link org.apache.kafka.streams.processor.api.Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link org.apache.kafka.streams.processor.api.Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.api.ProcessorSupplier}

Review comment:
       Sure can! I was on the fence about which one is "better". Maybe I'll just stick with always importing the new one from now on.



