You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/11/02 09:23:48 UTC

(kafka) branch trunk updated: MINOR: Consumer-Internals clean up (#14614)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 244abc11afc MINOR: Consumer-Internals clean up (#14614)
244abc11afc is described below

commit 244abc11afce37e8b11c29b5427ab5042920cd55
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Thu Nov 2 17:23:41 2023 +0800

    MINOR: Consumer-Internals clean up (#14614)
    
    Reviewers: Divij Vaidya <di...@amazon.com>, hudeqi <12...@qq.com>
    
    ---------
    
    Co-authored-by: laglangyue <ji...@qq.com>
---
 .../consumer/internals/FetchMetricsRegistry.java   |  4 +--
 .../consumer/internals/PrototypeAsyncConsumer.java |  4 +--
 .../clients/consumer/internals/SensorBuilder.java  | 18 +++++++-------
 .../kafka/clients/consumer/internals/Utils.java    |  2 +-
 .../clients/consumer/internals/WakeupTrigger.java  | 29 ++++++++++++----------
 5 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
index b6b269a87e8..289cbf01ee5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
@@ -57,11 +57,11 @@ public class FetchMetricsRegistry {
     public MetricNameTemplate partitionPreferredReadReplica;
 
     public FetchMetricsRegistry() {
-        this(new HashSet<String>(), "");
+        this(new HashSet<>(), "");
     }
 
     public FetchMetricsRegistry(String metricGrpPrefix) {
-        this(new HashSet<String>(), metricGrpPrefix);
+        this(new HashSet<>(), metricGrpPrefix);
     }
 
     public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 6389eb416e8..a90d37597a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -111,7 +111,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
  * This prototype consumer uses an {@link ApplicationEventHandler event handler} to process
  * {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated
  * {@link ConsumerNetworkThread network thread}. Visit
- * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor">this document</a>
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design">this document</a>
  * for detail implementation.
  */
 public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
@@ -1096,4 +1096,4 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
index 2272ee5c0a3..901eb7b3de4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
@@ -41,7 +41,7 @@ public class SensorBuilder {
 
     private final Sensor sensor;
 
-    private final boolean prexisting;
+    private final boolean preexisting;
 
     private final Map<String, String> tags;
 
@@ -56,44 +56,44 @@ public class SensorBuilder {
         if (s != null) {
             sensor = s;
             tags = Collections.emptyMap();
-            prexisting = true;
+            preexisting = true;
         } else {
             sensor = metrics.sensor(name);
             tags = tagsSupplier.get();
-            prexisting = false;
+            preexisting = false;
         }
     }
 
     SensorBuilder withAvg(MetricNameTemplate name) {
-        if (!prexisting)
+        if (!preexisting)
             sensor.add(metrics.metricInstance(name, tags), new Avg());
 
         return this;
     }
 
     SensorBuilder withMin(MetricNameTemplate name) {
-        if (!prexisting)
+        if (!preexisting)
             sensor.add(metrics.metricInstance(name, tags), new Min());
 
         return this;
     }
 
     SensorBuilder withMax(MetricNameTemplate name) {
-        if (!prexisting)
+        if (!preexisting)
             sensor.add(metrics.metricInstance(name, tags), new Max());
 
         return this;
     }
 
     SensorBuilder withValue(MetricNameTemplate name) {
-        if (!prexisting)
+        if (!preexisting)
             sensor.add(metrics.metricInstance(name, tags), new Value());
 
         return this;
     }
 
     SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) {
-        if (!prexisting) {
+        if (!preexisting) {
             sensor.add(new Meter(metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags)));
         }
 
@@ -101,7 +101,7 @@ public class SensorBuilder {
     }
 
     SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) {
-        if (!prexisting) {
+        if (!preexisting) {
             sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags)));
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java
index acad4730393..2954a5df902 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java
@@ -26,7 +26,7 @@ public final class Utils {
 
     final static class PartitionComparator implements Comparator<TopicPartition>, Serializable {
         private static final long serialVersionUID = 1L;
-        private Map<String, List<String>> map;
+        private final Map<String, List<String>> map;
 
         PartitionComparator(Map<String, List<String>> map) {
             this.map = map;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
index 9763c3e09f3..aaaf0b92ac1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
@@ -27,14 +27,14 @@ import java.util.concurrent.atomic.AtomicReference;
  * Ensures blocking APIs can be woken up by the consumer.wakeup().
  */
 public class WakeupTrigger {
-    private AtomicReference<Wakeupable> pendingTask = new AtomicReference<>(null);
+    private final AtomicReference<Wakeupable> pendingTask = new AtomicReference<>(null);
 
-    /*
-      Wakeup a pending task.  If there isn't any pending task, return a WakeupFuture, so that the subsequent call
-      would know wakeup was previously called.
-
-      If there are active tasks, complete it with WakeupException, then unset pending task (return null here.
-      If the current task has already been woken-up, do nothing.
+    /**
+     * Wakeup a pending task.  If there isn't any pending task, return a WakeupFuture, so that the subsequent call
+     * would know wakeup was previously called.
+     * <p>
+     * If there are active tasks, complete it with WakeupException, then unset pending task (return null here.
+     * If the current task has already been woken-up, do nothing.
      */
     public void wakeup() {
         pendingTask.getAndUpdate(task -> {
@@ -50,12 +50,15 @@ public class WakeupTrigger {
         });
     }
 
-    /*
-    If there is no pending task, set the pending task active.
-    If wakeup was called before setting an active task, the current task will complete exceptionally with
-    WakeupException right
-    away.
-    if there is an active task, throw exception.
+    /**
+     *     If there is no pending task, set the pending task active.
+     *     If wakeup was called before setting an active task, the current task will complete exceptionally with
+     *     WakeupException right
+     *     away.
+     *     if there is an active task, throw exception.
+     * @param currentTask
+     * @param <T>
+     * @return
      */
     public <T> CompletableFuture<T> setActiveTask(final CompletableFuture<T> currentTask) {
         Objects.requireNonNull(currentTask, "currentTask cannot be null");