You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/18 17:02:12 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #11676: feat: checkpoint position in state stores

vvcephei commented on a change in pull request #11676:
URL: https://github.com/apache/kafka/pull/11676#discussion_r786893179



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -94,7 +94,8 @@
      * @throws StreamsException if the store's change log does not contain the partition
      */
     void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
+                  final StateRestoreCallback stateRestoreCallback,
+                  final CheckpointCallback checkpointCallback);

Review comment:
       We need to avoid breaking changes, so what we'll want to do here is deprecate this method and introduce a new overload with a default implementation that calls the old method (ignoring the `checkpointCallback`). That way, existing store implementations will continue to compile after upgrading.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -152,4 +152,10 @@ default void init(final StateStoreContext context, final StateStore root) {
         // If a store doesn't implement a query handler, then all queries are unknown.
         return QueryResult.forUnknownQueryType(query, this);
     }
+
+    /*
+    default void checkpoint() throws IOException {
+    }
+
+     */

Review comment:
       ```suggestion
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -598,6 +605,15 @@ public void checkpoint() {
         // checkpoint those stores that are only logged and persistent to the checkpoint file
         final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
         for (final StateStoreMetadata storeMetadata : stores.values()) {
+            if (storeMetadata.checkpointCallback != null) {
+                try {
+                    storeMetadata.checkpointCallback.checkpoint();
+                } catch (final IOException e) {
+                    throw new ProcessorStateException(format("%sError creating position checkpoint file",
+                            logPrefix), e);
+                }
+            }
+

Review comment:
       I think we'll need to add `if (!corrupted)`.
   
   I agree we don't need to move this into the block below because non-persistent stores already know that they're not persistent and therefore can safely ignore the callback. But stores won't automatically know whether they are corrupted or not, so we should avoid checkpointing them if they're corrupted.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org