Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/01 22:43:18 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores

vvcephei commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r414837565



##########
File path: checkstyle/suppressions.xml
##########
@@ -156,7 +156,7 @@
               files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
 
     <suppress checks="MethodLength"
-              files="(KTableImpl|StreamsPartitionAssignor.java)"/>
+              files="(KTableImpl|StreamsPartitionAssignor|TaskManager).java"/>

Review comment:
       Ok, it seems like this one at least is avoidable.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -247,6 +271,25 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
         }
     }
 
+    private void registerStoreWithChangelogReader(final String storeName) {
+        // if the store name does not exist in the changelog map, it means the underlying store
+        // is not log enabled (including global stores), and hence it does not need to be restored

Review comment:
       This comment needs to move into `isLoggingEnabled`. It doesn't make sense in this context anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final String threadClientId, final
                 partitions
             );
 
-            if (threadProducer == null) {
-                final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
-                final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
-                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
-                log.info("Creating producer client for task {}", taskId);
-                taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
-            }
-
-            final RecordCollector recordCollector = new RecordCollectorImpl(
-                logContext,
-                taskId,
-                consumer,
-                threadProducer != null ?
-                    new StreamsProducer(threadProducer, false, logContext, applicationId) :
-                    new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
-                config.defaultProductionExceptionHandler(),
-                EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
-                streamsMetrics
-            );
-
-            final Task task = new StreamTask(
+            createdTasks.add(createStreamTask(
                 taskId,
                 partitions,
-                topology,
                 consumer,
-                config,
-                streamsMetrics,
-                stateDirectory,
-                cache,
-                time,
+                logContext,
                 stateManager,
-                recordCollector
-            );
-
-            log.trace("Created task {} with assigned partitions {}", taskId, partitions);
-            createdTasks.add(task);
-            createTaskSensor.record();
+                topology));
         }
         return createdTasks;
     }
 
+    private StreamTask createStreamTask(final TaskId taskId,
+                                        final Set<TopicPartition> partitions,
+                                        final Consumer<byte[], byte[]> consumer,
+                                        final LogContext logContext,
+                                        final ProcessorStateManager stateManager,
+                                        final ProcessorTopology topology) {
+        if (threadProducer == null) {
+            final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
+            final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
+            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
+            log.info("Creating producer client for task {}", taskId);
+            taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
+        }
+
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            consumer,
+            threadProducer != null ?
+                new StreamsProducer(threadProducer, false, logContext, applicationId) :
+                                                                                          new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
+            config.defaultProductionExceptionHandler(),
+            EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+            streamsMetrics
+        );
+
+        final StreamTask task = new StreamTask(
+            taskId,
+            partitions,
+            topology,
+            consumer,
+            config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            stateManager,
+            recordCollector
+        );
+
+        log.trace("Created task {} with assigned partitions {}", taskId, partitions);
+        createTaskSensor.record();

Review comment:
       I agree.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -235,9 +264,16 @@ void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
             return Collections.singleton(getThreadProducerClientId(threadId));
         } else {
             return taskProducers.keySet()
-                                .stream()
-                                .map(taskId -> getTaskProducerClientId(threadId, taskId))
-                                .collect(Collectors.toSet());
+                .stream()
+                .map(taskId -> getTaskProducerClientId(threadId, taskId))
+                .collect(Collectors.toSet());

Review comment:
       It seems like we have duelling formatters here and above. Do you want to propose that it's better this way?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,9 +155,6 @@ void handleCorruption(final Map<TaskId, Collection<TopicPartition>> taskWithChan
             final TaskId taskId = entry.getKey();
             final Task task = tasks.get(taskId);
 
-            // this call is idempotent so even if the task is only CREATED we can still call it
-            changelogReader.remove(task.changelogPartitions());

Review comment:
       I lost track of how this is now handled. Can you enlighten me?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -70,18 +69,16 @@ static void registerStateStores(final Logger log,
                 e
             );
         }
+
         log.debug("Acquired state directory lock");
 
         final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);
 
         // We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
         // the state stores have been registered; we should not try to load at the state manager construction time.
         // See https://issues.apache.org/jira/browse/KAFKA-8574

Review comment:
       Seems like this comment has also become displaced from its intended location, just before L83, `stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty)`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -505,4 +550,35 @@ private StateStoreMetadata findStore(final TopicPartition changelogPartition) {
 
         return found.isEmpty() ? null : found.get(0);
     }
+
+    void prepareForRecycle() {
+        log.debug("Preparing to recycle state for {} task {}.", taskType, taskId);
+
+        if (recyclingState) {
+            throw new IllegalStateException("Attempted to re-recycle state without completing first recycle");
+        }
+        recyclingState = true;
+    }
+
+    void recycleState() {
+        log.debug("Completed recycling state for formerly {} task {}.", taskType, taskId);
+
+        if (!recyclingState) {
+            throw new IllegalStateException("Attempted to complete recycle but state is not currently being recycled");
+        }
+        recyclingState = false;
+
+        if (taskType == TaskType.ACTIVE) {
+            taskType = TaskType.STANDBY;
+        } else {
+            taskType = TaskType.ACTIVE;
+        }

Review comment:
       It seems a bit "bold" here to assume that we want to flip the active/standby state. The TaskManager objectively knows what type it wants the task to become, so it seems it should just inform us of the desired task type in `prepareForRecycle`.
   
   Or, better yet, just null it out and we can set it during `createStandbyTaskFromActive` and `createActiveTaskFromStandby`. This kind of thing might also be nice, since we could have a more explicit lifecycle for this object:
   * I'm ready (created, valid, good to go)
   * I'm getting recycled
   * I'm closed
   
   Then, we can ensure that these "createXFromY" methods cleanly take the state manager from "closed" to "ready".
   I'm not saying to add _another_ state enum, but having a clearly defined lifecycle will help us later in maintenance. 
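
To make the suggestion above concrete, here is a minimal sketch of the "null it out and let the caller set it" variant. Everything in it is a hypothetical stand-in (`RecyclableStateManager`, `SketchTaskType`, `completeRecycle`, `isReady`); it is not the actual `ProcessorStateManager` and only illustrates the three lifecycle states listed above.

```java
import java.util.Objects;

// Hypothetical stand-in for TaskType; not the Kafka Streams enum.
enum SketchTaskType { ACTIVE, STANDBY }

// Hypothetical stand-in for the state manager, reduced to its lifecycle.
class RecyclableStateManager {
    // null means "I'm getting recycled": the old type is forgotten, the new one is not yet known
    private SketchTaskType taskType;
    private boolean closed = false;

    RecyclableStateManager(final SketchTaskType initialType) {
        this.taskType = Objects.requireNonNull(initialType);
    }

    void prepareForRecycle() {
        if (closed) {
            throw new IllegalStateException("Attempted to recycle a closed state manager");
        }
        if (taskType == null) {
            throw new IllegalStateException("Attempted to re-recycle state without completing first recycle");
        }
        taskType = null;
    }

    // The caller (e.g. a "createXFromY" method) states the target type explicitly,
    // so nothing here has to guess by flipping active/standby.
    void completeRecycle(final SketchTaskType newType) {
        if (closed || taskType != null) {
            throw new IllegalStateException("Attempted to complete recycle but state is not currently being recycled");
        }
        taskType = Objects.requireNonNull(newType);
    }

    void close() {
        closed = true;
        taskType = null;
    }

    // "I'm ready": created, valid, good to go
    boolean isReady() {
        return !closed && taskType != null;
    }

    SketchTaskType taskType() {
        return taskType;
    }
}
```

With this shape, a second `prepareForRecycle()` call before `completeRecycle(...)` fails fast, and the new task type always comes from the caller that already knows it.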

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -247,6 +271,25 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
         }
     }
 
+    private void registerStoreWithChangelogReader(final String storeName) {
+        // if the store name does not exist in the changelog map, it means the underlying store
+        // is not log enabled (including global stores), and hence it does not need to be restored
+        if (isLoggingEnabled(storeName)) {
+            // NOTE we assume the partition of the topic can always be inferred from the task id;
+            // if user ever use a custom partition grouper (deprecated in KIP-528) this would break and
+            // it is not a regression (it would always break anyways)

Review comment:
       And this comment should move into `getStorePartition`
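
As a rough illustration of where the two code comments would end up, assuming `isLoggingEnabled` is a lookup in the changelog map and `getStorePartition` derives the partition from the task id. The class below is a hypothetical stand-in (it returns a plain string instead of a `TopicPartition`), not the PR's actual code:

```java
import java.util.Map;

// Hypothetical stand-in; method bodies are assumptions made for illustration only.
class ChangelogLookupSketch {
    private final Map<String, String> storeToChangelogTopic;
    private final int taskPartition;

    ChangelogLookupSketch(final Map<String, String> storeToChangelogTopic, final int taskPartition) {
        this.storeToChangelogTopic = storeToChangelogTopic;
        this.taskPartition = taskPartition;
    }

    boolean isLoggingEnabled(final String storeName) {
        // if the store name does not exist in the changelog map, it means the underlying store
        // is not log enabled (including global stores), and hence it does not need to be restored
        return storeToChangelogTopic.containsKey(storeName);
    }

    String getStorePartition(final String storeName) {
        // NOTE we assume the partition of the topic can always be inferred from the task id;
        // a custom partition grouper (deprecated in KIP-528) would break this assumption
        return storeToChangelogTopic.get(storeName) + "-" + taskPartition;
    }
}
```

Each comment then sits next to the line it actually explains, and `registerStoreWithChangelogReader` can read as a plain sequence of calls.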

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -70,18 +69,16 @@ static void registerStateStores(final Logger log,
                 e
             );
         }
+
         log.debug("Acquired state directory lock");
 
         final boolean storeDirsEmpty = stateDirectory.directoryForTaskIsEmpty(id);
 
         // We should only load checkpoint AFTER the corresponding state directory lock has been acquired and
         // the state stores have been registered; we should not try to load at the state manager construction time.
         // See https://issues.apache.org/jira/browse/KAFKA-8574
-        for (final StateStore store : topology.stateStores()) {
-            processorContext.uninitialize();
-            store.init(processorContext, store);
-            log.trace("Registered state store {}", store.name());
-        }
+        stateMgr.registerStateStores(topology.stateStores(), processorContext);

Review comment:
       It seems like the intent here is to skip initializing the store again if it's already been initialized, but still register it with the changelog reader. That's OK, since we call `unregisterAllStoresWithChangelogReader` in `recycleState`, right?
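
For reference, the behavior I am assuming looks roughly like the sketch below. `SketchStore` and `SketchStateManager` are hypothetical stand-ins, and the changelog reader is reduced to a set of registered store names; the real `registerStateStores` may well differ.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a state store that tracks whether it has been initialized.
class SketchStore {
    final String name;
    boolean open = false;
    int initCalls = 0;

    SketchStore(final String name) {
        this.name = name;
    }

    void init() {
        open = true;
        initCalls++;
    }
}

// Hypothetical stand-in for the state manager; the changelog reader is modeled as a set of names.
class SketchStateManager {
    private final Set<String> registeredWithChangelogReader = new HashSet<>();

    void registerStateStores(final List<SketchStore> stores) {
        for (final SketchStore store : stores) {
            // skip re-initialization for stores kept open across a recycle
            if (!store.open) {
                store.init();
            }
            // always (re-)register, because recycleState() dropped the old registration
            registeredWithChangelogReader.add(store.name);
        }
    }

    void recycleState() {
        // stands in for unregisterAllStoresWithChangelogReader
        registeredWithChangelogReader.clear();
    }

    boolean isRegistered(final String storeName) {
        return registeredWithChangelogReader.contains(storeName);
    }
}
```

Under that assumption, a store survives a recycle with a single `init` call but ends up registered again afterwards.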



