Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/29 21:12:45 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )

mjsax commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1059135143


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.

Review Comment:
   Seems to be too detailed. Also `SourceNode` is an internal class and I think we should not refer to it in the JavaDocs.



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.

Review Comment:
   ```suggestion
        * Adds a read-only {@link StateStore} to the topology.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.

Review Comment:
   ```suggestion
        * A read-only StateStore can use any compacted topic as a changelog.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *

Review Comment:
   ```suggestion
        * <p>
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder

Review Comment:
   Why "key value"?



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                                  final String sourceName,
+                                                                  final TimestampExtractor timestampExtractor,
+                                                                  final Deserializer<KIn> keyDeserializer,
+                                                                  final Deserializer<VIn> valueDeserializer,
+                                                                  final String topic,
+                                                                  final String processorName,
+                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        if (storeBuilder.loggingEnabled()) {
+            // -- disabling logging. We might want to print some logging.

Review Comment:
   For `GlobalKTable` we also just disable logging without logging any warning; I guess we could mention it in the JavaDocs.
   
   Also, we don't really need the `if` and can call `withLoggingDisabled()` blindly.
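
A minimal, self-contained sketch of the second point above. `ToyStoreBuilder` is a hypothetical stand-in for `StoreBuilder`, not the Kafka class; the only assumption is that `withLoggingDisabled()` is idempotent, so the guarded and the unguarded call leave the builder in the same state.

```java
public class LoggingDisabledSketch {

    /** Hypothetical stand-in for StoreBuilder; it only models the logging flag. */
    static final class ToyStoreBuilder {
        private boolean loggingEnabled;

        ToyStoreBuilder(final boolean loggingEnabled) {
            this.loggingEnabled = loggingEnabled;
        }

        boolean loggingEnabled() {
            return loggingEnabled;
        }

        // Idempotent: calling it on an already-disabled builder changes nothing.
        ToyStoreBuilder withLoggingDisabled() {
            loggingEnabled = false;
            return this;
        }
    }

    // Variant from the diff: check the flag first.
    static boolean guarded(final ToyStoreBuilder builder) {
        if (builder.loggingEnabled()) {
            builder.withLoggingDisabled();
        }
        return builder.loggingEnabled();
    }

    // Variant suggested in the review: disable unconditionally.
    static boolean unguarded(final ToyStoreBuilder builder) {
        builder.withLoggingDisabled();
        return builder.loggingEnabled();
    }

    public static void main(final String[] args) {
        for (final boolean initial : new boolean[] {true, false}) {
            System.out.println("initial=" + initial
                + " guarded=" + guarded(new ToyStoreBuilder(initial))
                + " unguarded=" + unguarded(new ToyStoreBuilder(initial)));
        }
    }
}
```

Under that assumption both variants end with logging disabled for either initial state, so the `if` only adds a branch.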



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all

Review Comment:
   Similar for `ProcessorNode`



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */

Review Comment:
   Should we mention anything about `auto.offset.reset` in the JavaDocs? It seems we don't allow passing in a config, and we might want to hard-code it to "earliest"? (I cannot remember if/what we discussed on the KIP about it.)



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java:
##########
@@ -59,7 +59,7 @@ public void testKStreamBranch() {
 
         assertEquals(3, branches.length);
 
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String, Void, Void> supplier = new MockProcessorSupplier<>();

Review Comment:
   Thanks for the cleanup!



##########
streams/src/test/java/org/apache/kafka/streams/TopologyTest.java:
##########
@@ -2303,6 +2304,63 @@ private void addGlobalStoreToTopologyAndExpectedDescription(final String globalS
         expectedDescription.addGlobalStore(expectedGlobalStore);
     }
 
+    @Test
+    public void readonlyStateStoresShouldHaveTheirOwnSubTopology() {

Review Comment:
   ```suggestion
       public void readOnlyStateStoresShouldHaveTheirOwnSubTopology() {
   ```



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span id="streams-developer-guide-state-store-readonly"></span><h3><a class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking up reference data managed by another processor
+                    or even another process all together. The messages within the compacted topic may be projected (subset of data extracted)
+                    before going into the readonly state store. Therefor we can optimize what the data within our readonly state
+                    store should look like, depending on the use-case it needs to serve.</p>
+
+                <p><b>note:</b> beware of the partitioning requirements when using readonly statestores for lookups during

Review Comment:
   ```suggestion
                   <p><b>note:</b> beware of the partitioning requirements when using read-only state stores for lookups during
   ```



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span id="streams-developer-guide-state-store-readonly"></span><h3><a class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is

Review Comment:
   This sentence is somewhat confusing. Maybe we should start with something like this:
   
   > A read-only state store materializes the data from its input topic. It also uses the input topic for fault-tolerance, and thus does not have an additional changelog topic (the input topic is re-used as changelog). Thus, the input topic should be configured with log compaction (add link to log compaction). Note that no other processor should modify the content of the state store, and the only writer should be the associated "state update processor"; other processors may read the content of the read-only store.



##########
streams/src/test/java/org/apache/kafka/streams/TopologyTest.java:
##########
@@ -2303,6 +2304,63 @@ private void addGlobalStoreToTopologyAndExpectedDescription(final String globalS
         expectedDescription.addGlobalStore(expectedGlobalStore);
     }
 
+    @Test
+    public void readonlyStateStoresShouldHaveTheirOwnSubTopology() {
+        final String sourceName = "source";
+        final String storeName = "store";
+        final String topicName = "topic";
+        final String processorName = "processor";
+
+        final KeyValueStoreBuilder<?, ?> storeBuilder = mock(KeyValueStoreBuilder.class);
+        when(storeBuilder.name()).thenReturn(storeName);
+        topology.addReadOnlyStateStore(
+                storeBuilder,
+                sourceName,
+                null,
+                null,
+                null,
+                topicName,
+                processorName,
+                new MockProcessorSupplier<>());
+
+        final TopologyDescription.Source expectedSource = new InternalTopologyBuilder.Source(sourceName, Sets.newSet(topicName), null);
+        final TopologyDescription.Processor expectedProcessor = new InternalTopologyBuilder.Processor(processorName, Sets.newSet(storeName));
+
+        ((InternalTopologyBuilder.AbstractNode) expectedSource).addSuccessor(expectedProcessor);
+        ((InternalTopologyBuilder.AbstractNode) expectedProcessor).addPredecessor(expectedSource);
+
+        final Set<TopologyDescription.Node> allNodes = new HashSet<>();
+        allNodes.add(expectedSource);
+        allNodes.add(expectedProcessor);
+        expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes));
+
+        assertThat(topology.describe(), equalTo(expectedDescription));
+        assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode()));
+    }
+
+    @Test
+    public void readonlyStateStoresShouldNotLog() {

Review Comment:
   ```suggestion
       public void readOnlyStateStoresShouldNotLog() {
   ```



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span id="streams-developer-guide-state-store-readonly"></span><h3><a class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking up reference data managed by another processor

Review Comment:
   ```suggestion
                   <p>This allows you to use a read-only state store for looking up reference data managed by another processor
   ```



##########
streams/src/test/java/org/apache/kafka/test/MockProcessor.java:
##########
@@ -28,9 +29,11 @@
 import java.util.List;
 import java.util.Map;
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public class MockProcessor<K, V> extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
-    private final MockApiProcessor<K, V, Object, Object> delegate;
+public class MockProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
+    private final MockApiProcessor<KIn, VIn, KOut, VOut> delegate;
+
+    private ProcessorContext<KOut, VOut> context;

Review Comment:
   Why do we add this? We inherit `context` from the super class anyway.



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span id="streams-developer-guide-state-store-readonly"></span><h3><a class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking up reference data managed by another processor
+                    or even another process all together. The messages within the compacted topic may be projected (subset of data extracted)
+                    before going into the readonly state store. Therefor we can optimize what the data within our readonly state
+                    store should look like, depending on the use-case it needs to serve.</p>
+
+                <p><b>note:</b> beware of the partitioning requirements when using readonly statestores for lookups during
+                    processing. You might want to make sure the original changelog topic is co-partitioned with the processors
+                    reading the readonly statestore.</p>

Review Comment:
   This reminds me: should we allow passing in a custom partitioner? 🤔 (Could also be follow-up work though...)
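
To make the co-partitioning requirement in the quoted note concrete, here is a small sketch. `partitionFor` is a hypothetical hash-modulo partitioner used only for illustration; it is not Kafka's default partitioner, and the partition counts are made up.

```java
public class CoPartitioningSketch {

    // Hypothetical partitioner; a stand-in, not Kafka's default partitioner.
    static int partitionFor(final String key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A local lookup can only succeed if the key lands on the same partition
    // number in both the store's input topic and the reading processor's topic.
    static boolean lookupIsLocal(final String key,
                                 final int storeTopicPartitions,
                                 final int processorTopicPartitions) {
        return partitionFor(key, storeTopicPartitions)
            == partitionFor(key, processorTopicPartitions);
    }

    public static void main(final String[] args) {
        // Same partition count and same partitioner: the key maps to the same partition.
        System.out.println(lookupIsLocal("d", 4, 4));
        // Different partition counts: the same key can map to different partitions.
        System.out.println(lookupIsLocal("d", 4, 6));
    }
}
```

With the same partitioner and the same partition count on both topics, every key resolves to the same partition number on both sides. A custom partitioner on only one side, or a different partition count, breaks that guarantee.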



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span id="streams-developer-guide-state-store-readonly"></span><h3><a class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking up reference data managed by another processor
+                    or even another process all together. The messages within the compacted topic may be projected (subset of data extracted)

Review Comment:
   > may be projected 
   
   I don't think that works? When we restore the state, we would bypass the processor and put the bytes from the input topic into the store as-is...
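
A toy model of the concern raised above, assuming restoration copies records from the input topic into the store unchanged. The class, the sample records, and the `firstField` projection are all hypothetical; no Kafka classes are involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class RestoreBypassSketch {

    // Live processing: every record passes through the state update processor,
    // modeled here as a projection applied to the value before the put.
    static Map<String, String> processLive(final List<String[]> topic,
                                           final UnaryOperator<String> projection) {
        final Map<String, String> store = new HashMap<>();
        for (final String[] record : topic) {
            store.put(record[0], projection.apply(record[1]));
        }
        return store;
    }

    // Restoration: records are copied from the topic into the store as-is,
    // so the processor (and with it the projection) is never invoked.
    static Map<String, String> restore(final List<String[]> topic) {
        final Map<String, String> store = new HashMap<>();
        for (final String[] record : topic) {
            store.put(record[0], record[1]);
        }
        return store;
    }

    // Hypothetical input topic content: key -> "name,email".
    static List<String[]> sampleTopic() {
        return List.of(
            new String[] {"u1", "alice,alice@example.com"},
            new String[] {"u2", "bob,bob@example.com"});
    }

    // Hypothetical projection: keep only the first field of the value.
    static String firstField(final String value) {
        return value.split(",")[0];
    }

    public static void main(final String[] args) {
        final Map<String, String> live = processLive(sampleTopic(), RestoreBypassSketch::firstField);
        final Map<String, String> restored = restore(sampleTopic());
        System.out.println("live:     " + live.get("u1"));
        System.out.println("restored: " + restored.get("u1"));
        System.out.println("same content: " + live.equals(restored));
    }
}
```

In this model the store holds projected values after live processing but full values after a restore, so the two paths disagree. That is why a projecting processor and a store that re-uses the input topic as its changelog do not fit together.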



##########
docs/streams/developer-guide/processor-api.html:
##########
@@ -396,6 +397,21 @@ <h2>
                 </ul>
                 </p>
             </div>
+            <div class="section" id="readonly-state-stores">
+                <span id="streams-developer-guide-state-store-readonly"></span><h3><a class="toc-backref" href="#id12">ReadOnly State Stores</a><a class="headerlink" href="#readonly-state-stores" title="Permalink to this headline"></a></h3>
+                <p>The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is
+                    crucial to understand that there can be only one writer to this state store, but there can be multiple
+                    readers.</p>
+
+                <p>This allows you to use a readonly state store for looking up reference data managed by another processor
+                    or even another process all together. The messages within the compacted topic may be projected (subset of data extracted)
+                    before going into the readonly state store. Therefor we can optimize what the data within our readonly state
+                    store should look like, depending on the use-case it needs to serve.</p>
+
+                <p><b>note:</b> beware of the partitioning requirements when using readonly statestores for lookups during
+                    processing. You might want to make sure the original changelog topic is co-partitioned with the processors
+                    reading the readonly statestore.</p>

Review Comment:
   ```suggestion
                       reading the read-only state store.</p>
   ```


