You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/23 03:36:10 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores

ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r391363596



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final String threadClientId, final
                 partitions
             );
 
-            if (threadProducer == null) {
-                final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
-                final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
-                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
-                log.info("Creating producer client for task {}", taskId);
-                taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
-            }
-
-            final RecordCollector recordCollector = new RecordCollectorImpl(
-                logContext,
-                taskId,
-                consumer,
-                threadProducer != null ?
-                    new StreamsProducer(threadProducer, false, logContext, applicationId) :
-                    new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
-                config.defaultProductionExceptionHandler(),
-                EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
-                streamsMetrics
-            );
-
-            final Task task = new StreamTask(
+            createdTasks.add(createStreamTask(
                 taskId,
                 partitions,
-                topology,
                 consumer,
-                config,
-                streamsMetrics,
-                stateDirectory,
-                cache,
-                time,
+                logContext,
                 stateManager,
-                recordCollector
-            );
-
-            log.trace("Created task {} with assigned partitions {}", taskId, partitions);
-            createdTasks.add(task);
-            createTaskSensor.record();
+                topology));
         }
         return createdTasks;
     }
 
+    private StreamTask createStreamTask(final TaskId taskId,
+                                        final Set<TopicPartition> partitions,
+                                        final Consumer<byte[], byte[]> consumer,
+                                        final LogContext logContext,
+                                        final ProcessorStateManager stateManager,
+                                        final ProcessorTopology topology) {
+        if (threadProducer == null) {
+            final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
+            final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
+            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
+            log.info("Creating producer client for task {}", taskId);
+            taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
+        }
+
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            consumer,
+            threadProducer != null ?
+                new StreamsProducer(threadProducer, false, logContext, applicationId) :
+                                                                                          new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
+            config.defaultProductionExceptionHandler(),
+            EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+            streamsMetrics
+        );
+
+        final StreamTask task = new StreamTask(
+            taskId,
+            partitions,
+            topology,
+            consumer,
+            config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            stateManager,
+            recordCollector
+        );
+
+        log.trace("Created task {} with assigned partitions {}", taskId, partitions);
+        createTaskSensor.record();
+        return task;
+    }
+
+    StreamTask convertStandbyToActive(final StandbyTask standbyTask,
+                                      final Set<TopicPartition> partitions,
+                                      final Consumer<byte[], byte[]> consumer) {
+        return createStreamTask(
+            standbyTask.id,
+            partitions,
+            consumer,
+            getLogContext(standbyTask.id),
+            standbyTask.stateMgr,
+            standbyTask.topology);

Review comment:
       The `topology` is created but never initialized for a standby task, so we don't need to worry about closing it and can simply reuse it here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org