Posted to commits@kafka.apache.org by jl...@apache.org on 2023/10/20 19:04:11 UTC

[kafka] branch trunk updated: MINOR: Server-Commons cleanup (#14572)

This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eed5e68880c MINOR: Server-Commons cleanup (#14572)
eed5e68880c is described below

commit eed5e68880c0d4fe088fcb9493baecc3db667176
Author: Josep Prat <jo...@aiven.io>
AuthorDate: Fri Oct 20 21:04:04 2023 +0200

    MINOR: Server-Commons cleanup (#14572)
    
    Fixes Javadoc and minor issues in the Java files of the Server-Commons modules.
    
    Javadoc is now formatted as its original author intended.
    
    Signed-off-by: Josep Prat <jo...@aiven.io>
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../java/org/apache/kafka/admin/AdminUtils.java    |  8 +++----
 .../java/org/apache/kafka/queue/EventQueue.java    |  2 +-
 .../org/apache/kafka/queue/KafkaEventQueue.java    |  2 +-
 .../apache/kafka/server/common/CheckpointFile.java |  4 ++--
 .../kafka/server/common/MetadataVersion.java       |  4 ++--
 .../kafka/server/common/ProducerIdsBlock.java      |  2 +-
 .../serialization/AbstractApiMessageSerde.java     |  5 ++--
 .../common/serialization/BytesApiMessageSerde.java |  4 ++--
 .../server/config/ServerTopicConfigSynonyms.java   |  6 ++---
 .../fault/ProcessTerminatingFaultHandler.java      |  6 ++---
 .../kafka/server/metrics/KafkaYammerMetrics.java   |  2 +-
 .../apache/kafka/server/mutable/BoundedList.java   |  4 ++--
 .../kafka/server/network/EndpointReadyFutures.java | 26 ++++++++------------
 .../apache/kafka/server/util/CommandLineUtils.java |  4 ++--
 .../org/apache/kafka/server/util/FutureUtils.java  | 13 ++++------
 .../java/org/apache/kafka/server/util/Json.java    | 16 ++++++-------
 .../apache/kafka/server/util/KafkaScheduler.java   |  2 +-
 .../org/apache/kafka/server/util/Scheduler.java    |  4 ++--
 .../kafka/server/util/ThroughputThrottler.java     |  8 +++----
 .../kafka/server/util/TranslatedValueMapView.java  |  2 +-
 .../apache/kafka/server/util/json/JsonArray.java   |  2 +-
 .../apache/kafka/server/util/json/JsonValue.java   | 20 +++++++---------
 .../org/apache/kafka/server/util/timer/Timer.java  |  2 +-
 .../kafka/server/util/timer/TimingWheel.java       | 28 +++++++++++-----------
 .../org/apache/kafka/timeline/BaseHashTable.java   | 26 ++++++++++----------
 .../java/org/apache/kafka/timeline/Snapshot.java   |  2 +-
 .../apache/kafka/timeline/SnapshotRegistry.java    |  2 +-
 .../kafka/timeline/SnapshottableHashTable.java     | 24 +++++++++----------
 .../org/apache/kafka/timeline/TimelineHashMap.java |  4 ++--
 .../org/apache/kafka/timeline/TimelineHashSet.java |  4 ++--
 .../org/apache/kafka/timeline/TimelineInteger.java |  3 +--
 .../org/apache/kafka/timeline/TimelineLong.java    |  3 +--
 .../org/apache/kafka/timeline/TimelineObject.java  |  2 +-
 33 files changed, 116 insertions(+), 130 deletions(-)

diff --git a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
index 9504954b76e..ad69e161758 100644
--- a/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java
@@ -111,7 +111,7 @@ public class AdminUtils {
      * brokers, it guarantees that the replica distribution is even across brokers and racks.
      * </p>
      * @return a Map from partition id to replica ids
-     * @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
+     * @throws AdminOperationException If rack information is supplied, but it is incomplete, or if it is not possible to
      *                                 assign each replica to a unique rack.
      *
      */
@@ -214,13 +214,13 @@ public class AdminUtils {
     /**
      * Given broker and rack information, returns a list of brokers alternated by the rack. Assume
      * this is the rack and its brokers:
-     *
+     * <pre>
      * rack1: 0, 1, 2
      * rack2: 3, 4, 5
      * rack3: 6, 7, 8
-     *
+     * </pre>
      * This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
-     *
+     * <br>
      * This is essential to make sure that the assignReplicasToBrokers API can use such list and
      * assign replicas to brokers in a simple round-robin fashion, while ensuring an even
      * distribution of leader and replica counts on each broker and that replicas are
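
As a concrete sketch of the interleaving this Javadoc describes (illustrative only, not the actual AdminUtils code; the method name is invented, and an insertion-ordered map is assumed so rack order is stable):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class RackAlternationSketch {
        // Round-robin across racks: rack1=[0,1,2], rack2=[3,4,5], rack3=[6,7,8]
        // yields [0, 3, 6, 1, 4, 7, 2, 5, 8].
        static List<Integer> alternateByRack(Map<String, List<Integer>> brokersPerRack) {
            List<Iterator<Integer>> rackIterators = new ArrayList<>();
            for (List<Integer> brokers : brokersPerRack.values()) {
                rackIterators.add(brokers.iterator());
            }
            List<Integer> result = new ArrayList<>();
            boolean emitted = true;
            while (emitted) {
                emitted = false;
                for (Iterator<Integer> it : rackIterators) {
                    if (it.hasNext()) {
                        result.add(it.next());
                        emitted = true;
                    }
                }
            }
            return result;
        }
    }
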
diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
index 8c4022cee2b..df1fd0987cb 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
@@ -210,7 +210,7 @@ public interface EventQueue extends AutoCloseable {
 
     /**
      * Asynchronously shut down the event queue.
-     *
+     * <br>
      * No new events will be accepted, and the queue thread will exit after running the existing events.
      * Deferred events will receive TimeoutExceptions.
      *
diff --git a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index 2fde8285dab..36284ed7f3e 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -201,7 +201,7 @@ public final class KafkaEventQueue implements EventQueue {
             }
         }
 
-        private void handleEvents() throws InterruptedException {
+        private void handleEvents() {
             Throwable toDeliver = null;
             EventContext toRun = null;
             boolean wasInterrupted = false;
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index 818af8b3c1b..6efbaa136e0 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -37,13 +37,13 @@ import java.util.Optional;
 
 /**
  * This class represents a utility to capture a checkpoint in a file. It writes down to the file in the below format.
- *
+ * <pre>
  * ========= File beginning =========
  * version: int
  * entries-count: int
  * entry-as-string-on-each-line
  * ========= File end ===============
- *
+ * </pre>
  * Each entry is represented as a string on each line in the checkpoint file. {@link EntryFormatter} is used
  * to convert the entry into a string and vice versa.
  *
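
For illustration, writing that layout with plain java.io might look as follows (a sketch; the real CheckpointFile adds concerns such as atomic replacement of the previous file, which are omitted here):

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    public class CheckpointWriteSketch {
        static void write(Path file, int version, List<String> entries) throws IOException {
            try (BufferedWriter writer = Files.newBufferedWriter(file)) {
                writer.write(Integer.toString(version));        // version: int
                writer.newLine();
                writer.write(Integer.toString(entries.size())); // entries-count: int
                writer.newLine();
                for (String entry : entries) {                  // entry-as-string-on-each-line
                    writer.write(entry);
                    writer.newLine();
                }
            }
        }
    }
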
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index f7ca2fc242f..8f22ed582a6 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -27,10 +27,10 @@ import org.apache.kafka.common.record.RecordVersion;
  * This class contains the different Kafka versions.
  * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves.
  * This is only for inter-broker communications - when communicating with clients, the client decides on the API version.
- *
+ * <br>
  * Note that the ID we initialize for each version is important.
  * We consider a version newer than another if it is lower in the enum list (to avoid depending on lexicographic order)
- *
+ * <br>
  * Since the api protocol may change more than once within the same release and to facilitate people deploying code from
  * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example,
  * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a
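
The "newer if lower in the enum list" rule can be sketched with ordinals (an illustrative enum, not the real MetadataVersion):

    enum VersionOrderingSketch {
        // Declaration order encodes age: entries further down the list are newer,
        // so comparisons never depend on the lexicographic order of the names.
        IBP_0_10_0_IV0, IBP_0_10_0_IV1, IBP_0_10_1_IV0;

        boolean isAtLeast(VersionOrderingSketch other) {
            return this.ordinal() >= other.ordinal();
        }
    }
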
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
index c4240018f9b..7c197ce98df 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Holds a range of Producer IDs used for Transactional and EOS producers.
- *
+ * <br>
  * The start and end of the ID block are inclusive.
  */
 public class ProducerIdsBlock {
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
index c3914ea47b5..a29b6a00fc0 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
@@ -26,11 +26,10 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 /**
  * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement
  * {@link #apiMessageFor(short)} to return a {@code ApiMessage} instance for the given {@code apiKey}.
- *
+ * <br>
  * This can be used as the underlying serialization mechanism for records defined with {@link ApiMessage}s.
- * <p></p>
+ * <br><br>
  * Serialization format for the given {@code ApiMessageAndVersion} is below:
- * <p></p>
  * <pre>
  *     [data_frame_version header message]
  *     header =&gt; [api_key version]
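
A hedged sketch of that frame using fixed-width fields for readability (an assumption made for illustration; the actual serde writes through Kafka's Readable/Writable abstraction and its encodings may differ):

    import java.nio.ByteBuffer;

    public class FrameLayoutSketch {
        // [data_frame_version header message], where header => [api_key version]
        static ByteBuffer serialize(short dataFrameVersion, short apiKey,
                                    short version, byte[] message) {
            ByteBuffer buf = ByteBuffer.allocate(6 + message.length);
            buf.putShort(dataFrameVersion); // data_frame_version
            buf.putShort(apiKey);           // header: api_key
            buf.putShort(version);          // header: version
            buf.put(message);               // message payload
            buf.flip();
            return buf;
        }
    }
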
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java
index 668bbfb2488..93ad9490755 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java
@@ -25,10 +25,10 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import java.nio.ByteBuffer;
 
 /**
- * This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa.. This can be used as serialization protocol for any
+ * This class provides conversion of {@code ApiMessageAndVersion} to bytes and vice versa. This can be used as serialization protocol for any
  * metadata records derived of {@code ApiMessage}s. It internally uses {@link AbstractApiMessageSerde} for serialization/deserialization
  * mechanism.
- * <p></p>
+ * <br><br>
  * Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective
  * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective
  * {@code ApiMessage} instance.
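
An implementor would then look roughly like this sketch (MyRecord and its api key are hypothetical placeholders for a generated message class):

    import org.apache.kafka.common.protocol.ApiMessage;
    import org.apache.kafka.server.common.serialization.BytesApiMessageSerde;

    public class MyMetadataSerde extends BytesApiMessageSerde {
        @Override
        public ApiMessage apiMessageFor(short apiKey) {
            // Return an empty instance for the serde to deserialize into.
            if (apiKey == 0) {
                return new MyRecord(); // hypothetical generated record class
            }
            throw new IllegalArgumentException("Unknown apiKey: " + apiKey);
        }
    }
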
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index c53f8da84f9..320de2db6b9 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -34,16 +34,16 @@ public final class ServerTopicConfigSynonyms {
 
     /**
      * Maps topic configurations to their equivalent broker configurations.
-     *
+     * <br>
      * Topics can be configured either by setting their dynamic topic configurations, or by
      * setting equivalent broker configurations. For historical reasons, the equivalent broker
      * configurations have different names. This table maps each topic configuration to its
      * equivalent broker configurations.
-     *
+     * <br>
      * In some cases, the equivalent broker configurations must be transformed before they
      * can be used. For example, log.roll.hours must be converted to milliseconds before it
      * can be used as the value of segment.ms.
-     *
+     * <br>
      * The broker configurations will be used in the order specified here. In other words, if
      * both the first and the second synonyms are configured, we will use only the value of
      * the first synonym and ignore the second.
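
The log.roll.hours example can be made concrete with a small sketch (the method and lookup are illustrative, not the actual ServerTopicConfigSynonyms API):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    public class SynonymSketch {
        // The first synonym wins; the second is consulted only if the first is
        // absent, and some synonyms need a unit transformation before use.
        static long effectiveSegmentMs(Properties brokerConfig) {
            String rollMs = brokerConfig.getProperty("log.roll.ms");
            if (rollMs != null) {
                return Long.parseLong(rollMs);  // already in milliseconds
            }
            String rollHours = brokerConfig.getProperty("log.roll.hours", "168");
            return TimeUnit.HOURS.toMillis(Long.parseLong(rollHours)); // hours -> ms
        }
    }
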
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java
index 29ba7b84706..a15c924c98b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessTerminatingFaultHandler.java
@@ -66,13 +66,13 @@ final public class ProcessTerminatingFaultHandler implements FaultHandler {
 
         /**
          * Set if halt or exit should be used.
-         *
+         * <br>
          * When {@code value} is {@code false} {@code Exit.exit} is called, otherwise {@code Exit.halt} is
          * called. The default value is {@code true}.
-         *
+         * <br>
          * The default implementation of {@code Exit.exit} calls {@code Runtime.exit} which
          * blocks on all of the shutdown hooks executing.
-         *
+         * <br>
          * The default implementation of {@code Exit.halt} calls {@code Runtime.halt} which
          * forcibly terminates the JVM.
          */
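
Building a handler might look like the following sketch (the setter name is an assumption; this hunk shows only the Javadoc):

    // Sketch: choosing exit semantics when building the handler.
    ProcessTerminatingFaultHandler handler = new ProcessTerminatingFaultHandler.Builder()
        .setShouldHalt(false) // assumed setter name; false selects Exit.exit
        .build();
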
diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java
index 329083350cd..6b2f0a1e38d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java
+++ b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaYammerMetrics.java
@@ -38,7 +38,7 @@ import java.util.function.Supplier;
 /**
  * This class encapsulates the default yammer metrics registry for Kafka server,
  * and configures the set of exported JMX metrics for Yammer metrics.
- *
+ * <br>
  * KafkaYammerMetrics.defaultRegistry() should always be used instead of Metrics.defaultRegistry()
  */
 public class KafkaYammerMetrics implements Reconfigurable {
diff --git a/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java b/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java
index 9af28d06595..6c5d1ba0d5a 100644
--- a/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java
+++ b/server-common/src/main/java/org/apache/kafka/server/mutable/BoundedList.java
@@ -36,11 +36,11 @@ public class BoundedList<E> implements List<E> {
     private final List<E> underlying;
 
     public static <E> BoundedList<E> newArrayBacked(int maxLength) {
-        return new BoundedList<>(maxLength, new ArrayList<E>());
+        return new BoundedList<>(maxLength, new ArrayList<>());
     }
 
     public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
-        return new BoundedList<>(maxLength, new ArrayList<E>(initialCapacity));
+        return new BoundedList<>(maxLength, new ArrayList<>(initialCapacity));
     }
 
     public BoundedList(int maxLength, List<E> underlying) {
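
A quick usage sketch of the factory methods above (how overflow is reported is not shown in this hunk):

    // maxLength = 2: the list accepts at most two elements.
    BoundedList<String> list = BoundedList.newArrayBacked(2);
    list.add("a");
    list.add("b");
    // A third add would exceed maxLength and is expected to fail fast.
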
diff --git a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
index 1841cba9f19..578079b4bd3 100644
--- a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
+++ b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
@@ -71,10 +71,8 @@ public class EndpointReadyFutures {
             String name,
             Map<Endpoint, ? extends CompletionStage<?>> newFutures
         ) {
-            newFutures.forEach((endpoint, future) -> {
-                endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()).
-                    add(new EndpointCompletionStage(name, future));
-            });
+            newFutures.forEach((endpoint, future) -> endpointStages.computeIfAbsent(endpoint, __ -> new ArrayList<>()).
+                add(new EndpointCompletionStage(name, future)));
             return this;
         }
 
@@ -123,9 +121,7 @@ public class EndpointReadyFutures {
             addReadinessFutures("authorizerStart", effectiveStartFutures);
             stages.forEach(stage -> {
                 Map<Endpoint, CompletionStage<?>> newReadinessFutures = new HashMap<>();
-                info.endpoints().forEach(endpoint -> {
-                    newReadinessFutures.put(endpoint, stage.future);
-                });
+                info.endpoints().forEach(endpoint -> newReadinessFutures.put(endpoint, stage.future));
                 addReadinessFutures(stage.name, newReadinessFutures);
             });
             return new EndpointReadyFutures(logContext,
@@ -200,15 +196,13 @@ public class EndpointReadyFutures {
             stages.forEach(stage -> stageNames.add(stage.name));
             EndpointReadyFuture readyFuture = new EndpointReadyFuture(endpoint, stageNames);
             newFutures.put(endpoint, readyFuture.future);
-            stages.forEach(stage -> {
-                stage.future.whenComplete((__, exception) -> {
-                    if (exception != null) {
-                        readyFuture.failStage(stage.name, exception);
-                    } else {
-                        readyFuture.completeStage(stage.name);
-                    }
-                });
-            });
+            stages.forEach(stage -> stage.future.whenComplete((__, exception) -> {
+                if (exception != null) {
+                    readyFuture.failStage(stage.name, exception);
+                } else {
+                    readyFuture.completeStage(stage.name);
+                }
+            }));
         });
         this.futures = Collections.unmodifiableMap(newFutures);
     }
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
index 419f0e41c28..e51f6cd321f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -238,9 +238,9 @@ public class CommandLineUtils {
         try {
             initializeBootstrapProperties(properties,
                 options.has(bootstrapServer) ?
-                    Optional.of(options.valueOf(bootstrapServer).toString()) : Optional.empty(),
+                    Optional.of(options.valueOf(bootstrapServer)) : Optional.empty(),
                 options.has(bootstrapControllers) ?
-                        Optional.of(options.valueOf(bootstrapControllers).toString()) : Optional.empty());
+                        Optional.of(options.valueOf(bootstrapControllers)) : Optional.empty());
         } catch (InitializeBootstrapException e) {
             printUsageAndExit(parser, e.getMessage());
         }
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
index 642179e81d9..f7a385abf08 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
@@ -80,14 +80,11 @@ public class FutureUtils {
         CompletableFuture<? extends T> sourceFuture,
         CompletableFuture<T> destinationFuture
     ) {
-        sourceFuture.whenComplete(new BiConsumer<T, Throwable>() {
-            @Override
-            public void accept(T val, Throwable throwable) {
-                if (throwable != null) {
-                    destinationFuture.completeExceptionally(throwable);
-                } else {
-                    destinationFuture.complete(val);
-                }
+        sourceFuture.whenComplete((BiConsumer<T, Throwable>) (val, throwable) -> {
+            if (throwable != null) {
+                destinationFuture.completeExceptionally(throwable);
+            } else {
+                destinationFuture.complete(val);
             }
         });
     }
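
Standalone, the pattern this lambda expresses is simply outcome mirroring between two futures:

    import java.util.concurrent.CompletableFuture;

    CompletableFuture<String> source = new CompletableFuture<>();
    CompletableFuture<String> destination = new CompletableFuture<>();
    // Mirror source's outcome, value or failure, into destination.
    source.whenComplete((val, throwable) -> {
        if (throwable != null) {
            destination.completeExceptionally(throwable);
        } else {
            destination.complete(val);
        }
    });
    source.complete("done"); // destination now completes with "done" as well
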
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Json.java b/server-common/src/main/java/org/apache/kafka/server/util/Json.java
index 3a2922838a3..042e3fff29d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/Json.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/Json.java
@@ -30,7 +30,7 @@ import java.util.Optional;
  * Provides methods for parsing JSON with Jackson and encoding to JSON with a simple and naive custom implementation.
  */
 public final class Json {
-    private static ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper MAPPER = new ObjectMapper();
 
     /**
      * Parse a JSON string into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
@@ -48,7 +48,7 @@ public final class Json {
      * exception.
      */
     public static <T> T parseStringAs(String input, Class<T> clazz) throws JsonProcessingException {
-        return mapper.readValue(input, clazz);
+        return MAPPER.readValue(input, clazz);
     }
 
     /**
@@ -56,21 +56,21 @@ public final class Json {
      */
     public static Optional<JsonValue> parseBytes(byte[] input) throws IOException {
         try {
-            return Optional.ofNullable(mapper.readTree(input)).map(JsonValue::apply);
+            return Optional.ofNullable(MAPPER.readTree(input)).map(JsonValue::apply);
         } catch (JsonProcessingException e) {
             return Optional.empty();
         }
     }
 
     public static JsonValue tryParseBytes(byte[] input) throws IOException {
-        return JsonValue.apply(mapper.readTree(input));
+        return JsonValue.apply(MAPPER.readTree(input));
     }
 
     /**
      * Parse a JSON byte array into a generic type T, or throws a JsonProcessingException in the case of exception.
      */
     public static <T> T parseBytesAs(byte[] input, Class<T> clazz) throws IOException {
-        return mapper.readValue(input, clazz);
+        return MAPPER.readValue(input, clazz);
     }
 
     /**
@@ -83,7 +83,7 @@ public final class Json {
         if (input == null || input.isEmpty()) {
             throw new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty");
         } else {
-            return JsonValue.apply(mapper.readTree(input));
+            return JsonValue.apply(MAPPER.readTree(input));
         }
     }
 
@@ -93,7 +93,7 @@ public final class Json {
      * a jackson-scala dependency).
      */
     public static String encodeAsString(Object obj) throws JsonProcessingException {
-        return mapper.writeValueAsString(obj);
+        return MAPPER.writeValueAsString(obj);
     }
 
     /**
@@ -102,6 +102,6 @@ public final class Json {
      * a jackson-scala dependency).
      */
     public static byte[] encodeAsBytes(Object obj) throws JsonProcessingException {
-        return mapper.writeValueAsBytes(obj);
+        return MAPPER.writeValueAsBytes(obj);
     }
 }
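
Promoting the mapper to a static final constant is safe because Jackson's ObjectMapper is thread-safe once configured; usage is unchanged, for example:

    import java.util.Collections;
    import java.util.Map;

    // Both calls throw JsonProcessingException on invalid input.
    String encoded = Json.encodeAsString(Collections.singletonMap("a", 1)); // {"a":1}
    Map<?, ?> decoded = Json.parseStringAs(encoded, Map.class);             // {a=1}
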
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java b/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java
index 7115ed2f2ab..f451795f85f 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
- *
+ * <br>
  * It has a pool of kafka-scheduler- threads that do the actual work.
  */
 public class KafkaScheduler implements Scheduler {
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java b/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java
index fde3a979140..5df88960cde 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/Scheduler.java
@@ -20,14 +20,14 @@ import java.util.concurrent.ScheduledFuture;
 
 /**
  * A scheduler for running jobs
- *
+ * <br>
  * This interface controls a job scheduler that allows scheduling either repeating background jobs
  * that execute periodically or delayed one-time actions that are scheduled in the future.
  */
 public interface Scheduler {
 
     /**
-     * Initialize this scheduler so it is ready to accept scheduling of tasks
+     * Initialize this scheduler, so it is ready to accept scheduling of tasks
      */
     void startup();
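
A usage sketch (only startup() appears in this hunk; the schedule signature below is an assumption for illustration):

    Scheduler scheduler = new KafkaScheduler(1); // one background thread
    scheduler.startup();                         // must precede any scheduling
    // Assumed shape: schedule(name, task, delayMs, periodMs)
    scheduler.schedule("heartbeat", () -> System.out.println("tick"), 0L, 1000L);
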
 
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java b/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java
index d826ddda85e..e6a5e6f1290 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/ThroughputThrottler.java
@@ -19,11 +19,11 @@ package org.apache.kafka.server.util;
 
 /**
  * This class helps producers throttle throughput.
- *
+ * <br>
  * If targetThroughput >= 0, the resulting average throughput will be approximately
  * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
  * no throttling will occur.
- *
+ * <br>
  * To use, do this between successive send attempts:
  * <pre>
  *     {@code
@@ -64,7 +64,7 @@ public class ThroughputThrottler {
      * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
      *                    messages produced so far if you want to throttle message throughput.
      * @param sendStartMs timestamp of the most recently sent message
-     * @return
+     * @return <code>true</code> if throttling should happen
      */
     public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
         if (this.targetThroughput < 0) {
@@ -78,7 +78,7 @@ public class ThroughputThrottler {
 
     /**
      * Occasionally blocks for small amounts of time to achieve targetThroughput.
-     *
+     * <br>
      * Note that if targetThroughput is 0, this will block extremely aggressively.
      */
     public void throttle() {
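
Sketched in full, the usage pattern from the Javadoc above looks like this (the constructor arguments are assumed to be the target throughput and a start timestamp):

    long startMs = System.currentTimeMillis();
    ThroughputThrottler throttler = new ThroughputThrottler(100, startMs); // ~100 units/sec
    for (long sent = 0; sent < 1000; sent++) {
        long sendStartMs = System.currentTimeMillis();
        sendRecord(); // hypothetical send
        if (throttler.shouldThrottle(sent, sendStartMs)) {
            throttler.throttle(); // sleeps as needed to hold the target rate
        }
    }
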
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java
index 9c85f6caf46..d269550ab83 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java
@@ -29,7 +29,7 @@ import java.util.function.Function;
 /**
  * A map which presents a lightweight view of another "underlying" map. Values in the
  * underlying map will be translated by a callback before they are returned.
- *
+ * <br>
  * This class is not internally synchronized. (Typically the underlyingMap is treated as
  * immutable.)
  */
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java
index 1f1c0287eec..9509a3bfcd9 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonArray.java
@@ -43,7 +43,7 @@ public class JsonArray implements JsonValue {
         Stream<JsonNode> nodeStream = StreamSupport.stream(
                 Spliterators.spliteratorUnknownSize(node.elements(), Spliterator.ORDERED),
                 false);
-        Stream<JsonValue> results = nodeStream.map(node -> JsonValue.apply(node));
+        Stream<JsonValue> results = nodeStream.map(JsonValue::apply);
         return results.collect(Collectors.toList()).iterator();
     }
 
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java
index daf3b4c7d61..cd89d0d6063 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/json/JsonValue.java
@@ -26,18 +26,16 @@ import java.util.Optional;
 /**
  * A simple wrapper over Jackson's JsonNode that enables type safe parsing via the `DecodeJson` type
  * class.
- *
+ * <br>
  * Typical usage would be something like:
- *
- * {{{
- * val jsonNode: JsonNode = ???
- * val jsonObject = JsonValue(jsonNode).asJsonObject
- * val intValue = jsonObject("int_field").to[Int]
- * val optionLongValue = jsonObject("option_long_field").to[Option[Long]]
- * val mapStringIntField = jsonObject("map_string_int_field").to[Map[String, Int]]
- * val seqStringField = jsonObject("seq_string_field").to[Seq[String]
- * }}}
- *
+ * <pre><code>
+ * // Given a jsonNode containing a parsed JSON:
+ * JsonObject jsonObject = JsonValue.apply(jsonNode).asJsonObject();
+ * Integer intField = jsonObject.apply("int_field").to(new DecodeJson.DecodeInteger());
+ * Optional<Integer> optionLongField = jsonObject.apply("option_long_field").to(DecodeJson.decodeOptional(new DecodeJson.DecodeInteger()));
+ * Map<String, Integer> mapStringIntField = jsonObject.apply("map_string_int_field").to(DecodeJson.decodeMap(new DecodeJson.DecodeInteger()));
+ * List<String> seqStringField = jsonObject.apply("seq_string_field").to(DecodeJson.decodeList(new DecodeJson.DecodeString()));
+ * </code></pre>
  * The `to` method throws an exception if the value cannot be converted to the requested type.
  */
 
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
index 2771f34a7cd..b3c500f565c 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
@@ -27,7 +27,7 @@ public interface Timer extends AutoCloseable {
     /**
      * Advance the internal clock, executing any tasks whose expiration has been
      * reached within the duration of the passed timeout.
-     * @param timeoutMs
+     * @param timeoutMs the time to advance in milliseconds
      * @return whether or not any tasks were executed
      */
     boolean advanceClock(long timeoutMs) throws InterruptedException;
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java
index 59ba67cfa07..43fae38f65e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java
@@ -21,11 +21,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Hierarchical Timing Wheels
- *
+ * <br>
  * A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit.
  * A timing wheel with size n has n buckets and can hold timer tasks in n * u time interval.
  * Each bucket holds timer tasks that fall into the corresponding time range. At the beginning,
- * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …,
+ * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), &hellip;,
  * the n-th bucket for [u * (n -1), u * n). Every interval of time unit u, the timer ticks and
 * moves to the next bucket and then expires all timer tasks in it. So, the timer never inserts a task
  * into the bucket for the current time since it is already expired. The timer immediately runs
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer) whereas priority queue
  * based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n)
  * insert/delete cost.
- *
+ * <br>
  * A major drawback of a simple timing wheel is that it assumes that a timer request is within
  * the time interval of n * u from the current time. If a timer request is out of this interval,
  * it is an overflow. A hierarchical timing wheel deals with such overflows. It is a hierarchically
@@ -47,50 +47,50 @@ import java.util.concurrent.atomic.AtomicInteger;
  * are then moved to the finer grain wheels or be executed. The insert (start-timer) cost is O(m)
  * where m is the number of wheels, which is usually very small compared to the number of requests
  * in the system, and the delete (stop-timer) cost is still O(1).
- *
+ * <br>
  * Example
  * Let's say that u is 1 and n is 3. If the start time is c,
  * then the buckets at different levels are:
- *
+ * <pre>
  * level    buckets
  * 1        [c,c]   [c+1,c+1]  [c+2,c+2]
  * 2        [c,c+2] [c+3,c+5]  [c+6,c+8]
  * 3        [c,c+8] [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
  * The bucket expiration is at the time of bucket beginning.
  * So at time = c+1, buckets [c,c], [c,c+2] and [c,c+8] are expired.
  * Level 1's clock moves to c+1, and [c+3,c+3] is created.
  * Level 2 and level3's clock stay at c since their clocks move in unit of 3 and 9, respectively.
  * So, no new buckets are created in level 2 and 3.
- *
+ * <br>
  * Note that bucket [c,c+2] in level 2 won't receive any task since that range is already covered in level 1.
  * The same is true for the bucket [c,c+8] in level 3 since its range is covered in level 2.
  * This is a bit wasteful, but simplifies the implementation.
- *
+ * <pre>
  * 1        [c+1,c+1]  [c+2,c+2]  [c+3,c+3]
  * 2        [c,c+2]    [c+3,c+5]  [c+6,c+8]
  * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
  * At time = c+2, [c+1,c+1] is newly expired.
  * Level 1 moves to c+2, and [c+4,c+4] is created,
- *
+ * <pre>
  * 1        [c+2,c+2]  [c+3,c+3]  [c+4,c+4]
  * 2        [c,c+2]    [c+3,c+5]  [c+6,c+8]
  * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
  * At time = c+3, [c+2,c+2] is newly expired.
  * Level 2 moves to c+3, and [c+5,c+5] and [c+9,c+11] are created.
  * Level 3 stay at c.
- *
+ * <pre>
  * 1        [c+3,c+3]  [c+4,c+4]  [c+5,c+5]
  * 2        [c+3,c+5]  [c+6,c+8]  [c+9,c+11]
  * 3        [c,c+8]    [c+9,c+17] [c+18,c+26]
- *
+ * </pre>
 * The hierarchical timing wheels work especially well when operations are completed before they time out.
 * Even when everything times out, it is still advantageous when there are many items in the timer.
 * Its insert cost (including reinsert) and delete cost are O(m) and O(1), respectively, while priority
 * queue based timers take O(log N) for both insert and delete, where N is the number of items in the queue.
- *
+ * <br>
  * This class is not thread-safe. There should not be any add calls while advanceClock is executing.
  * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe.
  */
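
The single-level bucket choice underlying the hierarchy can be sketched in a few lines (illustrative, not the actual TimingWheel code):

    // With tick u and wheel size n, a task expiring at time t lands in bucket
    // (t / u) mod n; anything farther than n * u from now overflows to the
    // parent wheel, whose tick is n * u.
    static int bucketFor(long expirationMs, long tickMs, int wheelSize) {
        long virtualId = expirationMs / tickMs; // which tick t falls on
        return (int) (virtualId % wheelSize);   // circular slot in this wheel
    }
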
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java
index 05315466519..783d7e10643 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/BaseHashTable.java
@@ -22,14 +22,14 @@ import java.util.List;
 
 /**
  * A hash table which uses separate chaining.
- *
+ * <br>
  * In order to optimize memory consumption a bit, the common case where there is
  * one element per slot is handled by simply placing the element in the slot,
  * and the case where there are multiple elements is handled by creating an
- * array and putting that in the slot.  Java is storing type info in memory
+ * array and putting that in the slot. Java is storing type info in memory
  * about every object whether we want it or not, so let's get some benefit
  * out of it.
- *
+ * <br>
  * Arrays and null values cannot be inserted.
  */
 @SuppressWarnings("unchecked")
@@ -58,7 +58,7 @@ class BaseHashTable<T> {
 
     /**
      * Calculate the capacity we should provision, given the expected size.
-     *
+     * <br>
      * Our capacity must always be a power of 2, and never less than 2 or more
      * than MAX_CAPACITY.  We use 64-bit numbers here to avoid overflow
      * concerns.
@@ -180,7 +180,7 @@ class BaseHashTable<T> {
     /**
      * Expand the hash table to a new size.  Existing elements will be copied to new slots.
      */
-    final private void rehash(int newSize) {
+    private void rehash(int newSize) {
         Object[] prevElements = elements;
         elements = new Object[newSize];
         List<Object> ready = new ArrayList<>();
@@ -224,15 +224,15 @@ class BaseHashTable<T> {
      */
     static <T> void unpackSlot(List<T> out, Object[] elements, int slot) {
         Object value = elements[slot];
-        if (value == null) {
-            return;
-        } else if (value instanceof Object[]) {
-            Object[] array = (Object[]) value;
-            for (Object object : array) {
-                out.add((T) object);
+        if (value != null) {
+            if (value instanceof Object[]) {
+                Object[] array = (Object[]) value;
+                for (Object object : array) {
+                    out.add((T) object);
+                }
+            } else {
+                out.add((T) value);
             }
-        } else {
-            out.add((T) value);
         }
     }
 
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java b/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java
index 8efa61fff03..7e16a42ce93 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java
@@ -22,7 +22,7 @@ import java.util.Map;
 
 /**
  * A snapshot of some timeline data structures.
- *
+ * <br>
  * The snapshot contains historical data for several timeline data structures.
  * We use an IdentityHashMap to store this data.  This way, we can easily drop all of
  * the snapshot data.
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 970c53bb727..56462c3aff5 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -182,7 +182,7 @@ public class SnapshotRegistry {
 
     /**
      * Creates a new snapshot at the given epoch.
-     *
+     * <br>
      * If {@code epoch} already exists and it is the last snapshot then just return that snapshot.
      *
      * @param epoch             The epoch to create the snapshot at.  The current epoch
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 2977e061643..26f8f245964 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -29,48 +29,48 @@ import java.util.NoSuchElementException;
  * We handle divergences between the current state and historical state by copying a
  * reference to elements that have been deleted or overwritten into the most recent
  * snapshot tier.
- *
+ * <br>
  * Note that there are no keys in SnapshottableHashTable, only values.  So it more similar
  * to a hash set than a hash map.  The subclasses implement full-featured maps and sets
  * using this class as a building block.
- *
+ * <br>
  * Each snapshot tier contains a size and a hash table.  The size reflects the size at
  * the time the snapshot was taken.  Note that, as an optimization, snapshot tiers will
  * be null if they don't contain anything.  So for example, if snapshot 20 of Object O
  * contains the same entries as snapshot 10 of that object, the snapshot 20 tier for
  * object O will be null.
- *
+ * <br>
  * The current tier's data is stored in the fields inherited from BaseHashTable.  It
  * would be conceptually simpler to have a separate BaseHashTable object, but since Java
  * doesn't have value types, subclassing is the only way to avoid another pointer
  * indirection and the associated extra memory cost.
- *
+ * <br>
  * Note that each element in the hash table contains a start epoch, and a value.  The
  * start epoch is there to identify when the object was first inserted.  This in turn
  * determines which snapshots it is a member of.
- *
+ * <br>
  * In order to retrieve an object from snapshot E, we start by checking to see if the
  * object exists in the "current" hash tier.  If it does, and its startEpoch extends back
  * to E, we return that object.  Otherwise, we check all the snapshot tiers, starting
  * with E, and ending with the most recent snapshot, to see if the object is there.
  * As an optimization, if we encounter the object in a snapshot tier but its epoch is too
  * new, we know that its value at epoch E must be null, so we can return that immediately.
- *
+ * <br>
  * The class hierarchy looks like this:
- *
+ * <pre>
  *        Revertable       BaseHashTable
  *              ↑              ↑
  *           SnapshottableHashTable → SnapshotRegistry → Snapshot
  *               ↑             ↑
  *   TimelineHashSet       TimelineHashMap
- *
+ * </pre>
  * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
  * pretty bare-bones since this class is not intended to be used directly by end-users.
- *
+ * <br>
  * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
  * snapshots.  This is the core of the snapshotted hash table code and handles the
  * tiering.
- *
+ * <br>
  * TimelineHashSet and TimelineHashMap are mostly wrappers around this
  * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
  * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
@@ -78,11 +78,11 @@ import java.util.NoSuchElementException;
  * The accessor APIs have two versions -- one that looks at the current state, and one
  * that looks at a historical snapshotted state.  Mutation APIs only ever mutate the
  * current state.
- *
+ * <br>
  * One very important feature of SnapshottableHashTable is that we support iterating
  * over a snapshot even while changes are being made to the current state.  See the
  * Javadoc for the iterator for more information about how this is accomplished.
- *
+ * <br>
  * All of these classes require external synchronization, and don't support null keys or
  * values.
  */
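
A usage sketch of the timeline family described here (the constructor and accessor shapes are assumptions based on this Javadoc):

    SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
    TimelineHashMap<String, Integer> map = new TimelineHashMap<>(registry, 16);
    map.put("a", 1);
    registry.getOrCreateSnapshot(10);   // freeze the state as of epoch 10
    map.put("a", 2);                    // mutates only the current state
    int historical = map.get("a", 10);  // 1, read from the epoch-10 snapshot
    int current = map.get("a");         // 2
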
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
index 857ff87cdce..8239b1410af 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
@@ -27,9 +27,9 @@ import java.util.Set;
 
 /**
  * This is a hash map which can be snapshotted.
- *
+ * <br>
  * See {@SnapshottableHashTable} for more details about the implementation.
- *
+ * <br>
  * This class requires external synchronization.  Null keys and values are not supported.
  *
  * @param <K>   The key type of the set.
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
index 34efb10fdf9..24705a4dffc 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineHashSet.java
@@ -24,9 +24,9 @@ import java.util.Set;
 
 /**
  * This is a hash set which can be snapshotted.
- *
+ * <br>
  * See {@SnapshottableHashTable} for more details about the implementation.
- *
+ * <br>
  * This class requires external synchronization.  Null values are not supported.
  *
  * @param <T>   The value type of the set.
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
index 7e98ca55e78..c4d7298a22d 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineInteger.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
 
 /**
  * This is a mutable integer which can be snapshotted. 
- *
+ * <br>
  * This class requires external synchronization.
  */
 public class TimelineInteger implements Revertable {
@@ -93,7 +93,6 @@ public class TimelineInteger implements Revertable {
         set(get() - 1);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void executeRevert(long targetEpoch, Delta delta) {
         IntegerContainer container = (IntegerContainer) delta;
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
index 1379b084004..b882d6455f1 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineLong.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
 
 /**
  * This is a mutable long which can be snapshotted.
- *
+ * <br>
  * This class requires external synchronization.
  */
 public class TimelineLong implements Revertable {
@@ -93,7 +93,6 @@ public class TimelineLong implements Revertable {
         set(get() - 1L);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void executeRevert(long targetEpoch, Delta delta) {
         LongContainer container = (LongContainer) delta;
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
index 0b4a43a249a..f41343c9b3b 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/TimelineObject.java
@@ -23,7 +23,7 @@ import java.util.Objects;
 
 /**
  * This is a mutable reference to an immutable object. It can be snapshotted.
- *
+ * <br>
  * This class requires external synchronization.
  */
 public class TimelineObject<T> implements Revertable {