Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/20 09:50:29 UTC

[GitHub] [kafka] calmera opened a new pull request, #12188: KAFKA-10892: Shared Readonly State Stores

calmera opened a new pull request, #12188:
URL: https://github.com/apache/kafka/pull/12188

   This PR implements KIP-813: support for shareable state stores.
   
   Tests were created to validate the structure of the resulting topology. 
   
   Certain parts (e.g. MockProcessor) have also been migrated to the newer PAPI. Tests have been run against the whole project to make sure this migration doesn't cause any side effects.
   
   The contribution is my original work and I license the work to the project under the project's open source license.
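   For context on the PAPI migration mentioned above: in the newer Processor API (`org.apache.kafka.streams.processor.api`), a processor is typed by both its input and output key/value types, `Processor<KIn, VIn, KOut, VOut>`, which is why a capturing mock such as `MockProcessorSupplier` gains two `Void` output type parameters. The sketch below is plain Java with hypothetical stand-ins (`Rec`, `Proc`, `CapturingProc`); it is not Kafka's `MockProcessorSupplier`, only an illustration of the type shape.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class PapiShapeSketch {
       // Hypothetical stand-in for a record: key, value and timestamp.
       record Rec<K, V>(K key, V value, long timestamp) {}

       // Hypothetical stand-in for the newer processor shape: typed by input AND
       // output key/value types. A processor that forwards nothing uses Void outputs.
       interface Proc<KIn, VIn, KOut, VOut> {
           void process(Rec<KIn, VIn> record);
       }

       // Hypothetical capturing mock: remembers every record and forwards nothing,
       // hence the <Void, Void> output types.
       static final class CapturingProc<KIn, VIn> implements Proc<KIn, VIn, Void, Void> {
           final List<Rec<KIn, VIn>> processed = new ArrayList<>();

           @Override
           public void process(final Rec<KIn, VIn> record) {
               processed.add(record);
           }
       }

       public static void main(final String[] args) {
           final CapturingProc<Integer, String> mock = new CapturingProc<>();
           mock.process(new Rec<>(1, "a", 0L));
           mock.process(new Rec<>(2, "b", 1L));
           System.out.println(mock.processed.size()); // 2
       }
   }
   ```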


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] calmera commented on pull request #12188: KAFKA-10892: Shared Readonly State Stores

Posted by GitBox <gi...@apache.org>.
calmera commented on PR #12188:
URL: https://github.com/apache/kafka/pull/12188#issuecomment-1277295678

   Split this PR into two new ones:
   
   - #12740 updating the PAPI test classes to the latest version
   - #12742 handling the Topology changes and introducing read-only state stores




[GitHub] [kafka] mjsax commented on a diff in pull request #12188: KAFKA-10892: Shared Readonly State Stores

mjsax commented on code in PR #12188:
URL: https://github.com/apache/kafka/pull/12188#discussion_r915376733


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *

Review Comment:
   nit: do we need a `<p>` tag here?



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.

Review Comment:
   nit: `read-only`



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.

Review Comment:
   Proposal:
   ```
   A read-only state store uses its input topic for fault-tolerance. Thus, in contrast to regular state stores, it must never create an internal changelog topic. Therefore, the input topic should be configured with log compaction.
   ```
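   The reasoning behind this wording can be illustrated with a toy model of a single partition (plain Java, no Kafka dependency; `Rec`, `replay` and `compact` are hypothetical names). The state-update processor applies each input record as an upsert and treats a `null` value as a delete. Because log compaction retains at least the latest record for every key, replaying the compacted input topic rebuilds the same store contents as replaying the full history, so no separate changelog is needed:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class CompactedRestoreSketch {
       // Hypothetical input-topic record; a null value is a tombstone (delete).
       record Rec(String key, String value) {}

       // Toy state-update processor: upsert on a value, delete on a tombstone.
       static Map<String, String> replay(final List<Rec> log) {
           final Map<String, String> store = new HashMap<>();
           for (final Rec r : log) {
               if (r.value() == null) {
                   store.remove(r.key());
               } else {
                   store.put(r.key(), r.value());
               }
           }
           return store;
       }

       // Toy log compaction: keep only the latest record per key, in offset order,
       // and drop tombstones (as happens once their retention period has passed).
       static List<Rec> compact(final List<Rec> log) {
           final Map<String, Integer> lastOffset = new HashMap<>();
           for (int i = 0; i < log.size(); i++) {
               lastOffset.put(log.get(i).key(), i);
           }
           final List<Rec> compacted = new ArrayList<>();
           for (int i = 0; i < log.size(); i++) {
               final Rec r = log.get(i);
               if (lastOffset.get(r.key()) == i && r.value() != null) {
                   compacted.add(r);
               }
           }
           return compacted;
       }

       public static void main(final String[] args) {
           final List<Rec> topic = List.of(
               new Rec("a", "1"), new Rec("b", "1"), new Rec("a", "2"),
               new Rec("c", "1"), new Rec("b", null));
           // Restoring from the compacted topic yields the same store as the full history.
           System.out.println(replay(topic).equals(replay(compact(topic)))); // true
       }
   }
   ```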



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder

Review Comment:
   If we are limited to kv-store, should we change the type to `StoreBuilder<KeyValueStore>` (or similar)?
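   To make the trade-off concrete, here is a plain-Java sketch with hypothetical stand-in types (`Store`, `KvStore`, `WinStore`, `Builder`; none are Kafka classes). An unbounded wildcard, as in `StoreBuilder<?>`, accepts any store type, while a bound in the spirit of the suggested `StoreBuilder<KeyValueStore>` rejects other store types at compile time:

   ```java
   public class StoreBuilderBoundSketch {
       // Hypothetical stand-ins for store types.
       interface Store { String name(); }
       interface KvStore extends Store {}
       interface WinStore extends Store {}

       // Hypothetical stand-in for a store builder parameterized by its store type.
       record Builder<T extends Store>(T store) {}

       // Unrestricted, like StoreBuilder<?>: any store type is accepted.
       static String addAny(final Builder<?> builder) {
           return builder.store().name();
       }

       // Restricted: only key-value store builders compile here.
       static String addKvOnly(final Builder<? extends KvStore> builder) {
           return builder.store().name();
       }

       public static void main(final String[] args) {
           final KvStore kvStore = () -> "kv-store";
           final WinStore winStore = () -> "window-store";
           final Builder<KvStore> kv = new Builder<>(kvStore);
           final Builder<WinStore> win = new Builder<>(winStore);
           System.out.println(addAny(kv));    // kv-store
           System.out.println(addAny(win));   // window-store
           System.out.println(addKvOnly(kv)); // kv-store
           // addKvOnly(win); // would not compile: WinStore is not a KvStore
       }
   }
   ```

   The author's reply in this thread notes there is no real restriction on the store type, which favors keeping the unrestricted form and fixing the Javadoc instead.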



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                                  final String sourceName,
+                                                                  final TimestampExtractor timestampExtractor,
+                                                                  final Deserializer<KIn> keyDeserializer,
+                                                                  final Deserializer<VIn> valueDeserializer,
+                                                                  final String topic,
+                                                                  final String processorName,
+                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        if (storeBuilder.loggingEnabled()) {
+            // -- disabling logging. We might want to print some logging.
+            storeBuilder.withLoggingDisabled();

Review Comment:
   I think we should throw a `TopologyException` here (we do the same for global stores if logging is enabled).
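   A minimal sketch of the fail-fast check suggested here, assuming hypothetical stand-ins `StoreSpec` (for `StoreBuilder`) and `TopologyBuildException` (for Kafka's `TopologyException`); the exception message is invented. Besides surfacing the misconfiguration, throwing avoids silently mutating the builder the caller passed in, which the current `withLoggingDisabled()` call does:

   ```java
   public class ReadOnlyStoreValidationSketch {
       // Hypothetical stand-in for Kafka's TopologyException.
       static final class TopologyBuildException extends RuntimeException {
           TopologyBuildException(final String message) {
               super(message);
           }
       }

       // Hypothetical stand-in for a StoreBuilder: only the logging flag matters here.
       static final class StoreSpec {
           final String name;
           private boolean loggingEnabled = true; // logging is on unless disabled

           StoreSpec(final String name) {
               this.name = name;
           }

           StoreSpec withLoggingDisabled() {
               loggingEnabled = false;
               return this;
           }

           boolean loggingEnabled() {
               return loggingEnabled;
           }
       }

       // Fail fast instead of flipping the caller's builder: a read-only store
       // restores from its input topic, so a changelog must never be created.
       static void validateReadOnly(final StoreSpec spec) {
           if (spec.loggingEnabled()) {
               throw new TopologyBuildException(
                   "Read-only state store '" + spec.name + "' must have logging disabled");
           }
       }

       public static void main(final String[] args) {
           validateReadOnly(new StoreSpec("ok").withLoggingDisabled()); // passes
           try {
               validateReadOnly(new StoreSpec("bad"));
           } catch (final TopologyBuildException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```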



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java:
##########
@@ -59,7 +59,7 @@ public void testKStreamBranch() {
 
         assertEquals(3, branches.length);
 
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String, Void, Void> supplier = new MockProcessorSupplier<>();

Review Comment:
   It seems these changes are not related to this PR (similar below)? It would be good to exclude them and put them into their own PR.





[GitHub] [kafka] calmera commented on a diff in pull request #12188: KAFKA-10892: Shared Readonly State Stores

calmera commented on code in PR #12188:
URL: https://github.com/apache/kafka/pull/12188#discussion_r993559392


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java:
##########
@@ -59,7 +59,7 @@ public void testKStreamBranch() {
 
         assertEquals(3, branches.length);
 
-        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+        final MockProcessorSupplier<Integer, String, Void, Void> supplier = new MockProcessorSupplier<>();

Review Comment:
   Yeah, the problem is that I need them to properly test the topology.
   
   Let me see what I can do.





[GitHub] [kafka] calmera commented on a diff in pull request #12188: KAFKA-10892: Shared Readonly State Stores

calmera commented on code in PR #12188:
URL: https://github.com/apache/kafka/pull/12188#discussion_r993550391


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+    /**
+     * Adds a Read Only {@link StateStore} to the topology.
+     *
+     * A Read Only StateStore can use any compacted topic as a changelog.
+     * <p>
+     * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+     * records forwarded from the {@link SourceNode}.
+     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     *
+     * @param storeBuilder          user defined key value store builder

Review Comment:
   There is no real restriction on the type of store, so I will adapt the description.





[GitHub] [kafka] calmera closed pull request #12188: KAFKA-10892: Shared Readonly State Stores

calmera closed pull request #12188: KAFKA-10892: Shared Readonly State Stores
URL: https://github.com/apache/kafka/pull/12188

