You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/11/02 09:23:48 UTC
(kafka) branch trunk updated: MINOR: Consumer-Internals clean up (#14614)
This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 244abc11afc MINOR: Consumer-Internals clean up (#14614)
244abc11afc is described below
commit 244abc11afce37e8b11c29b5427ab5042920cd55
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Thu Nov 2 17:23:41 2023 +0800
MINOR: Consumer-Internals clean up (#14614)
Reviewers: Divij Vaidya <di...@amazon.com>, hudeqi <12...@qq.com>
---------
Co-authored-by: laglangyue <ji...@qq.com>
---
.../consumer/internals/FetchMetricsRegistry.java | 4 +--
.../consumer/internals/PrototypeAsyncConsumer.java | 4 +--
.../clients/consumer/internals/SensorBuilder.java | 18 +++++++-------
.../kafka/clients/consumer/internals/Utils.java | 2 +-
.../clients/consumer/internals/WakeupTrigger.java | 29 ++++++++++++----------
5 files changed, 30 insertions(+), 27 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
index b6b269a87e8..289cbf01ee5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
@@ -57,11 +57,11 @@ public class FetchMetricsRegistry {
public MetricNameTemplate partitionPreferredReadReplica;
public FetchMetricsRegistry() {
- this(new HashSet<String>(), "");
+ this(new HashSet<>(), "");
}
public FetchMetricsRegistry(String metricGrpPrefix) {
- this(new HashSet<String>(), metricGrpPrefix);
+ this(new HashSet<>(), metricGrpPrefix);
}
public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index 6389eb416e8..a90d37597a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -111,7 +111,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* This prototype consumer uses an {@link ApplicationEventHandler event handler} to process
* {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated
* {@link ConsumerNetworkThread network thread}. Visit
- * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor">this document</a>
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design">this document</a>
* for detail implementation.
*/
public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
@@ -1096,4 +1096,4 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
}
}
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
index 2272ee5c0a3..901eb7b3de4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
@@ -41,7 +41,7 @@ public class SensorBuilder {
private final Sensor sensor;
- private final boolean prexisting;
+ private final boolean preexisting;
private final Map<String, String> tags;
@@ -56,44 +56,44 @@ public class SensorBuilder {
if (s != null) {
sensor = s;
tags = Collections.emptyMap();
- prexisting = true;
+ preexisting = true;
} else {
sensor = metrics.sensor(name);
tags = tagsSupplier.get();
- prexisting = false;
+ preexisting = false;
}
}
SensorBuilder withAvg(MetricNameTemplate name) {
- if (!prexisting)
+ if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Avg());
return this;
}
SensorBuilder withMin(MetricNameTemplate name) {
- if (!prexisting)
+ if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Min());
return this;
}
SensorBuilder withMax(MetricNameTemplate name) {
- if (!prexisting)
+ if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Max());
return this;
}
SensorBuilder withValue(MetricNameTemplate name) {
- if (!prexisting)
+ if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Value());
return this;
}
SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) {
- if (!prexisting) {
+ if (!preexisting) {
sensor.add(new Meter(metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags)));
}
@@ -101,7 +101,7 @@ public class SensorBuilder {
}
SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) {
- if (!prexisting) {
+ if (!preexisting) {
sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags)));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java
index acad4730393..2954a5df902 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Utils.java
@@ -26,7 +26,7 @@ public final class Utils {
final static class PartitionComparator implements Comparator<TopicPartition>, Serializable {
private static final long serialVersionUID = 1L;
- private Map<String, List<String>> map;
+ private final Map<String, List<String>> map;
PartitionComparator(Map<String, List<String>> map) {
this.map = map;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
index 9763c3e09f3..aaaf0b92ac1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
@@ -27,14 +27,14 @@ import java.util.concurrent.atomic.AtomicReference;
* Ensures blocking APIs can be woken up by the consumer.wakeup().
*/
public class WakeupTrigger {
- private AtomicReference<Wakeupable> pendingTask = new AtomicReference<>(null);
+ private final AtomicReference<Wakeupable> pendingTask = new AtomicReference<>(null);
- /*
- Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call
- would know wakeup was previously called.
-
- If there are active tasks, complete it with WakeupException, then unset pending task (return null here.
- If the current task has already been woken-up, do nothing.
+ /**
+ * Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call
+ * would know wakeup was previously called.
+ * <p>
+ * If there is an active task, complete it with WakeupException, then unset the pending task (return null here).
+ * If the current task has already been woken up, do nothing.
*/
public void wakeup() {
pendingTask.getAndUpdate(task -> {
@@ -50,12 +50,15 @@ public class WakeupTrigger {
});
}
- /*
- If there is no pending task, set the pending task active.
- If wakeup was called before setting an active task, the current task will complete exceptionally with
- WakeupException right
- away.
- if there is an active task, throw exception.
+ /**
+ * If there is no pending task, set the pending task active.
+ * If wakeup was called before setting an active task, the current task will complete exceptionally with
+ * WakeupException right away.
+ * <p>
+ * If there is already an active task, throw an exception.
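+ * @param currentTask the task to set as the active task; must not be null
+ * @param <T> the result type of the task's future
+ * @return a {@code CompletableFuture<T>} for the task (presumably {@code currentTask} itself; confirm against the method body)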
*/
public <T> CompletableFuture<T> setActiveTask(final CompletableFuture<T> currentTask) {
Objects.requireNonNull(currentTask, "currentTask cannot be null");