Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/05 00:48:22 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores

guozhangwang commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r419795444



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -247,6 +270,20 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
         }
     }
 
+    private void registerStoreWithChangelogReader(final String storeName) {
+        if (isLoggingEnabled(storeName)) {
+            changelogReader.register(getStorePartition(storeName), this);
+        }
+    }
+
+    private void unregisterAllStoresWithChangelogReader() {
+        final List<TopicPartition> allChangelogPartitions = new ArrayList<>();
+        for (final StateStoreMetadata storeMetadata : stores.values()) {
+            allChangelogPartitions.add(storeMetadata.changelogPartition);

Review comment:
       The `storeMetadata.changelogPartition` may be null, and `ArrayList#add` accepts null elements, so the resulting list could contain nulls.
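
A minimal sketch of one way to guard against this. It uses hypothetical stand-in types (`StoreMetadata` and `String` partitions, not the real `StateStoreMetadata` and `TopicPartition`) and assumes a null `changelogPartition` simply means the store has no changelog to unregister:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: hypothetical stand-ins, not the real Kafka Streams classes.
final class NullSafeChangelogSketch {

    // Stand-in for StateStoreMetadata; changelogPartition is assumed to be
    // null when the store has no changelog topic.
    static final class StoreMetadata {
        final String changelogPartition;

        StoreMetadata(final String changelogPartition) {
            this.changelogPartition = changelogPartition;
        }
    }

    // Collects only the non-null changelog partitions, so a downstream
    // unregister call never sees a null element.
    static List<String> changelogPartitions(final Map<String, StoreMetadata> stores) {
        final List<String> partitions = new ArrayList<>();
        for (final StoreMetadata metadata : stores.values()) {
            if (metadata.changelogPartition != null) {
                partitions.add(metadata.changelogPartition);
            }
        }
        return partitions;
    }

    public static void main(final String[] args) {
        final Map<String, StoreMetadata> stores = new LinkedHashMap<>();
        stores.put("logged-store", new StoreMetadata("app-logged-store-changelog-0"));
        stores.put("unlogged-store", new StoreMetadata(null));
        // prints [app-logged-store-changelog-0]
        System.out.println(changelogPartitions(stores));
    }
}
```

Filtering at the call site keeps `unregister` free of null checks; the alternative would be to have `unregister` itself skip null elements.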

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -789,7 +789,7 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
     }
 
     @Override
-    public void remove(final Collection<TopicPartition> revokedChangelogs) {
+    public void unregister(final Collection<TopicPartition> revokedChangelogs) {

Review comment:
       Some callers may pass a collection that contains `null` elements; see my other comments.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -204,7 +203,14 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 task.resume();
                 standbyTasksToCreate.remove(task.id());
-            } else /* we previously owned this task, and we don't have it anymore, or it has changed active/standby state */ {
+            } else {
+                // check for tasks that were owned previously but have changed active/standby status
+                final boolean isTransitioningType = activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id());
+                if (isTransitioningType) {

Review comment:
       This is a meta comment: I'm inclined to suggest that we do this recycling in the task manager, rather than calling `task.close()` from the task manager and having `close()` internally check a boolean for whether it is a `prepareRecycle`.
   
   Instead, we can do the following:
   
   1. Add a `Task#convertTo(TaskType)` method that returns an active or standby task, copying the fields of the original task and starting in the `RESTORING` state.
   
   2. For an active task, the implementation would be:
   
   * first transition to the `RESTORING` state (we would allow `SUSPENDED` to transition to `RESTORING` too, so if the task is not in `CREATED`, we can first suspend it and then transition to `RESTORING`).
   
   * then return a newly created standby task whose initial state is `RESTORING`.
   
   3. For a standby task, the implementation would be:
   
   * first transition to the `RESTORING` state (which would usually be a no-op).
   
   * then return a newly created active task whose initial state is `RESTORING`.
   
   Also, I realized that `recordCollector.initialize();` in the active task should be moved from `initializeIfNeeded` to `completeRestoration`. This is a minor bug that may block this proposal; I will prepare a PR to fix it.
   
   4. Then, in the task manager, for those convertible tasks we would call `convertTo` instead of closing and re-creating them via the task creators.
   
   The key ideas behind this proposal are:
   
   * The `SUSPENDED` and `RESTORING` states are actually the same for active and standby tasks.
   * In the future, when we remove the `SUSPENDED` state, we would just have `RESTORING`.
   * The `RESTORING` states of active and standby tasks are actually the same in terms of functionality.
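
The proposal above could be sketched roughly as follows. This is only an illustration under stated assumptions: `ConvertibleTaskSketch`, its nested `TaskType` and `State` enums, and the `storeOffsets` map (a stand-in for the open state stores) are all hypothetical and do not mirror the real Kafka Streams `Task` classes. The sketch also assumes that only a `RUNNING` task needs to be suspended before moving to `RESTORING`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: hypothetical stand-in, not the real Kafka Streams Task.
final class ConvertibleTaskSketch {

    enum TaskType { ACTIVE, STANDBY }

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED }

    final String id;
    final TaskType type;
    final Map<String, Long> storeOffsets; // stand-in for the open state stores
    State state;

    ConvertibleTaskSketch(final String id,
                          final TaskType type,
                          final State state,
                          final Map<String, Long> storeOffsets) {
        this.id = id;
        this.type = type;
        this.state = state;
        this.storeOffsets = storeOffsets;
    }

    void suspend() {
        if (state == State.RUNNING || state == State.RESTORING) {
            state = State.SUSPENDED;
        }
    }

    // Returns a task of the other type that reuses the same stores
    // (nothing is closed) and starts out in RESTORING.
    ConvertibleTaskSketch convertTo(final TaskType targetType) {
        if (targetType == type) {
            throw new IllegalArgumentException("Task " + id + " is already " + type);
        }
        if (state == State.RUNNING) {
            suspend(); // assumption: only RUNNING tasks need suspending first
        }
        state = State.RESTORING; // a no-op for a task that is already restoring
        return new ConvertibleTaskSketch(id, targetType, State.RESTORING, storeOffsets);
    }

    public static void main(final String[] args) {
        final Map<String, Long> stores = new HashMap<>();
        stores.put("counts-store", 42L);
        final ConvertibleTaskSketch active =
            new ConvertibleTaskSketch("0_1", TaskType.ACTIVE, State.RUNNING, stores);
        final ConvertibleTaskSketch standby = active.convertTo(TaskType.STANDBY);
        // prints STANDBY RESTORING
        System.out.println(standby.type + " " + standby.state);
    }
}
```

In this sketch the task manager would call `convertTo` for tasks whose type changed, and both task types would share the same restoration path from `RESTORING` onward.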

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -247,6 +270,20 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
         }
     }
 
+    private void registerStoreWithChangelogReader(final String storeName) {

Review comment:
       nit: rename to `maybeRegisterStoreWithChangelogReader`, since the store is only registered when logging is enabled.



