You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/04/08 18:07:34 UTC

[kafka] branch add-assignor-log-generation created (now f7cd88be3c)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch add-assignor-log-generation
in repository https://gitbox.apache.org/repos/asf/kafka.git


      at f7cd88be3c MINOR: Add generation to consumer assignor logs

This branch includes the following new commits:

     new f7cd88be3c MINOR: Add generation to consumer assignor logs

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/01: MINOR: Add generation to consumer assignor logs

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch add-assignor-log-generation
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f7cd88be3c1c7aa3a9e69c6e3077687929a72085
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri Apr 8 13:06:52 2022 -0500

    MINOR: Add generation to consumer assignor logs
---
 .../consumer/ConsumerPartitionAssignor.java        |  24 +-
 .../consumer/internals/ConsumerCoordinator.java    |  22 +-
 .../consumer/internals/ContextualLogging.java      |  23 ++
 .../consumer/internals/DynamicPrefixLogger.java    | 353 +++++++++++++++++++++
 .../internals/StreamsPartitionAssignor.java        |  23 +-
 5 files changed, 428 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
index a541b8a686..21d918a54c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java
@@ -16,19 +16,22 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Optional;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
+import java.util.function.Supplier;
 
 /**
  * This interface is used to define custom partition assignment for use in
@@ -45,6 +48,13 @@ import org.apache.kafka.common.utils.Utils;
  */
 public interface ConsumerPartitionAssignor {
 
+    /**
+     * ConsumerPartitionAssignors can optionally implement this method so that their logs
+     * will contain useful correlation information like the client id and current generation.
+     * @param logContext supplier of the correlation text (e.g. client id and generation) for log messages
+     */
+    default void setLogContext(Supplier<String> logContext) {}
+
     /**
      * Return serialized data that will be included in the {@link Subscription} sent to the leader
      * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} ((e.g. local host/rack information)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 10939b2a0e..4f3c2672d3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -67,6 +67,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -82,6 +83,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
@@ -164,13 +166,23 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
               metricGrpPrefix,
               time);
         this.rebalanceConfig = rebalanceConfig;
-        this.log = logContext.logger(ConsumerCoordinator.class);
+        final Supplier<String> dynamicPrefix =
+            () -> logContext.logPrefix() + "[generationId=" + generation().generationId + "] ";
+        this.log = new DynamicPrefixLogger(
+            dynamicPrefix,
+            LoggerFactory.getLogger(ConsumerCoordinator.class)
+        );
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
         this.autoCommitEnabled = autoCommitEnabled;
         this.autoCommitIntervalMs = autoCommitIntervalMs;
+        for (final ConsumerPartitionAssignor assignor : assignors) {
+            if (assignor instanceof ContextualLogging) {
+                ((ContextualLogging) assignor).setLoggingContext(dynamicPrefix);
+            }
+        }
         this.assignors = assignors;
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
@@ -666,8 +678,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         isLeader = true;
 
         if (skipAssignment) {
-            log.info("Skipped assignment for returning static leader at generation {}. The static leader " +
-                "will continue with its existing assignment.", generation().generationId);
+            log.info("Skipped assignment for returning static leader. The static leader " +
+                "will continue with its existing assignment.");
             assignmentSnapshot = metadataSnapshot;
             return Collections.emptyMap();
         }
@@ -688,7 +700,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // we must take the assignment snapshot after.
         assignmentSnapshot = metadataSnapshot;
 
-        log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
+        log.info("Finished assignment for group: {}", assignments);
 
         Map<String, ByteBuffer> groupAssignment = new HashMap<>();
         for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
@@ -1160,7 +1172,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
         if (autoCommitEnabled)
             return autoCommitOffsetsAsync();
-        return null;    
+        return null;
     }
 
     private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java
new file mode 100644
index 0000000000..c0765549a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.function.Supplier;
+
+public interface ContextualLogging {
+    void setLoggingContext(Supplier<String> loggingContext); // ConsumerCoordinator passes a supplier of its log prefix
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java
new file mode 100644
index 0000000000..e2d9252d2b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+
+import java.util.function.Supplier;
+
+public final class DynamicPrefixLogger implements Logger {
+
+    private final Supplier<String> prefix;
+    private final Logger delegate;
+
+    public DynamicPrefixLogger(final Supplier<String> prefix, final Logger delegate) {
+        this.prefix = prefix;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public String getName() {
+        return delegate.getName();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+        return delegate.isTraceEnabled();
+    }
+
+    @Override
+    public void trace(final String msg) {
+        delegate.trace(prefix.get() + msg);
+    }
+
+    @Override
+    public void trace(final String format, final Object arg) {
+        delegate.trace(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void trace(final String format, final Object arg1, final Object arg2) {
+        delegate.trace(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(final String format, final Object... arguments) {
+        delegate.trace(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void trace(final String msg, final Throwable t) {
+        delegate.trace(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isTraceEnabled(final Marker marker) {
+        return delegate.isTraceEnabled(marker);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String msg) {
+        delegate.trace(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String format, final Object arg) {
+        delegate.trace(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void trace(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.trace(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String format, final Object... argArray) {
+        delegate.trace(marker, prefix.get() + format, argArray);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String msg, final Throwable t) {
+        delegate.trace(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+        return delegate.isDebugEnabled();
+    }
+
+    @Override
+    public void debug(final String msg) {
+        delegate.debug(prefix.get() + msg);
+    }
+
+    @Override
+    public void debug(final String format, final Object arg) {
+        delegate.debug(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void debug(final String format, final Object arg1, final Object arg2) {
+        delegate.debug(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void debug(final String format, final Object... arguments) {
+        delegate.debug(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void debug(final String msg, final Throwable t) {
+        delegate.debug(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isDebugEnabled(final Marker marker) {
+        return delegate.isDebugEnabled(marker);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String msg) {
+        delegate.debug(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String format, final Object arg) {
+        delegate.debug(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void debug(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.debug(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String format, final Object... arguments) {
+        delegate.debug(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String msg, final Throwable t) {
+        delegate.debug(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+        return delegate.isInfoEnabled();
+    }
+
+    @Override
+    public void info(final String msg) {
+        delegate.info(prefix.get() + msg);
+    }
+
+    @Override
+    public void info(final String format, final Object arg) {
+        delegate.info(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void info(final String format, final Object arg1, final Object arg2) {
+        delegate.info(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void info(final String format, final Object... arguments) {
+        delegate.info(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void info(final String msg, final Throwable t) {
+        delegate.info(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled(final Marker marker) {
+        return delegate.isInfoEnabled(marker);
+    }
+
+    @Override
+    public void info(final Marker marker, final String msg) {
+        delegate.info(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void info(final Marker marker, final String format, final Object arg) {
+        delegate.info(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void info(final Marker marker,
+                     final String format,
+                     final Object arg1,
+                     final Object arg2) {
+        delegate.info(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void info(final Marker marker, final String format, final Object... arguments) {
+        delegate.info(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void info(final Marker marker, final String msg, final Throwable t) {
+        delegate.info(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+        return delegate.isWarnEnabled();
+    }
+
+    @Override
+    public void warn(final String msg) {
+        delegate.warn(prefix.get() + msg);
+    }
+
+    @Override
+    public void warn(final String format, final Object arg) {
+        delegate.warn(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void warn(final String format, final Object... arguments) {
+        delegate.warn(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void warn(final String format, final Object arg1, final Object arg2) {
+        delegate.warn(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(final String msg, final Throwable t) {
+        delegate.warn(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled(final Marker marker) {
+        return delegate.isWarnEnabled(marker);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String msg) {
+        delegate.warn(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String format, final Object arg) {
+        delegate.warn(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void warn(final Marker marker,
+                     final String format,
+                     final Object arg1,
+                     final Object arg2) {
+        delegate.warn(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String format, final Object... arguments) {
+        delegate.warn(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String msg, final Throwable t) {
+        delegate.warn(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+        return delegate.isErrorEnabled();
+    }
+
+    @Override
+    public void error(final String msg) {
+        delegate.error(prefix.get() + msg);
+    }
+
+    @Override
+    public void error(final String format, final Object arg) {
+        delegate.error(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void error(final String format, final Object arg1, final Object arg2) {
+        delegate.error(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void error(final String format, final Object... arguments) {
+        delegate.error(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void error(final String msg, final Throwable t) {
+        delegate.error(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled(final Marker marker) {
+        return delegate.isErrorEnabled(marker);
+    }
+
+    @Override
+    public void error(final Marker marker, final String msg) {
+        delegate.error(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void error(final Marker marker, final String format, final Object arg) {
+        delegate.error(marker, prefix.get() + format, arg); // prepend the dynamic prefix, as every other overload does
+    }
+
+    @Override
+    public void error(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.error(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void error(final Marker marker, final String format, final Object... arguments) {
+        delegate.error(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void error(final Marker marker, final String msg, final Throwable t) {
+        delegate.error(marker, prefix.get() + msg, t);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 2af2fba718..17ddeb6ca7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ContextualLogging;
+import org.apache.kafka.clients.consumer.internals.DynamicPrefixLogger;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
@@ -29,7 +31,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
@@ -52,6 +53,7 @@ import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -77,7 +79,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.util.UUID.randomUUID;
-
 import static org.apache.kafka.common.utils.Utils.filterMap;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -86,10 +87,23 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAss
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
 import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
 
-public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
+public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable,
+    ContextualLogging {
 
-    private Logger log;
+    // set first via configure()
     private String logPrefix;
+    // set second via setLoggingContext()
+    private Supplier<String> loggingContext;
+    private Logger log;
+
+    @Override
+    public void setLoggingContext(final Supplier<String> loggingContext) {
+        // the logPrefix field is read on every get(), not captured once here
+        final Supplier<String> combinedPrefix = () -> loggingContext.get() + logPrefix;
+        this.loggingContext = combinedPrefix;
+        final Logger rawLogger = LoggerFactory.getLogger(StreamsPartitionAssignor.class);
+        this.log = new DynamicPrefixLogger(combinedPrefix, rawLogger);
+    }
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
 
@@ -203,7 +217,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
 
         logPrefix = assignorConfiguration.logPrefix();
-        log = new LogContext(logPrefix).logger(getClass());
         usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
 
         final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();