Posted to commits@kafka.apache.org by ij...@apache.org on 2018/09/09 21:25:40 UTC

[kafka] branch trunk updated: MINOR: Code cleanup of 'clients' module (#5427)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8651d4  MINOR: Code cleanup of 'clients' module (#5427)
b8651d4 is described below

commit b8651d4e82d45463d2c71798bd5852f8a605b440
Author: Vahid Hashemian <va...@gmail.com>
AuthorDate: Sun Sep 9 14:25:30 2018 -0700

    MINOR: Code cleanup of 'clients' module (#5427)
    
    Cleanup involves:
    * Refactoring to use Java 8 constructs (lambdas,
    diamond for `empty` collection methods) and library
    methods (`computeIfAbsent`)
    * Simplifying code (including unnecessarily complex
    `equals` and `hashCode` implementations)
    * Removing redundant code
    * Fixing typos
    
    Reviewers: Ryanne Dolan, Ismael Juma <is...@juma.me.uk>
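
To make the cleanup list above concrete, here is a minimal, self-contained sketch of the two idioms behind most hunks in this patch (class and helper names are hypothetical, not code from the repository): `Map.computeIfAbsent` collapsing the get/null-check/put dance, and Java 8 target typing inferring the type argument that previously required a witness such as `Collections.<String>emptyList()`.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CleanupSketch {
        // Hypothetical helper, present only to show inference in argument position.
        static int count(List<String> values) {
            return values.size();
        }

        public static void main(String[] args) {
            Map<String, List<String>> index = new HashMap<>();

            // Before: look up, null-check, create, put.
            List<String> old = index.get("a");
            if (old == null) {
                old = new ArrayList<>();
                index.put("a", old);
            }
            old.add("x");

            // After: one call expresses look-up-or-create.
            index.computeIfAbsent("b", k -> new ArrayList<>()).add("y");

            // Pre-Java 8 this call needed Collections.<String>emptyList();
            // improved target typing now infers the type argument.
            int n = count(Collections.emptyList());
            System.out.println(index + " " + n);
        }
    }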
---
 .../apache/kafka/clients/FetchSessionHandler.java  |   6 +-
 .../org/apache/kafka/clients/InFlightRequests.java |   8 +-
 .../clients/admin/ConsumerGroupDescription.java    |   2 +-
 .../kafka/clients/admin/DeleteAclsResult.java      |  45 ++---
 .../admin/internals/AdminMetadataManager.java      |   7 +-
 .../kafka/clients/consumer/KafkaConsumer.java      |   2 +-
 .../kafka/clients/consumer/MockConsumer.java       |  15 +-
 .../kafka/clients/consumer/RangeAssignor.java      |   2 +-
 .../kafka/clients/consumer/RoundRobinAssignor.java |   3 +-
 .../internals/AbstractPartitionAssignor.java       |   6 +-
 .../main/java/org/apache/kafka/common/Cluster.java |  22 +--
 .../java/org/apache/kafka/common/MetricName.java   |  23 +--
 .../org/apache/kafka/common/TopicPartition.java    |  12 +-
 .../apache/kafka/common/TopicPartitionReplica.java |  19 +-
 .../kafka/common/acl/AccessControlEntryFilter.java |   4 +-
 .../apache/kafka/common/config/AbstractConfig.java |   4 +-
 .../org/apache/kafka/common/config/ConfigDef.java  |  69 +++----
 .../kafka/common/config/ConfigTransformer.java     |   4 +-
 .../common/header/internals/RecordHeaders.java     |  28 +--
 .../kafka/common/internals/KafkaFutureImpl.java    |  10 +-
 .../kafka/common/internals/PartitionStates.java    |   6 +-
 .../common/memory/GarbageCollectedMemoryPool.java  |   2 +-
 .../apache/kafka/common/network/ChannelState.java  |   3 +-
 .../apache/kafka/common/network/KafkaChannel.java  |   6 +-
 .../common/network/PlaintextChannelBuilder.java    |   3 +-
 .../common/network/PlaintextTransportLayer.java    |  13 +-
 .../kafka/common/network/SaslChannelBuilder.java   |   2 +-
 .../apache/kafka/common/network/Selectable.java    |  32 ++--
 .../org/apache/kafka/common/network/Selector.java  |  14 +-
 .../kafka/common/network/SslChannelBuilder.java    |   2 +-
 .../kafka/common/network/SslTransportLayer.java    |   6 +-
 .../org/apache/kafka/common/protocol/Errors.java   |  14 +-
 .../kafka/common/record/AbstractRecords.java       |   7 +-
 .../common/record/ByteBufferLogInputStream.java    |   3 +-
 .../kafka/common/record/CompressionType.java       |   2 +-
 .../apache/kafka/common/record/FileRecords.java    |   8 +-
 .../common/record/KafkaLZ4BlockInputStream.java    |   6 +-
 .../apache/kafka/common/record/MemoryRecords.java  |   7 +-
 .../kafka/common/record/MemoryRecordsBuilder.java  |   2 +-
 .../requests/AlterReplicaLogDirsRequest.java       |   2 +-
 .../kafka/common/requests/ApiVersionsRequest.java  |   4 +-
 .../common/requests/ControlledShutdownRequest.java |   5 +-
 .../kafka/common/requests/CreateTopicsRequest.java |   6 +-
 .../kafka/common/requests/DeleteAclsRequest.java   |   2 +-
 .../requests/DescribeDelegationTokenResponse.java  |   2 +-
 .../common/requests/DescribeGroupsResponse.java    |  10 +-
 .../kafka/common/requests/JoinGroupRequest.java    |  26 +--
 .../kafka/common/requests/ListGroupsRequest.java   |   4 +-
 .../kafka/common/resource/ResourceFilter.java      |   4 +-
 .../apache/kafka/common/security/JaasContext.java  |   2 +-
 .../security/authenticator/CredentialCache.java    |   2 +-
 .../security/authenticator/LoginManager.java       |  21 +-
 .../authenticator/SaslClientAuthenticator.java     |  20 +-
 .../authenticator/SaslServerAuthenticator.java     |  21 +-
 .../common/security/kerberos/KerberosLogin.java    | 212 ++++++++++-----------
 .../oauthbearer/OAuthBearerLoginModule.java        |  14 +-
 .../internals/OAuthBearerSaslClient.java           |  10 +-
 .../OAuthBearerSaslClientCallbackHandler.java      |   6 +-
 .../internals/OAuthBearerSaslServer.java           |   8 +-
 .../unsecured/OAuthBearerUnsecuredJws.java         |  34 ++--
 .../OAuthBearerUnsecuredLoginCallbackHandler.java  |  10 +-
 ...uthBearerUnsecuredValidatorCallbackHandler.java |  13 +-
 .../common/security/plain/PlainLoginModule.java    |   9 +-
 .../security/plain/internals/PlainSaslServer.java  |   6 +-
 .../common/security/scram/ScramLoginModule.java    |   9 +-
 .../security/scram/internals/ScramExtensions.java  |   2 +-
 .../security/scram/internals/ScramSaslClient.java  |   8 +-
 .../security/scram/internals/ScramSaslServer.java  |   8 +-
 .../internals/ScramServerCallbackHandler.java      |   3 +-
 .../kafka/common/security/ssl/SslFactory.java      |   4 +-
 .../token/delegation/TokenInformation.java         |   4 +-
 .../apache/kafka/common/utils/AppInfoParser.java   |   4 +-
 .../java/org/apache/kafka/common/utils/Utils.java  |   2 +-
 .../kafka/server/quota/ClientQuotaEntity.java      |   4 +-
 .../kafka/common/resource/ResourceTypeTest.java    |   8 +-
 .../kafka/common/security/JaasContextTest.java     |  18 +-
 .../ClientAuthenticationFailureTest.java           |   2 +-
 .../security/authenticator/LoginManagerTest.java   |   2 +-
 .../authenticator/SaslAuthenticatorTest.java       |  12 +-
 .../authenticator/SaslServerAuthenticatorTest.java |   8 +-
 .../authenticator/TestDigestLoginModule.java       |   4 +-
 .../common/security/kerberos/KerberosNameTest.java |   2 +-
 .../oauthbearer/OAuthBearerLoginModuleTest.java    |  36 ++--
 .../internals/OAuthBearerSaslServerTest.java       |   2 +-
 ...uthBearerUnsecuredLoginCallbackHandlerTest.java |   5 +-
 ...earerUnsecuredValidatorCallbackHandlerTest.java |   8 +-
 .../unsecured/OAuthBearerValidationUtilsTest.java  |   7 +-
 .../plain/internals/PlainSaslServerTest.java       |   2 +-
 .../scram/internals/ScramMessagesTest.java         |  12 +-
 .../common/serialization/SerializationTest.java    |   2 +-
 .../common/utils/ImplicitLinkedHashSetTest.java    |  10 +-
 .../apache/kafka/common/utils/MockScheduler.java   |   2 +-
 .../apache/kafka/common/utils/MockTimeTest.java    |   4 +-
 .../apache/kafka/common/utils/SanitizerTest.java   |   3 +-
 .../org/apache/kafka/common/utils/ShellTest.java   |   2 +-
 .../org/apache/kafka/common/utils/UtilsTest.java   |   6 +-
 96 files changed, 472 insertions(+), 615 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
index 195324e..16990ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
@@ -202,7 +202,7 @@ public class FetchSessionHandler {
                 next = null;
                 Map<TopicPartition, PartitionData> toSend =
                     Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
-                return new FetchRequestData(toSend, Collections.<TopicPartition>emptyList(), toSend, nextMetadata);
+                return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata);
             }
 
             List<TopicPartition> added = new ArrayList<>();
@@ -234,9 +234,7 @@ public class FetchSessionHandler {
                 }
             }
             // Add any new partitions to the session.
-            for (Iterator<Entry<TopicPartition, PartitionData>> iter =
-                     next.entrySet().iterator(); iter.hasNext(); ) {
-                Entry<TopicPartition, PartitionData> entry = iter.next();
+            for (Entry<TopicPartition, PartitionData> entry : next.entrySet()) {
                 TopicPartition topicPartition = entry.getKey();
                 PartitionData nextData = entry.getValue();
                 if (sessionPartitions.containsKey(topicPartition)) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 5b7ba61..efca453 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -153,12 +152,7 @@ final class InFlightRequests {
         } else {
             final Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
             inFlightRequestCount.getAndAdd(-clearedRequests.size());
-            return new Iterable<NetworkClient.InFlightRequest>() {
-                @Override
-                public Iterator<NetworkClient.InFlightRequest> iterator() {
-                    return clearedRequests.descendingIterator();
-                }
-            };
+            return () -> clearedRequests.descendingIterator();
         }
     }
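
The hunk above works because `Iterable<T>` is, in practice, a functional interface: its only abstract method is `iterator()` (`forEach` and `spliterator` have default bodies since Java 8), so a lambda or method reference can replace the anonymous class. A standalone sketch, assuming nothing from the patch:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;

    public class IterableLambda {
        public static void main(String[] args) {
            Deque<String> requests = new ArrayDeque<>(Arrays.asList("first", "second"));
            // Method reference, equivalent to the lambda in the hunk above;
            // each for-loop over newestFirst obtains a fresh descending iterator.
            Iterable<String> newestFirst = requests::descendingIterator;
            for (String s : newestFirst)
                System.out.println(s);   // prints "second", then "first"
        }
    }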
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 933d66c..8947293 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -45,7 +45,7 @@ public class ConsumerGroupDescription {
                                     Node coordinator) {
         this.groupId = groupId == null ? "" : groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
-        this.members = members == null ? Collections.<MemberDescription>emptyList() :
+        this.members = members == null ? Collections.emptyList() :
             Collections.unmodifiableList(new ArrayList<>(members));
         this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
         this.state = state;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 19df228..63310bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -101,29 +101,26 @@ public class DeleteAclsResult {
      * Note that if the filters don't match any ACLs, this is not considered an error.
      */
     public KafkaFuture<Collection<AclBinding>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
-            new KafkaFuture.BaseFunction<Void, Collection<AclBinding>>() {
-                @Override
-                public Collection<AclBinding> apply(Void v) {
-                    List<AclBinding> acls = new ArrayList<>();
-                    for (Map.Entry<AclBindingFilter, KafkaFuture<FilterResults>> entry : futures.entrySet()) {
-                        FilterResults results;
-                        try {
-                            results = entry.getValue().get();
-                        } catch (Throwable e) {
-                            // This should be unreachable, since the future returned by KafkaFuture#allOf should
-                            // have failed if any Future failed.
-                            throw new KafkaException("DeleteAclsResult#all: internal error", e);
-                        }
-                        for (FilterResult result : results.values()) {
-                            if (result.exception() != null) {
-                                throw result.exception();
-                            }
-                            acls.add(result.binding());
-                        }
-                    }
-                    return acls;
-                }
-            });
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(v -> getAclBindings(futures));
+    }
+
+    private List<AclBinding> getAclBindings(Map<AclBindingFilter, KafkaFuture<FilterResults>> futures) {
+        List<AclBinding> acls = new ArrayList<>();
+        for (KafkaFuture<FilterResults> value: futures.values()) {
+            FilterResults results;
+            try {
+                results = value.get();
+            } catch (Throwable e) {
+                // This should be unreachable, since the future returned by KafkaFuture#allOf should
+                // have failed if any Future failed.
+                throw new KafkaException("DeleteAclsResult#all: internal error", e);
+            }
+            for (FilterResult result : results.values()) {
+                if (result.exception() != null)
+                    throw result.exception();
+                acls.add(result.binding());
+            }
+        }
+        return acls;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 1ad3991..3d9e5ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -20,7 +20,6 @@ package org.apache.kafka.clients.admin.internals;
 import org.apache.kafka.clients.MetadataUpdater;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
@@ -183,9 +182,9 @@ public class AdminMetadataManager {
             log.trace("Clearing cached controller node {}.", cluster.controller());
             this.cluster = new Cluster(cluster.clusterResource().clusterId(),
                 cluster.nodes(),
-                Collections.<PartitionInfo>emptySet(),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
                 null);
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 33e037c..4ea3cfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -708,7 +708,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     true, false, clusterResourceListeners);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+            this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index cf1b07f..9eee6da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -184,7 +184,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
         for (final TopicPartition topicPartition : records.keySet()) {
-            results.put(topicPartition, new ArrayList<ConsumerRecord<K, V>>());
+            results.put(topicPartition, new ArrayList<>());
         }
 
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
@@ -208,11 +208,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         Set<TopicPartition> currentAssigned = new HashSet<>(this.subscriptions.assignedPartitions());
         if (!currentAssigned.contains(tp))
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
-        List<ConsumerRecord<K, V>> recs = this.records.get(tp);
-        if (recs == null) {
-            recs = new ArrayList<>();
-            this.records.put(tp, recs);
-        }
+        List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
         recs.add(record);
     }
 
@@ -439,12 +435,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     public synchronized void scheduleNopPollTask() {
-        schedulePollTask(new Runnable() {
-            @Override
-            public void run() {
-                // noop
-            }
-        });
+        schedulePollTask(() -> { });
     }
 
     public synchronized Set<TopicPartition> paused() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 5d5a268..78956ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -64,7 +64,7 @@ public class RangeAssignor extends AbstractPartitionAssignor {
         Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<TopicPartition>());
+            assignment.put(memberId, new ArrayList<>());
 
         for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
             String topic = topicEntry.getKey();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index 3b543f7..6763b82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -60,7 +60,7 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
                                                     Map<String, Subscription> subscriptions) {
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
         for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<TopicPartition>());
+            assignment.put(memberId, new ArrayList<>());
 
         CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
         for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
@@ -72,7 +72,6 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
         return assignment;
     }
 
-
     public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
         SortedSet<String> topics = new TreeSet<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 8ec887e..35eb8eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -80,11 +80,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
     }
 
     protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
-        List<V> list = map.get(key);
-        if (list == null) {
-            list = new ArrayList<>();
-            map.put(key, list);
-        }
+        List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
         list.add(value);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 33d3749..24a18db 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -56,7 +56,7 @@ public final class Cluster {
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, null);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, null);
     }
 
     /**
@@ -70,7 +70,7 @@ public final class Cluster {
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics,
                    Node controller) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, controller);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, controller);
     }
 
     /**
@@ -117,11 +117,11 @@ public final class Cluster {
         HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<>();
         HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
         for (Node n : this.nodes) {
-            partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
+            partsForNode.put(n.id(), new ArrayList<>());
         }
         for (PartitionInfo p : partitions) {
             if (!partsForTopic.containsKey(p.topic()))
-                partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
+                partsForTopic.put(p.topic(), new ArrayList<>());
             List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
             psTopic.add(p);
 
@@ -157,8 +157,8 @@ public final class Cluster {
      * Create an empty cluster instance with no nodes and no topic-partitions.
      */
     public static Cluster empty() {
-        return new Cluster(null, new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
-                Collections.<String>emptySet(), null);
+        return new Cluster(null, new ArrayList<>(0), new ArrayList<>(0), Collections.emptySet(),
+            Collections.emptySet(), null);
     }
 
     /**
@@ -171,8 +171,8 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0),
-                            Collections.<String>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
+        return new Cluster(null, true, nodes, new ArrayList<>(0),
+            Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null);
     }
 
     /**
@@ -231,7 +231,7 @@ public final class Cluster {
      */
     public List<PartitionInfo> partitionsForTopic(String topic) {
         List<PartitionInfo> parts = this.partitionsByTopic.get(topic);
-        return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
+        return (parts == null) ? Collections.emptyList() : parts;
     }
 
     /**
@@ -251,7 +251,7 @@ public final class Cluster {
      */
     public List<PartitionInfo> availablePartitionsForTopic(String topic) {
         List<PartitionInfo> parts = this.availablePartitionsByTopic.get(topic);
-        return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
+        return (parts == null) ? Collections.emptyList() : parts;
     }
 
     /**
@@ -261,7 +261,7 @@ public final class Cluster {
      */
     public List<PartitionInfo> partitionsForNode(int nodeId) {
         List<PartitionInfo> parts = this.partitionsByNode.get(nodeId);
-        return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
+        return (parts == null) ? Collections.emptyList() : parts;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 2136a72..0af77a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -106,9 +106,9 @@ public final class MetricName {
             return hash;
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((group == null) ? 0 : group.hashCode());
-        result = prime * result + ((name == null) ? 0 : name.hashCode());
-        result = prime * result + ((tags == null) ? 0 : tags.hashCode());
+        result = prime * result + group.hashCode();
+        result = prime * result + name.hashCode();
+        result = prime * result + tags.hashCode();
         this.hash = result;
         return result;
     }
@@ -122,22 +122,7 @@ public final class MetricName {
         if (getClass() != obj.getClass())
             return false;
         MetricName other = (MetricName) obj;
-        if (group == null) {
-            if (other.group != null)
-                return false;
-        } else if (!group.equals(other.group))
-            return false;
-        if (name == null) {
-            if (other.name != null)
-                return false;
-        } else if (!name.equals(other.name))
-            return false;
-        if (tags == null) {
-            if (other.tags != null)
-                return false;
-        } else if (!tags.equals(other.tags))
-            return false;
-        return true;
+        return group.equals(other.group) && name.equals(other.name) && tags.equals(other.tags);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
index 08b2a51..2c6add7 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * A topic name and partition number
@@ -48,7 +49,7 @@ public final class TopicPartition implements Serializable {
         final int prime = 31;
         int result = 1;
         result = prime * result + partition;
-        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        result = prime * result + Objects.hashCode(topic);
         this.hash = result;
         return result;
     }
@@ -62,14 +63,7 @@ public final class TopicPartition implements Serializable {
         if (getClass() != obj.getClass())
             return false;
         TopicPartition other = (TopicPartition) obj;
-        if (partition != other.partition)
-            return false;
-        if (topic == null) {
-            if (other.topic != null)
-                return false;
-        } else if (!topic.equals(other.topic))
-            return false;
-        return true;
+        return partition == other.partition && Objects.equals(topic, other.topic);
     }
 
     @Override
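
For context on the `TopicPartition` hunk: `java.util.Objects` (since Java 7) provides null-safe helpers. `Objects.equals(a, b)` is true when both arguments are null and false when exactly one is, and `Objects.hashCode(null)` is 0, so the one-liners preserve the old behavior for the nullable `topic` field. The same pattern on a hypothetical value class (omitting the hash caching done above):

    import java.util.Objects;

    final class TopicRef {
        private final String topic;      // may be null, like the field above
        private final int partition;

        TopicRef(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null || getClass() != obj.getClass())
                return false;
            TopicRef other = (TopicRef) obj;
            // Null-safe: replaces the expanded if/else ladder.
            return partition == other.partition && Objects.equals(topic, other.topic);
        }

        @Override
        public int hashCode() {
            // Objects.hashCode(null) == 0, matching the old ternary.
            return 31 * (31 + partition) + Objects.hashCode(topic);
        }
    }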
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
index 2a10439..2442602 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common;
 
+import org.apache.kafka.common.utils.Utils;
+
 import java.io.Serializable;
 
 
@@ -30,7 +32,7 @@ public final class TopicPartitionReplica implements Serializable {
     private final String topic;
 
     public TopicPartitionReplica(String topic, int partition, int brokerId) {
-        this.topic = topic;
+        this.topic = Utils.notNull(topic);
         this.partition = partition;
         this.brokerId = brokerId;
     }
@@ -54,7 +56,7 @@ public final class TopicPartitionReplica implements Serializable {
         }
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        result = prime * result + topic.hashCode();
         result = prime * result + partition;
         result = prime * result + brokerId;
         this.hash = result;
@@ -70,18 +72,7 @@ public final class TopicPartitionReplica implements Serializable {
         if (getClass() != obj.getClass())
             return false;
         TopicPartitionReplica other = (TopicPartitionReplica) obj;
-        if (partition != other.partition)
-            return false;
-        if (brokerId != other.brokerId)
-            return false;
-        if (topic == null) {
-            if (other.topic != null) {
-                return false;
-            }
-        } else if (!topic.equals(other.topic)) {
-            return false;
-        }
-        return true;
+        return partition == other.partition && brokerId == other.brokerId && topic.equals(other.topic);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
index a95303e..225e73a 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
@@ -109,9 +109,7 @@ public class AccessControlEntryFilter {
             return false;
         if ((operation() != AclOperation.ANY) && (!operation().equals(other.operation())))
             return false;
-        if ((permissionType() != AclPermissionType.ANY) && (!permissionType().equals(other.permissionType())))
-            return false;
-        return true;
+        return (permissionType() == AclPermissionType.ANY) || (permissionType().equals(other.permissionType()));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 4f8420b..9cf13dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -316,7 +316,7 @@ public class AbstractConfig {
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
-        return getConfiguredInstances(key, t, Collections.<String, Object>emptyMap());
+        return getConfiguredInstances(key, t, Collections.emptyMap());
     }
 
     /**
@@ -343,7 +343,7 @@ public class AbstractConfig {
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
-        List<T> objects = new ArrayList<T>();
+        List<T> objects = new ArrayList<>();
         if (classNames == null)
             return objects;
         Map<String, Object> configPairs = originals();
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 662909a..3868ed5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -22,10 +22,8 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -187,7 +185,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
                             String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
-        return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+        return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender);
     }
 
     /**
@@ -264,7 +262,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
                             String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
-        return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+        return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender);
     }
 
     /**
@@ -337,7 +335,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
                             Width width, String displayName, Recommender recommender) {
-        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender);
     }
 
     /**
@@ -607,13 +605,7 @@ public class ConfigDef {
                 List<Object> originalRecommendedValues = value.recommendedValues();
                 if (!originalRecommendedValues.isEmpty()) {
                     Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
-                    Iterator<Object> it = recommendedValues.iterator();
-                    while (it.hasNext()) {
-                        Object o = it.next();
-                        if (!originalRecommendedValueSet.contains(o)) {
-                            it.remove();
-                        }
-                    }
+                    recommendedValues.removeIf(o -> !originalRecommendedValueSet.contains(o));
                 }
                 value.recommendedValues(recommendedValues);
                 value.visible(key.recommender.visible(name, parsed));
@@ -1271,32 +1263,30 @@ public class ConfigDef {
         }
 
         List<ConfigKey> configs = new ArrayList<>(configKeys.values());
-        Collections.sort(configs, new Comparator<ConfigKey>() {
-            @Override
-            public int compare(ConfigKey k1, ConfigKey k2) {
-                int cmp = k1.group == null
-                        ? (k2.group == null ? 0 : -1)
-                        : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group)));
-                if (cmp == 0) {
-                    cmp = Integer.compare(k1.orderInGroup, k2.orderInGroup);
-                    if (cmp == 0) {
-                        // first take anything with no default value
-                        if (!k1.hasDefault() && k2.hasDefault()) {
-                            cmp = -1;
-                        } else if (!k2.hasDefault() && k1.hasDefault()) {
-                            cmp = 1;
-                        } else {
-                            cmp = k1.importance.compareTo(k2.importance);
-                            if (cmp == 0) {
-                                return k1.name.compareTo(k2.name);
-                            }
-                        }
-                    }
+        Collections.sort(configs, (k1, k2) -> compare(k1, k2, groupOrd));
+        return configs;
+    }
+
+    private int compare(ConfigKey k1, ConfigKey k2, Map<String, Integer> groupOrd) {
+        int cmp = k1.group == null
+            ? (k2.group == null ? 0 : -1)
+            : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group)));
+        if (cmp == 0) {
+            cmp = Integer.compare(k1.orderInGroup, k2.orderInGroup);
+            if (cmp == 0) {
+                // first take anything with no default value
+                if (!k1.hasDefault() && k2.hasDefault())
+                    cmp = -1;
+                else if (!k2.hasDefault() && k1.hasDefault())
+                    cmp = 1;
+                else {
+                    cmp = k1.importance.compareTo(k2.importance);
+                    if (cmp == 0)
+                        return k1.name.compareTo(k2.name);
                 }
-                return cmp;
             }
-        });
-        return configs;
+        }
+        return cmp;
     }
 
     public void embed(final String keyPrefix, final String groupPrefix, final int startingOrd, final ConfigDef child) {
@@ -1324,12 +1314,7 @@ public class ConfigDef {
      */
     private static Validator embeddedValidator(final String keyPrefix, final Validator base) {
         if (base == null) return null;
-        return new ConfigDef.Validator() {
-            @Override
-            public void ensureValid(String name, Object value) {
-                base.ensureValid(name.substring(keyPrefix.length()), value);
-            }
-        };
+        return (name, value) -> base.ensureValid(name.substring(keyPrefix.length()), value);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index 6430ffd..a830f9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -81,7 +81,7 @@ public class ConfigTransformer {
         // Collect the variables from the given configs that need transformation
         for (Map.Entry<String, String> config : configs.entrySet()) {
             if (config.getValue() != null) {
-                List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), DEFAULT_PATTERN);
+                List<ConfigVariable> vars = getVars(config.getValue(), DEFAULT_PATTERN);
                 for (ConfigVariable var : vars) {
                     Map<String, Set<String>> keysByPath = keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>());
                     Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>());
@@ -121,7 +121,7 @@ public class ConfigTransformer {
         return new ConfigTransformerResult(data, ttls);
     }
 
-    private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
+    private static List<ConfigVariable> getVars(String value, Pattern pattern) {
         List<ConfigVariable> configVars = new ArrayList<>();
         Matcher matcher = pattern.matcher(value);
         while (matcher.find()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
index 141c972..577e758 100644
--- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.AbstractIterator;
 
 public class RecordHeaders implements Headers {
-    
+
     private final List<Header> headers;
     private volatile boolean isReadOnly;
 
@@ -43,7 +43,7 @@ public class RecordHeaders implements Headers {
             this.headers = new ArrayList<>(Arrays.asList(headers));
         }
     }
-    
+
     public RecordHeaders(Iterable<Header> headers) {
         //Use efficient copy constructor if possible, fallback to iteration otherwise
         if (headers == null) {
@@ -99,12 +99,7 @@ public class RecordHeaders implements Headers {
     @Override
     public Iterable<Header> headers(final String key) {
         checkKey(key);
-        return new Iterable<Header>() {
-            @Override
-            public Iterator<Header> iterator() {
-                return new FilterByKeyIterator(headers.iterator(), key);
-            }
-        };
+        return () -> new FilterByKeyIterator(headers.iterator(), key);
     }
 
     @Override
@@ -119,17 +114,15 @@ public class RecordHeaders implements Headers {
     public Header[] toArray() {
         return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[headers.size()]);
     }
-    
+
     private void checkKey(String key) {
-        if (key == null) {
+        if (key == null)
             throw new IllegalArgumentException("key cannot be null.");
-        }
     }
-    
+
     private void canWrite() {
-        if (isReadOnly) {
+        if (isReadOnly)
             throw new IllegalStateException("RecordHeaders has been closed.");
-        }
     }
 
     private Iterator<Header> closeAware(final Iterator<Header> original) {
@@ -177,7 +170,7 @@ public class RecordHeaders implements Headers {
                ", isReadOnly = " + isReadOnly +
                ')';
     }
-    
+
     private static final class FilterByKeyIterator extends AbstractIterator<Header> {
 
         private final Iterator<Header> original;
@@ -187,14 +180,13 @@ public class RecordHeaders implements Headers {
             this.original = original;
             this.key = key;
         }
-        
+
         protected Header makeNext() {
             while (true) {
                 if (original.hasNext()) {
                     Header header = original.next();
-                    if (!header.key().equals(key)) {
+                    if (!header.key().equals(key))
                         continue;
-                    }
 
                     return header;
                 }
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 33916ac..d610172 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -208,7 +208,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
 
     @Override
     public synchronized boolean complete(T newValue) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+        List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
         synchronized (this) {
             if (done)
                 return false;
@@ -225,7 +225,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
 
     @Override
     public boolean completeExceptionally(Throwable newException) {
-        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+        List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
         synchronized (this) {
             if (done)
                 return false;
@@ -257,7 +257,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      */
     @Override
     public T get() throws InterruptedException, ExecutionException {
-        SingleWaiter<T> waiter = new SingleWaiter<T>();
+        SingleWaiter<T> waiter = new SingleWaiter<>();
         addWaiter(waiter);
         return waiter.await();
     }
@@ -269,7 +269,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
             TimeoutException {
-        SingleWaiter<T> waiter = new SingleWaiter<T>();
+        SingleWaiter<T> waiter = new SingleWaiter<>();
         addWaiter(waiter);
         return waiter.await(timeout, unit);
     }
@@ -292,7 +292,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
      */
     @Override
     public synchronized boolean isCancelled() {
-        return (exception != null) && (exception instanceof CancellationException);
+        return exception instanceof CancellationException;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index ba65632..2918dd6 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -134,11 +134,7 @@ public class PartitionStates<S> {
     private void update(Map<TopicPartition, S> partitionToState) {
         LinkedHashMap<String, List<TopicPartition>> topicToPartitions = new LinkedHashMap<>();
         for (TopicPartition tp : partitionToState.keySet()) {
-            List<TopicPartition> partitions = topicToPartitions.get(tp.topic());
-            if (partitions == null) {
-                partitions = new ArrayList<>();
-                topicToPartitions.put(tp.topic(), partitions);
-            }
+            List<TopicPartition> partitions = topicToPartitions.computeIfAbsent(tp.topic(), k -> new ArrayList<>());
             partitions.add(tp);
         }
         for (Map.Entry<String, List<TopicPartition>> entry : topicToPartitions.entrySet()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
index 041d1c2..18f8ffe 100644
--- a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
+++ b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
@@ -77,7 +77,7 @@ public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements Auto
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         alive = false;
         gcListenerThread.interrupt();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
index 08ed1a0..2d584bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -62,7 +62,8 @@ public class ChannelState {
         FAILED_SEND,
         AUTHENTICATION_FAILED,
         LOCAL_CLOSE
-    };
+    }
+
     // AUTHENTICATION_FAILED has a custom exception. For other states,
     // create a reusable `ChannelState` instance per-state.
     public static final ChannelState NOT_CONNECTED = new ChannelState(State.NOT_CONNECTED);
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 17dc6a3..2895128 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -48,7 +48,7 @@ public class KafkaChannel {
         MUTED_AND_RESPONSE_PENDING,
         MUTED_AND_THROTTLED,
         MUTED_AND_THROTTLED_AND_RESPONSE_PENDING
-    };
+    }
 
     /** Socket server events that will change the mute state:
      * <ul>
@@ -72,7 +72,7 @@ public class KafkaChannel {
         RESPONSE_SENT,
         THROTTLE_STARTED,
         THROTTLE_ENDED
-    };
+    }
 
     private final String id;
     private final TransportLayer transportLayer;
@@ -90,7 +90,7 @@ public class KafkaChannel {
     private ChannelMuteState muteState;
     private ChannelState state;
 
-    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) throws IOException {
+    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) {
         this.id = id;
         this.transportLayer = transportLayer;
         this.authenticator = authenticator;
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index c7b80cb..e397f05 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.channels.SelectionKey;
 import java.util.Map;
@@ -76,7 +75,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
         }
 
         @Override
-        public void authenticate() throws IOException {}
+        public void authenticate() {}
 
         @Override
         public KafkaPrincipal principal() {
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index ccb9c60..845b147 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -87,10 +87,9 @@ public class PlaintextTransportLayer implements TransportLayer {
     /**
      * Performs SSL handshake hence is a no-op for the non-secure
      * implementation
-     * @throws IOException
-    */
+     */
     @Override
-    public void handshake() throws IOException {}
+    public void handshake() {}
 
     /**
     * Reads a sequence of bytes from this channel into the given buffer.
@@ -121,7 +120,7 @@ public class PlaintextTransportLayer implements TransportLayer {
      * @param dsts - The buffers into which bytes are to be transferred
      * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
      * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
-     * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
      * @throws IOException if some other I/O error occurs
      */
     @Override
@@ -133,7 +132,7 @@ public class PlaintextTransportLayer implements TransportLayer {
     * Writes a sequence of bytes to this channel from the given buffer.
     *
     * @param src The buffer from which bytes are to be retrieved
-    * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+    * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
     * @throws IOException If some other I/O error occurs
     */
     @Override
@@ -145,7 +144,7 @@ public class PlaintextTransportLayer implements TransportLayer {
     * Writes a sequence of bytes to this channel from the given buffer.
     *
     * @param srcs The buffer from which bytes are to be retrieved
-    * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+    * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
     * @throws IOException If some other I/O error occurs
     */
     @Override
@@ -180,7 +179,7 @@ public class PlaintextTransportLayer implements TransportLayer {
      * Returns ANONYMOUS as Principal.
      */
     @Override
-    public Principal peerPrincipal() throws IOException {
+    public Principal peerPrincipal() {
         return principal;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index ca34b71..172e629 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -156,7 +156,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
 
     @Override
     public Set<String> reconfigurableConfigs() {
-        return securityProtocol == SecurityProtocol.SASL_SSL ? SslConfigs.RECONFIGURABLE_CONFIGS : Collections.<String>emptySet();
+        return securityProtocol == SecurityProtocol.SASL_SSL ? SslConfigs.RECONFIGURABLE_CONFIGS : Collections.emptySet();
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index efb603c..8f81dbe 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -30,7 +30,7 @@ public interface Selectable {
     /**
      * See {@link #connect(String, InetSocketAddress, int, int) connect()}
      */
-    public static final int USE_DEFAULT_BUFFER_SIZE = -1;
+    int USE_DEFAULT_BUFFER_SIZE = -1;
 
     /**
      * Begin establishing a socket connection to the given address identified by the given address
@@ -40,83 +40,83 @@ public interface Selectable {
      * @param receiveBufferSize The receive buffer for the socket
      * @throws IOException If we cannot begin connecting
      */
-    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
+    void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
 
     /**
      * Wakeup this selector if it is blocked on I/O
      */
-    public void wakeup();
+    void wakeup();
 
     /**
      * Close this selector
      */
-    public void close();
+    void close();
 
     /**
      * Close the connection identified by the given id
      */
-    public void close(String id);
+    void close(String id);
 
     /**
      * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
      * @param send The request to send
      */
-    public void send(Send send);
+    void send(Send send);
 
     /**
      * Do I/O. Reads, writes, connection establishment, etc.
      * @param timeout The amount of time to block if there is nothing to do
      * @throws IOException
      */
-    public void poll(long timeout) throws IOException;
+    void poll(long timeout) throws IOException;
 
     /**
      * The list of sends that completed on the last {@link #poll(long) poll()} call.
      */
-    public List<Send> completedSends();
+    List<Send> completedSends();
 
     /**
      * The list of receives that completed on the last {@link #poll(long) poll()} call.
      */
-    public List<NetworkReceive> completedReceives();
+    List<NetworkReceive> completedReceives();
 
     /**
      * The connections that finished disconnecting on the last {@link #poll(long) poll()}
      * call. Channel state indicates the local channel state at the time of disconnection.
      */
-    public Map<String, ChannelState> disconnected();
+    Map<String, ChannelState> disconnected();
 
     /**
      * The list of connections that completed their connection on the last {@link #poll(long) poll()}
      * call.
      */
-    public List<String> connected();
+    List<String> connected();
 
     /**
      * Disable reads from the given connection
      * @param id The id for the connection
      */
-    public void mute(String id);
+    void mute(String id);
 
     /**
      * Re-enable reads from the given connection
      * @param id The id for the connection
      */
-    public void unmute(String id);
+    void unmute(String id);
 
     /**
      * Disable reads from all connections
      */
-    public void muteAll();
+    void muteAll();
 
     /**
      * Re-enable reads from all connections
      */
-    public void unmuteAll();
+    void unmuteAll();
 
     /**
      * Returns true if a channel is ready
      * @param id The id for the connection
      */
-    public boolean isChannelReady(String id);
+    boolean isChannelReady(String id);
 }
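
The interface above is poll-driven: callers queue sends, call poll(), and then
drain the completed-send, completed-receive and connection-state views before
the next iteration. A minimal sketch of that loop, assuming a concrete
Selectable named selector (handleReceive and handleDisconnect are hypothetical
placeholders, not part of this interface):

    void pollOnce(Selectable selector, Send pending) throws IOException {
        selector.send(pending);            // queued, flushed by the next poll()
        selector.poll(100);                // do I/O for up to 100 ms
        for (Send send : selector.completedSends())
            System.out.println("completed send to " + send.destination());
        for (NetworkReceive receive : selector.completedReceives())
            handleReceive(receive);        // hypothetical application handler
        for (Map.Entry<String, ChannelState> entry : selector.disconnected().entrySet())
            handleDisconnect(entry.getKey(), entry.getValue());
    }
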
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 806bda7..44223e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -20,8 +20,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -227,7 +225,7 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
     }
 
     public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
@@ -552,7 +550,7 @@ public class Selector implements Selectable, AutoCloseable {
 
                 /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                 if (channel.ready() && key.isWritable()) {
-                    Send send = null;
+                    Send send;
                     try {
                         send = channel.write();
                     } catch (Exception e) {
@@ -919,7 +917,7 @@ public class Selector implements Selectable, AutoCloseable {
      */
     private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
         if (!stagedReceives.containsKey(channel))
-            stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
+            stagedReceives.put(channel, new ArrayDeque<>());
 
         Deque<NetworkReceive> deque = stagedReceives.get(channel);
         deque.add(receive);
@@ -1045,11 +1043,7 @@ public class Selector implements Selectable, AutoCloseable {
 
             metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
             topLevelMetricNames.add(metricName);
-            this.metrics.addMetric(metricName, new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return channels.size();
-                }
-            });
+            this.metrics.addMetric(metricName, (config, now) -> channels.size());
         }
 
         private Meter createMeter(Metrics metrics, String groupName, Map<String, String> metricTags,
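
The containsKey/put idiom left in addToStagedReceives is the same pattern this
cleanup replaces elsewhere with computeIfAbsent; against the same map, the
single-call form would be (a sketch, not part of the commit):

    // computeIfAbsent creates and stores the deque only when the channel has no
    // entry yet, then returns it, so the lookup and insert collapse into one call.
    stagedReceives.computeIfAbsent(channel, k -> new ArrayDeque<>()).add(receive);
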
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b43e4c3..86d41d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -163,7 +163,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
          * No-Op for plaintext authenticator
          */
         @Override
-        public void authenticate() throws IOException {}
+        public void authenticate() {}
 
         /**
          * Constructs Principal using configured principalBuilder.
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 08a39e7..917e5a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -76,7 +76,7 @@ public class SslTransportLayer implements TransportLayer {
     }
 
     // Prefer `create`, only use this in tests
-    SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+    SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) {
         this.channelId = channelId;
         this.key = key;
         this.socketChannel = (SocketChannel) key.channel();
@@ -372,7 +372,7 @@ public class SslTransportLayer implements TransportLayer {
         }
     }
 
-    private SSLHandshakeException renegotiationException() throws IOException {
+    private SSLHandshakeException renegotiationException() {
         return new SSLHandshakeException("Renegotiation is not supported");
     }
 
@@ -715,7 +715,7 @@ public class SslTransportLayer implements TransportLayer {
      * SSLSession's peerPrincipal for the remote host.
      * @return Principal
      */
-    public Principal peerPrincipal() throws IOException {
+    public Principal peerPrincipal() {
         try {
             return sslEngine.getSession().getPeerPrincipal();
         } catch (SSLPeerUnverifiedException se) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d461060..a796ec9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -111,7 +111,7 @@ import java.util.function.Function;
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
-    UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request,",
+    UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
             UnknownServerException::new),
     NONE(0, null, message -> null),
     OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
@@ -159,8 +159,8 @@ public enum Errors {
     ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
             IllegalGenerationException::new),
     INCONSISTENT_GROUP_PROTOCOL(23,
-            "The group member's supported protocols are incompatible with those of existing members" +
-                " or first group member tried to join with empty protocol type or empty protocol list.",
+            "The group member's supported protocols are incompatible with those of existing members " +
+            "or first group member tried to join with empty protocol type or empty protocol list.",
             InconsistentGroupProtocolException::new),
     INVALID_GROUP_ID(24, "The configured groupId is invalid.",
             InvalidGroupIdException::new),
@@ -201,8 +201,8 @@ public enum Errors {
     NOT_CONTROLLER(41, "This is not the correct controller for this cluster.",
             NotControllerException::new),
     INVALID_REQUEST(42, "This most likely occurs because of a request being malformed by the " +
-                "client library or the message was sent to an incompatible broker. See the broker logs " +
-                "for more details.",
+            "client library or the message was sent to an incompatible broker. See the broker logs " +
+            "for more details.",
             InvalidRequestException::new),
     UNSUPPORTED_FOR_MESSAGE_FORMAT(43, "The message format version on the broker does not support the request.",
             UnsupportedForMessageFormatException::new),
@@ -221,10 +221,10 @@ public enum Errors {
             "its transactional id.",
             InvalidPidMappingException::new),
     INVALID_TRANSACTION_TIMEOUT(50, "The transaction timeout is larger than the maximum value allowed by " +
-                "the broker (as configured by transaction.max.timeout.ms).",
+            "the broker (as configured by transaction.max.timeout.ms).",
             InvalidTxnTimeoutException::new),
     CONCURRENT_TRANSACTIONS(51, "The producer attempted to update a transaction " +
-                "while another concurrent operation on the same transaction was ongoing.",
+            "while another concurrent operation on the same transaction was ongoing.",
             ConcurrentTransactionsException::new),
     TRANSACTION_COORDINATOR_FENCED(52, "Indicates that the transaction coordinator sending a WriteTxnMarker " +
             "is no longer the current coordinator for a given producer.",
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 5e41901..1994a71 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -25,12 +25,7 @@ import java.util.Iterator;
 
 public abstract class AbstractRecords implements Records {
 
-    private final Iterable<Record> records = new Iterable<Record>() {
-        @Override
-        public Iterator<Record> iterator() {
-            return recordsIterator();
-        }
-    };
+    private final Iterable<Record> records = this::recordsIterator;
 
     @Override
     public boolean hasMatchingMagic(byte magic) {
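
The method reference compiles because Iterable<T> declares a single abstract
method, iterator(), so it behaves as a functional interface. A standalone
illustration of the same trick (not Kafka code):

    List<String> names = Arrays.asList("first", "second");
    Iterable<String> view = names::iterator;   // method reference as an Iterable
    for (String name : view)
        System.out.println(name);
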
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index 7f91f26..572abd8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.errors.CorruptRecordException;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
@@ -39,7 +38,7 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
         this.maxMessageSize = maxMessageSize;
     }
 
-    public MutableRecordBatch nextBatch() throws IOException {
+    public MutableRecordBatch nextBatch() {
         int remaining = buffer.remaining();
 
         Integer batchSize = nextBatchSize();
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 9b3bfc4..a333d2a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -128,7 +128,7 @@ public enum CompressionType {
     /**
      * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
      *
-     * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@#link ByteBuffer}s directly.
+     * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly.
      * Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()}
      * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
      * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
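
The constraint in that note comes down to reallocation: when the backing buffer
grows, any ByteBuffer reference captured earlier becomes stale, so writers must
go through the stream. A sketch of the failure mode, assuming
org.apache.kafka.common.utils.ByteBufferOutputStream:

    ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(16));
    ByteBuffer before = out.buffer();
    out.write(new byte[64], 0, 64);       // exceeds capacity, triggers expansion
    assert out.buffer() != before;        // the captured reference points at the old buffer
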
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index cebb5fa..3537fc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -32,7 +32,6 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -374,12 +373,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * @return An iterator over batches starting from {@code start}
      */
     public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
-        return new Iterable<FileChannelRecordBatch>() {
-            @Override
-            public Iterator<FileChannelRecordBatch> iterator() {
-                return batchIterator(start);
-            }
-        };
+        return () -> batchIterator(start);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 56f1058..850b1e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -264,12 +264,12 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
     }
 
     @Override
-    public int available() throws IOException {
+    public int available() {
         return decompressedBuffer == null ? 0 : decompressedBuffer.remaining();
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
         bufferSupplier.release(decompressionBuffer);
     }
 
@@ -279,7 +279,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
     }
 
     @Override
-    public void reset() throws IOException {
+    public void reset() {
         throw new RuntimeException("reset not supported");
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index af62e09..c6db18d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -47,12 +47,7 @@ public class MemoryRecords extends AbstractRecords {
 
     private final ByteBuffer buffer;
 
-    private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
-        @Override
-        public Iterator<MutableRecordBatch> iterator() {
-            return batchIterator();
-        }
-    };
+    private final Iterable<MutableRecordBatch> batches = this::batchIterator;
 
     private int validBytes = -1;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 6f6404f..1c7a6c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -41,7 +41,7 @@ public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
     private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
         @Override
-        public void write(int b) throws IOException {
+        public void write(int b) {
             throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
         }
     });
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
index 03bc098..b7cf1d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
@@ -117,7 +117,7 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest {
         Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
         for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
             if (!dirPartitions.containsKey(entry.getValue()))
-                dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
+                dirPartitions.put(entry.getValue(), new ArrayList<>());
             dirPartitions.get(entry.getValue()).add(entry.getKey());
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index e154ac9..03f385b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -95,10 +95,10 @@ public class ApiVersionsRequest extends AbstractRequest {
         short version = version();
         switch (version) {
             case 0:
-                return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+                return new ApiVersionsResponse(Errors.forException(e), Collections.emptyList());
             case 1:
             case 2:
-                return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+                return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         version, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion()));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index d6db1e1..dc86dd7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Field;
@@ -79,10 +78,10 @@ public class ControlledShutdownRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
             case 1:
-                return new ControlledShutdownResponse(Errors.forException(e), Collections.<TopicPartition>emptySet());
+                return new ControlledShutdownResponse(Errors.forException(e), Collections.emptySet());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()));
+                    versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()));
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 9f05c5a..16d4c97 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -120,12 +120,12 @@ public class CreateTopicsRequest extends AbstractRequest {
         public TopicDetails(int partitions,
                             short replicationFactor,
                             Map<String, String> configs) {
-            this(partitions, replicationFactor, Collections.<Integer, List<Integer>>emptyMap(), configs);
+            this(partitions, replicationFactor, Collections.emptyMap(), configs);
         }
 
         public TopicDetails(int partitions,
                             short replicationFactor) {
-            this(partitions, replicationFactor, Collections.<String, String>emptyMap());
+            this(partitions, replicationFactor, Collections.emptyMap());
         }
 
         public TopicDetails(Map<Integer, List<Integer>> replicasAssignments,
@@ -134,7 +134,7 @@ public class CreateTopicsRequest extends AbstractRequest {
         }
 
         public TopicDetails(Map<Integer, List<Integer>> replicasAssignments) {
-            this(replicasAssignments, Collections.<String, String>emptyMap());
+            this(replicasAssignments, Collections.emptyMap());
         }
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index c3fc194..2ed7c24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -140,7 +140,7 @@ public class DeleteAclsRequest extends AbstractRequest {
                 List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
                 for (int i = 0; i < filters.size(); i++) {
                     responses.add(new DeleteAclsResponse.AclFilterResponse(
-                        ApiError.fromThrowable(throwable), Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
+                        ApiError.fromThrowable(throwable), Collections.emptySet()));
                 }
                 return new DeleteAclsResponse(throttleTimeMs, responses);
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index c86b141..64ed27b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -82,7 +82,7 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
     }
 
     public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error) {
-        this(throttleTimeMs, error, new ArrayList<DelegationToken>());
+        this(throttleTimeMs, error, new ArrayList<>());
     }
 
     public DescribeDelegationTokenResponse(Struct struct) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index fbdfaa3..06c2471 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -196,11 +196,11 @@ public class DescribeGroupsResponse extends AbstractResponse {
 
         public static GroupMetadata forError(Errors error) {
             return new DescribeGroupsResponse.GroupMetadata(
-                    error,
-                    DescribeGroupsResponse.UNKNOWN_STATE,
-                    DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
-                    DescribeGroupsResponse.UNKNOWN_PROTOCOL,
-                    Collections.<DescribeGroupsResponse.GroupMember>emptyList());
+                error,
+                DescribeGroupsResponse.UNKNOWN_STATE,
+                DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+                DescribeGroupsResponse.UNKNOWN_PROTOCOL,
+                Collections.emptyList());
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 366509d..76cc1a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -201,22 +201,22 @@ public class JoinGroupRequest extends AbstractRequest {
             case 0:
             case 1:
                 return new JoinGroupResponse(
-                        Errors.forException(e),
-                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                        JoinGroupResponse.UNKNOWN_PROTOCOL,
-                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
-                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-                        Collections.<String, ByteBuffer>emptyMap());
+                    Errors.forException(e),
+                    JoinGroupResponse.UNKNOWN_GENERATION_ID,
+                    JoinGroupResponse.UNKNOWN_PROTOCOL,
+                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+                    Collections.emptyMap());
             case 2:
             case 3:
                 return new JoinGroupResponse(
-                        throttleTimeMs,
-                        Errors.forException(e),
-                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                        JoinGroupResponse.UNKNOWN_PROTOCOL,
-                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
-                        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-                        Collections.<String, ByteBuffer>emptyMap());
+                    throttleTimeMs,
+                    Errors.forException(e),
+                    JoinGroupResponse.UNKNOWN_GENERATION_ID,
+                    JoinGroupResponse.UNKNOWN_PROTOCOL,
+                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+                    JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+                    Collections.emptyMap());
 
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 27254cb..baab1e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -71,10 +71,10 @@ public class ListGroupsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new ListGroupsResponse(Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
+                return new ListGroupsResponse(Errors.forException(e), Collections.emptyList());
             case 1:
             case 2:
-                return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
+                return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.LIST_GROUPS.latestVersion()));
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 0a4611f..2ea9032 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -93,9 +93,7 @@ public class ResourceFilter {
     public boolean matches(Resource other) {
         if ((name != null) && (!name.equals(other.name())))
             return false;
-        if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType())))
-            return false;
-        return true;
+        return (resourceType == ResourceType.ANY) || (resourceType.equals(other.resourceType()));
     }
 
     /**
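
The collapsed return expresses the matching rule directly: a non-null name must
match exactly, and a resource type of ANY acts as a wildcard. For instance (a
sketch with illustrative values):

    ResourceFilter filter = new ResourceFilter(ResourceType.ANY, null);
    filter.matches(new Resource(ResourceType.TOPIC, "payments")); // true: both fields wildcard
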
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
index 849a978..231ee16 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
@@ -140,7 +140,7 @@ public class JaasContext {
      * The type of the SASL login context; it should be SERVER for the broker and CLIENT for the clients (consumer, producer,
      * etc.). This is used to validate behaviour (e.g. some functionality is only available in the broker or clients).
      */
-    public enum Type { CLIENT, SERVER; }
+    public enum Type { CLIENT, SERVER }
 
     private final String name;
     private final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
index 96e7426..28923ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
@@ -23,7 +23,7 @@ public class CredentialCache {
     private final ConcurrentHashMap<String, Cache<? extends Object>> cacheMap = new ConcurrentHashMap<>();
 
     public <C> Cache<C> createCache(String mechanism, Class<C> credentialClass) {
-        Cache<C> cache = new Cache<C>(credentialClass);
+        Cache<C> cache = new Cache<>(credentialClass);
         Cache<C> oldCache = (Cache<C>) cacheMap.putIfAbsent(mechanism, cache);
         return oldCache == null ? cache : oldCache;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 2bc44c5..7bb7be7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -16,16 +16,6 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.types.Password;
@@ -39,6 +29,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
 public class LoginManager {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(LoginManager.class);
@@ -55,7 +52,7 @@ public class LoginManager {
     private int refCount;
 
     private LoginManager(JaasContext jaasContext, String saslMechanism, Map<String, ?> configs,
-                         LoginMetadata<?> loginMetadata) throws IOException, LoginException {
+                         LoginMetadata<?> loginMetadata) throws LoginException {
         this.loginMetadata = loginMetadata;
         this.login = Utils.newInstance(loginMetadata.loginClass);
         loginCallbackHandler = Utils.newInstance(loginMetadata.loginCallbackClass);
@@ -89,7 +86,7 @@ public class LoginManager {
      */
     public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism,
                                                    Class<? extends Login> defaultLoginClass,
-                                                   Map<String, ?> configs) throws IOException, LoginException {
+                                                   Map<String, ?> configs) throws LoginException {
         Class<? extends Login> loginClass = configuredClassOrDefault(configs, jaasContext,
                 saslMechanism, SaslConfigs.SASL_LOGIN_CLASS, defaultLoginClass);
         Class<? extends AuthenticateCallbackHandler> defaultLoginCallbackHandlerClass = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 8934e8e..fb74f5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -113,7 +113,7 @@ public class SaslClientAuthenticator implements Authenticator {
                                    String host,
                                    String mechanism,
                                    boolean handshakeRequestEnable,
-                                   TransportLayer transportLayer) throws IOException {
+                                   TransportLayer transportLayer) {
         this.node = node;
         this.subject = subject;
         this.callbackHandler = callbackHandler;
@@ -144,13 +144,11 @@ public class SaslClientAuthenticator implements Authenticator {
 
     private SaslClient createSaslClient() {
         try {
-            return Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
-                public SaslClient run() throws SaslException {
-                    String[] mechs = {mechanism};
-                    LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
-                        clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
-                    return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
-                }
+            return Subject.doAs(subject, (PrivilegedExceptionAction<SaslClient>) () -> {
+                String[] mechs = {mechanism};
+                LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
+                    clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
+                return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
             });
         } catch (PrivilegedActionException e) {
             throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + mechanism, e.getCause());
@@ -354,11 +352,7 @@ public class SaslClientAuthenticator implements Authenticator {
             if (isInitial && !saslClient.hasInitialResponse())
                 return saslToken;
             else
-                return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
-                    public byte[] run() throws SaslException {
-                        return saslClient.evaluateChallenge(saslToken);
-                    }
-                });
+                return Subject.doAs(subject, (PrivilegedExceptionAction<byte[]>) () -> saslClient.evaluateChallenge(saslToken));
         } catch (PrivilegedActionException e) {
             String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
             KerberosError kerberosError = KerberosError.fromException(e);
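
The explicit PrivilegedExceptionAction cast in these lambdas is load-bearing:
Subject.doAs is overloaded for both PrivilegedAction and
PrivilegedExceptionAction, so an untyped lambda would be ambiguous. Reduced to
a sketch (token stands in for the method's saslToken argument):

    // The cast selects the overload whose run() may throw a checked exception.
    byte[] response = Subject.doAs(subject,
        (PrivilegedExceptionAction<byte[]>) () -> saslClient.evaluateChallenge(token));
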
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 43cb0a4..48a49fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -129,7 +129,7 @@ public class SaslServerAuthenticator implements Authenticator {
                                    KerberosShortNamer kerberosNameParser,
                                    ListenerName listenerName,
                                    SecurityProtocol securityProtocol,
-                                   TransportLayer transportLayer) throws IOException {
+                                   TransportLayer transportLayer) {
         this.callbackHandlers = callbackHandlers;
         this.connectionId = connectionId;
         this.subjects = subjects;
@@ -164,12 +164,8 @@ public class SaslServerAuthenticator implements Authenticator {
             saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
         } else {
             try {
-                saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
-                    public SaslServer run() throws SaslException {
-                        return Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(),
-                                configs, callbackHandler);
-                    }
-                });
+                saslServer = Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
+                    Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), configs, callbackHandler));
             } catch (PrivilegedActionException e) {
                 throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
             }
@@ -212,11 +208,8 @@ public class SaslServerAuthenticator implements Authenticator {
         }
 
         try {
-            return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
-                public SaslServer run() throws SaslException {
-                    return Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler);
-                }
-            });
+            return Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
+                    Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler));
         } catch (PrivilegedActionException e) {
             throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
         }
@@ -308,11 +301,11 @@ public class SaslServerAuthenticator implements Authenticator {
             saslServer.dispose();
     }
 
-    private void setSaslState(SaslState saslState) throws IOException {
+    private void setSaslState(SaslState saslState) {
         setSaslState(saslState, null);
     }
 
-    private void setSaslState(SaslState saslState, AuthenticationException exception) throws IOException {
+    private void setSaslState(SaslState saslState, AuthenticationException exception) {
         if (netOutBuffer != null && !netOutBuffer.completed()) {
             pendingSaslState = saslState;
             pendingException = exception;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index ec996a8..2faf6be 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -134,127 +134,125 @@ public class KerberosLogin extends AbstractLogin {
         // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
         // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
         //  "modprinc -maxlife 3mins <principal>" in kadmin.
-        t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
-            public void run() {
-                log.info("[Principal={}]: TGT refresh thread started.", principal);
-                while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
-                    KerberosTicket tgt = getTGT();
-                    long now = currentWallTime();
-                    long nextRefresh;
-                    Date nextRefreshDate;
-                    if (tgt == null) {
-                        nextRefresh = now + minTimeBeforeRelogin;
-                        nextRefreshDate = new Date(nextRefresh);
-                        log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate);
+        t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), () -> {
+            log.info("[Principal={}]: TGT refresh thread started.", principal);
+            while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
+                KerberosTicket tgt = getTGT();
+                long now = currentWallTime();
+                long nextRefresh;
+                Date nextRefreshDate;
+                if (tgt == null) {
+                    nextRefresh = now + minTimeBeforeRelogin;
+                    nextRefreshDate = new Date(nextRefresh);
+                    log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate);
+                } else {
+                    nextRefresh = getRefreshTime(tgt);
+                    long expiry = tgt.getEndTime().getTime();
+                    Date expiryDate = new Date(expiry);
+                    if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
+                        log.warn("The TGT cannot be renewed beyond the next expiry date: {}." +
+                            "This process will not be able to authenticate new SASL connections after that " +
+                            "time (for example, it will not be able to authenticate a new connection with a Kafka " +
+                            "Broker).  Ask your system administrator to either increase the " +
+                            "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
+                            "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
+                            "expiry cannot be further extended by refreshing, exiting refresh thread now.",
+                            expiryDate, principal, principal);
+                        return;
+                    }
+                    // determine how long to sleep from looking at ticket's expiry.
+                    // We should not allow the ticket to expire, but we should take into consideration
+                    // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
+                    // would cause ticket expiration.
+                    if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
+                        // expiry is before the next scheduled refresh.
+                        log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal);
+                        nextRefresh = now;
                     } else {
-                        nextRefresh = getRefreshTime(tgt);
-                        long expiry = tgt.getEndTime().getTime();
-                        Date expiryDate = new Date(expiry);
-                        if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
-                            log.warn("The TGT cannot be renewed beyond the next expiry date: {}." +
-                                    "This process will not be able to authenticate new SASL connections after that " +
-                                    "time (for example, it will not be able to authenticate a new connection with a Kafka " +
-                                    "Broker).  Ask your system administrator to either increase the " +
-                                    "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
-                                    "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
-                                    "expiry cannot be further extended by refreshing, exiting refresh thread now.",
-                                    expiryDate, principal, principal);
-                            return;
-                        }
-                        // determine how long to sleep from looking at ticket's expiry.
-                        // We should not allow the ticket to expire, but we should take into consideration
-                        // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
-                        // would cause ticket expiration.
-                        if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
-                            // expiry is before next scheduled refresh).
-                            log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal);
-                            nextRefresh = now;
-                        } else {
-                            if (nextRefresh < (now + minTimeBeforeRelogin)) {
-                                // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
-                                Date until = new Date(nextRefresh);
-                                Date newUntil = new Date(now + minTimeBeforeRelogin);
-                                log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " +
-                                        "than the minimum refresh interval ({} seconds) from now.",
-                                        principal, until, newUntil, minTimeBeforeRelogin / 1000);
-                            }
-                            nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
-                        }
-                        nextRefreshDate = new Date(nextRefresh);
-                        if (nextRefresh > expiry) {
-                            log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." +
-                                    "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
-                                    principal, nextRefreshDate, expiryDate);
-                            return;
+                        if (nextRefresh < (now + minTimeBeforeRelogin)) {
+                            // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                            Date until = new Date(nextRefresh);
+                            Date newUntil = new Date(now + minTimeBeforeRelogin);
+                            log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " +
+                                "than the minimum refresh interval ({} seconds) from now.",
+                                principal, until, newUntil, minTimeBeforeRelogin / 1000);
                         }
+                        nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
                     }
-                    if (now < nextRefresh) {
-                        Date until = new Date(nextRefresh);
-                        log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until);
-                        try {
-                            Thread.sleep(nextRefresh - now);
-                        } catch (InterruptedException ie) {
-                            log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal);
-                            return;
-                        }
-                    } else {
-                        log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check"
-                                + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
-                                + " Manual intervention will be required for this client to successfully authenticate."
-                                + " Exiting refresh thread.", principal, nextRefreshDate);
+                    nextRefreshDate = new Date(nextRefresh);
+                    if (nextRefresh > expiry) {
+                        log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." +
+                            "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
+                            principal, nextRefreshDate, expiryDate);
+                        return;
+                    }
+                }
+                if (now < nextRefresh) {
+                    Date until = new Date(nextRefresh);
+                    log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until);
+                    try {
+                        Thread.sleep(nextRefresh - now);
+                    } catch (InterruptedException ie) {
+                        log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal);
                         return;
                     }
-                    if (isUsingTicketCache) {
-                        String kinitArgs = "-R";
-                        int retry = 1;
-                        while (retry >= 0) {
-                            try {
-                                log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
-                                Shell.execCommand(kinitCmd, kinitArgs);
-                                break;
-                            } catch (Exception e) {
-                                if (retry > 0) {
-                                    --retry;
-                                    // sleep for 10 seconds
-                                    try {
-                                        Thread.sleep(10 * 1000);
-                                    } catch (InterruptedException ie) {
-                                        log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal);
-                                        return;
-                                    }
-                                } else {
-                                    log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
-                                            "Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
+                } else {
+                    log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check"
+                        + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+                        + " Manual intervention will be required for this client to successfully authenticate."
+                        + " Exiting refresh thread.", principal, nextRefreshDate);
+                    return;
+                }
+                if (isUsingTicketCache) {
+                    String kinitArgs = "-R";
+                    int retry = 1;
+                    while (retry >= 0) {
+                        try {
+                            log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
+                            Shell.execCommand(kinitCmd, kinitArgs);
+                            break;
+                        } catch (Exception e) {
+                            if (retry > 0) {
+                                --retry;
+                                // sleep for 10 seconds
+                                try {
+                                    Thread.sleep(10 * 1000);
+                                } catch (InterruptedException ie) {
+                                    log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal);
                                     return;
                                 }
+                            } else {
+                                log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
+                                    "Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
+                                return;
                             }
                         }
                     }
-                    try {
-                        int retry = 1;
-                        while (retry >= 0) {
-                            try {
-                                reLogin();
-                                break;
-                            } catch (LoginException le) {
-                                if (retry > 0) {
-                                    --retry;
-                                    // sleep for 10 seconds.
-                                    try {
-                                        Thread.sleep(10 * 1000);
-                                    } catch (InterruptedException e) {
-                                        log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le);
-                                        throw le;
-                                    }
-                                } else {
-                                    log.error("[Principal={}]: Could not refresh TGT.", principal, le);
+                }
+                try {
+                    int retry = 1;
+                    while (retry >= 0) {
+                        try {
+                            reLogin();
+                            break;
+                        } catch (LoginException le) {
+                            if (retry > 0) {
+                                --retry;
+                                // sleep for 10 seconds.
+                                try {
+                                    Thread.sleep(10 * 1000);
+                                } catch (InterruptedException e) {
+                                    log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le);
+                                    throw le;
                                 }
+                            } else {
+                                log.error("[Principal={}]: Could not refresh TGT.", principal, le);
                             }
                         }
-                    } catch (LoginException le) {
-                        log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le);
-                        return;
                     }
+                } catch (LoginException le) {
+                    log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le);
+                    return;
                 }
             }
         });
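
The scheduling rule embedded in that loop can be read as a small pure function:
refresh at the computed time, never sooner than minTimeBeforeRelogin from now,
unless waiting that long would let the ticket expire first. A sketch of the
same decision (names mirror the fields above):

    static long chooseNextRefresh(long now, long proposed, long expiry, long minTimeBeforeRelogin) {
        if (proposed > expiry || now + minTimeBeforeRelogin > expiry)
            return now;                                // expiry is too close: refresh immediately
        return Math.max(proposed, now + minTimeBeforeRelogin);
    }
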
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index ac273cc..1dcd199 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -121,14 +121,14 @@ import org.slf4j.LoggerFactory;
  * <p>
  * Here is a typical, basic JAAS configuration for a client leveraging unsecured
  * SASL/OAUTHBEARER authentication:
- * 
+ *
  * <pre>
  * KafkaClient {
  *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
  *      unsecuredLoginStringClaim_sub="thePrincipalName";
  * };
  * </pre>
- * 
+ *
  * An implementation of the {@link Login} interface specific to the
  * {@code OAUTHBEARER} mechanism is automatically applied; it periodically
  * refreshes any token before it expires so that the client can continue to make
@@ -196,14 +196,14 @@ import org.slf4j.LoggerFactory;
  * <p>
  * Here is a typical, basic JAAS configuration for a broker leveraging unsecured
  * SASL/OAUTHBEARER validation:
- * 
+ *
  * <pre>
  * KafkaServer {
  *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
  *      unsecuredLoginStringClaim_sub="thePrincipalName";
  * };
  * </pre>
- * 
+ *
  * Production use cases will require writing an implementation of
  * {@link AuthenticateCallbackHandler} that can handle an instance of
  * {@link OAuthBearerValidatorCallback} and declaring it via the
@@ -229,7 +229,7 @@ import org.slf4j.LoggerFactory;
  * Better to mitigate this possibility by leaving the existing token (which
  * still has some lifetime left) in place until a new replacement token is
  * actually retrieved. This implementation supports this.
- * 
+ *
  * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
  * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
  * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
@@ -350,7 +350,7 @@ public class OAuthBearerLoginModule implements LoginModule {
     }
 
     @Override
-    public boolean commit() throws LoginException {
+    public boolean commit() {
         if (tokenRequiringCommit == null) {
             if (log.isDebugEnabled())
                 log.debug("Nothing here to commit");
@@ -371,7 +371,7 @@ public class OAuthBearerLoginModule implements LoginModule {
     }
 
     @Override
-    public boolean abort() throws LoginException {
+    public boolean abort() {
         if (tokenRequiringCommit != null) {
             log.info("Login aborted");
             tokenRequiringCommit = null;
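
Dropping `throws LoginException` from commit() and abort() is legal because an overriding method may declare fewer checked exceptions than the method it overrides. A minimal illustration of the language rule this cleanup relies on:

    import javax.security.auth.login.LoginException;

    interface Committable {
        boolean commit() throws LoginException; // the contract permits the checked exception
    }

    class NoThrowCommit implements Committable {
        @Override
        public boolean commit() {               // the override may narrow the throws clause
            return true;
        }
    }

Callers holding a Committable reference must still handle LoginException; only code typed against the concrete class sees the narrower signature.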
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
index 16db3c8..e32e16d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
@@ -58,7 +58,7 @@ public class OAuthBearerSaslClient implements SaslClient {
 
     enum State {
         SEND_CLIENT_FIRST_MESSAGE, RECEIVE_SERVER_FIRST_MESSAGE, RECEIVE_SERVER_MESSAGE_AFTER_FAILURE, COMPLETE, FAILED
-    };
+    }
 
     private State state;
 
@@ -127,14 +127,14 @@ public class OAuthBearerSaslClient implements SaslClient {
     }
 
     @Override
-    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(incoming, offset, offset + len);
     }
 
     @Override
-    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(outgoing, offset, offset + len);
@@ -148,7 +148,7 @@ public class OAuthBearerSaslClient implements SaslClient {
     }
 
     @Override
-    public void dispose() throws SaslException {
+    public void dispose() {
     }
 
     private void setState(State state) {
@@ -173,7 +173,7 @@ public class OAuthBearerSaslClient implements SaslClient {
     public static class OAuthBearerSaslClientFactory implements SaslClientFactory {
         @Override
         public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
-                String serverName, Map<String, ?> props, CallbackHandler callbackHandler) throws SaslException {
+                String serverName, Map<String, ?> props, CallbackHandler callbackHandler) {
             String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
             for (String mechanism : mechanisms) {
                 for (int i = 0; i < mechanismNamesCompatibleWithPolicy.length; i++) {
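
The enum change above removes a stray semicolon: a type body (including an enum body) is a declaration, not a statement, so no terminating ';' is needed. For example:

    enum State {
        SEND_CLIENT_FIRST_MESSAGE, COMPLETE, FAILED
    }   // no trailing ';' after the closing brace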
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
index ab2b716..352e6b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
@@ -53,7 +53,7 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
 
     /**
      * Return true if this instance has been configured, otherwise false
-     * 
+     *
      * @return true if this instance has been configured, otherwise false
      */
     public boolean configured() {
@@ -91,8 +91,8 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
             throw new IllegalArgumentException("Callback had a token already");
         Subject subject = Subject.getSubject(AccessController.getContext());
         Set<OAuthBearerToken> privateCredentials = subject != null
-                ? subject.getPrivateCredentials(OAuthBearerToken.class)
-                : Collections.<OAuthBearerToken>emptySet();
+            ? subject.getPrivateCredentials(OAuthBearerToken.class)
+            : Collections.emptySet();
         if (privateCredentials.size() != 1)
             throw new IOException(
                     String.format("Unable to find OAuth Bearer token in Subject's private credentials (size=%d)",
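
Replacing Collections.<OAuthBearerToken>emptySet() with plain Collections.emptySet() works because Java 8's improved inference lets a generic method inside a conditional pick up its type argument from the target type, making the explicit type witness redundant. A self-contained sketch of the same pattern:

    import java.util.Collections;
    import java.util.Set;

    class TargetTyping {
        static Set<String> namesOrEmpty(Set<String> names) {
            // emptySet() infers Set<String> from the method's return type.
            return names != null ? names : Collections.emptySet();
        }
    }

The same inference removes the <String, Object> and <T> witnesses at the emptyMap()/emptyList() call sites changed throughout this commit.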
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
index 8f89c14..db332b4 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
@@ -128,21 +128,21 @@ public class OAuthBearerSaslServer implements SaslServer {
     }
 
     @Override
-    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
         if (!complete)
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(incoming, offset, offset + len);
     }
 
     @Override
-    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
         if (!complete)
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(outgoing, offset, offset + len);
     }
 
     @Override
-    public void dispose() throws SaslException {
+    public void dispose() {
         complete = false;
         tokenForNegotiatedProperty = null;
         extensions = null;
@@ -225,7 +225,7 @@ public class OAuthBearerSaslServer implements SaslServer {
     public static class OAuthBearerSaslServerFactory implements SaslServerFactory {
         @Override
         public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
-                CallbackHandler callbackHandler) throws SaslException {
+                CallbackHandler callbackHandler) {
             String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
             for (int i = 0; i < mechanismNamesCompatibleWithPolicy.length; i++) {
                 if (mechanismNamesCompatibleWithPolicy[i].equals(mechanism)) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index b764e54..b5b3016 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -35,13 +35,12 @@ import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeType;
-import com.fasterxml.jackson.databind.node.NumericNode;
 
 /**
  * A simple unsecured JWS implementation. The '{@code nbf}' claim is ignored if
  * it is given because the related logic is not required for Kafka testing and
  * development purposes.
- * 
+ *
  * @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>
  */
 public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
@@ -58,7 +57,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Constructor with the given principal and scope claim names
-     * 
+     *
      * @param compactSerialization
      *            the compact serialization to parse as an unsecured JWS
      * @param principalClaimName
@@ -118,7 +117,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Return the 3 or 5 dot-separated sections of the JWT compact serialization
-     * 
+     *
      * @return the 3 or 5 dot-separated sections of the JWT compact serialization
      */
     public List<String> splits() {
@@ -127,7 +126,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Return the JOSE Header as a {@code Map}
-     * 
+     *
      * @return the JOSE header
      */
     public Map<String, Object> header() {
@@ -156,7 +155,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Return the JWT Claim Set as a {@code Map}
-     * 
+     *
      * @return the (always non-null but possibly empty) claims
      */
     public Map<String, Object> claims() {
@@ -165,7 +164,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Return the (always non-null/non-empty) principal claim name
-     * 
+     *
      * @return the (always non-null/non-empty) principal claim name
      */
     public String principalClaimName() {
@@ -174,7 +173,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Return the (always non-null/non-empty) scope claim name
-     * 
+     *
      * @return the (always non-null/non-empty) scope claim name
      */
     public String scopeClaimName() {
@@ -183,7 +182,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Indicate if the claim exists and is the given type
-     * 
+     *
      * @param claimName
      *            the mandatory JWT claim name
      * @param type
@@ -205,7 +204,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Extract a claim of the given type
-     * 
+     *
      * @param claimName
      *            the mandatory JWT claim name
      * @param type
@@ -228,7 +227,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
 
     /**
      * Extract a claim in its raw form
-     * 
+     *
      * @param claimName
      *            the mandatory JWT claim name
      * @return the raw claim value, if it exists, otherwise null
@@ -241,7 +240,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
      * Return the
      * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
      * Time</a> claim
-     * 
+     *
      * @return the <a href=
      *         "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
      *         Time</a> claim if available, otherwise null
@@ -255,7 +254,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
     /**
      * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
      * At</a> claim
-     * 
+     *
      * @return the
      *         <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
      *         At</a> claim if available, otherwise null
@@ -269,7 +268,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
     /**
      * Return the
      * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
-     * 
+     *
      * @return the <a href=
      *         "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
      *         if available, otherwise null
@@ -284,7 +283,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
      * Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
      * object, and return the map of member names to their values (each value being
      * represented as either a String, a Number, or a List of Strings).
-     * 
+     *
      * @param split
      *            the value to decode and parse
      * @return the map of JSON member names to their String, Number, or String List
@@ -326,12 +325,11 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
     private static Object convert(JsonNode value) {
         if (value.isArray()) {
             List<String> retvalList = new ArrayList<>();
-            for (JsonNode arrayElement : value) {
+            for (JsonNode arrayElement : value)
                 retvalList.add(arrayElement.asText());
-            }
             return retvalList;
         }
-        return value.getNodeType() == JsonNodeType.NUMBER ? ((NumericNode) value).numberValue() : value.asText();
+        return value.getNodeType() == JsonNodeType.NUMBER ? value.numberValue() : value.asText();
     }
 
     private Long calculateStartTimeMs() throws OAuthBearerIllegalTokenException {
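
The convert() change drops a redundant cast: numberValue() is declared on JsonNode itself (returning null for non-numeric nodes), so once the node type has been checked there is nothing to cast. A standalone sketch of the scalar branch:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.node.JsonNodeType;

    class JsonScalars {
        // Numeric nodes yield their Number value; everything else is rendered as text.
        static Object scalarValue(JsonNode value) {
            return value.getNodeType() == JsonNodeType.NUMBER ? value.numberValue() : value.asText();
        }
    }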
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 83b2602..8d259e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -79,7 +79,7 @@ import org.slf4j.LoggerFactory;
  * be something other than '{@code scope}'</li>
  * </ul>
  * For example:
- * 
+ *
  * <pre>
  * KafkaClient {
  *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
@@ -89,7 +89,7 @@ import org.slf4j.LoggerFactory;
  *      unsecuredLoginExtension_traceId="123";
  * };
  * </pre>
- * 
+ *
  * This class is the default when the SASL mechanism is OAUTHBEARER and no value
  * is explicitly set via either the {@code sasl.login.callback.handler.class}
  * client configuration property or the
@@ -122,7 +122,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
 
     /**
      * For testing
-     * 
+     *
      * @param time
      *            the mandatory time to set
      */
@@ -132,7 +132,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
 
     /**
      * Return true if this instance has been configured, otherwise false
-     * 
+     *
      * @return true if this instance has been configured, otherwise false
      */
     public boolean configured() {
@@ -179,7 +179,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
         // empty
     }
 
-    private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
+    private void handleTokenCallback(OAuthBearerTokenCallback callback) {
         if (callback.token() != null)
             throw new IllegalArgumentException("Callback had a token already");
         String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION);
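
For readers following the javadoc example above: the same unsecured OAUTHBEARER setup can be supplied per client through the standard sasl.jaas.config property instead of a JAAS file. A minimal sketch (the claim value is a placeholder):

    import java.util.Properties;

    Properties props = new Properties();
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "OAUTHBEARER");
    props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
            + " unsecuredLoginStringClaim_sub=\"thePrincipalName\";");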
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
index 2e21cf4..54e289c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.security.oauthbearer.internals.unsecured;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +58,7 @@ import org.slf4j.LoggerFactory;
  * clock skew (the default is 0)</li>
  * </ul>
  * For example:
- * 
+ *
  * <pre>
  * KafkaServer {
  *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
@@ -91,7 +90,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
 
     /**
      * For testing
-     * 
+     *
      * @param time
      *            the mandatory time to set
      */
@@ -101,7 +100,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
 
     /**
      * Return true if this instance has been configured, otherwise false
-     * 
+     *
      * @return true if this instance has been configured, otherwise false
      */
     public boolean configured() {
@@ -124,7 +123,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
     }
 
     @Override
-    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
         if (!configured())
             throw new IllegalStateException("Callback handler not configured");
         for (Callback callback : callbacks) {
@@ -195,8 +194,8 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
     private List<String> requiredScope() {
         String requiredSpaceDelimitedScope = option(REQUIRED_SCOPE_OPTION);
         List<String> requiredScope = requiredSpaceDelimitedScope == null || requiredSpaceDelimitedScope.trim().isEmpty()
-                ? Collections.<String>emptyList()
-                : OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
+            ? Collections.emptyList()
+            : OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
         return requiredScope;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
index 6c32ff1..4085168 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.LoginException;
 import javax.security.auth.spi.LoginModule;
 
 public class PlainLoginModule implements LoginModule {
@@ -45,22 +44,22 @@ public class PlainLoginModule implements LoginModule {
     }
 
     @Override
-    public boolean login() throws LoginException {
+    public boolean login() {
         return true;
     }
 
     @Override
-    public boolean logout() throws LoginException {
+    public boolean logout() {
         return true;
     }
 
     @Override
-    public boolean commit() throws LoginException {
+    public boolean commit() {
         return true;
     }
 
     @Override
-    public boolean abort() throws LoginException {
+    public boolean abort() {
         return false;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java
index 054d411..18df038 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java
@@ -143,21 +143,21 @@ public class PlainSaslServer implements SaslServer {
     }
 
     @Override
-    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
         if (!complete)
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(incoming, offset, offset + len);
     }
 
     @Override
-    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
         if (!complete)
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(outgoing, offset, offset + len);
     }
 
     @Override
-    public void dispose() throws SaslException {
+    public void dispose() {
     }
 
     public static class PlainSaslServerFactory implements SaslServerFactory {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index 3d7de58..104b3fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -24,7 +24,6 @@ import java.util.Map;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.LoginException;
 import javax.security.auth.spi.LoginModule;
 
 public class ScramLoginModule implements LoginModule {
@@ -55,22 +54,22 @@ public class ScramLoginModule implements LoginModule {
     }
 
     @Override
-    public boolean login() throws LoginException {
+    public boolean login() {
         return true;
     }
 
     @Override
-    public boolean logout() throws LoginException {
+    public boolean logout() {
         return true;
     }
 
     @Override
-    public boolean commit() throws LoginException {
+    public boolean commit() {
         return true;
     }
 
     @Override
-    public boolean abort() throws LoginException {
+    public boolean abort() {
         return false;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
index 7b51890..439bcfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
@@ -26,7 +26,7 @@ import java.util.Map;
 public class ScramExtensions extends SaslExtensions {
 
     public ScramExtensions() {
-        this(Collections.<String, String>emptyMap());
+        this(Collections.emptyMap());
     }
 
     public ScramExtensions(String extensions) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
index bd3b704..c21a52e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
@@ -59,7 +59,7 @@ public class ScramSaslClient implements SaslClient {
         RECEIVE_SERVER_FINAL_MESSAGE,
         COMPLETE,
         FAILED
-    };
+    }
 
     private final ScramMechanism mechanism;
     private final CallbackHandler callbackHandler;
@@ -157,14 +157,14 @@ public class ScramSaslClient implements SaslClient {
     }
 
     @Override
-    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(incoming, offset, offset + len);
     }
 
     @Override
-    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(outgoing, offset, offset + len);
@@ -178,7 +178,7 @@ public class ScramSaslClient implements SaslClient {
     }
 
     @Override
-    public void dispose() throws SaslException {
+    public void dispose() {
     }
 
     private void setState(State state) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
index b11300a..217dab7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
@@ -62,7 +62,7 @@ public class ScramSaslServer implements SaslServer {
         RECEIVE_CLIENT_FINAL_MESSAGE,
         COMPLETE,
         FAILED
-    };
+    }
 
     private final ScramMechanism mechanism;
     private final ScramFormatter formatter;
@@ -194,21 +194,21 @@ public class ScramSaslServer implements SaslServer {
     }
 
     @Override
-    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+    public byte[] unwrap(byte[] incoming, int offset, int len) {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(incoming, offset, offset + len);
     }
 
     @Override
-    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+    public byte[] wrap(byte[] outgoing, int offset, int len) {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not completed");
         return Arrays.copyOfRange(outgoing, offset, offset + len);
     }
 
     @Override
-    public void dispose() throws SaslException {
+    public void dispose() {
     }
 
     private void setState(State state) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
index f6f5315..63c2b17 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.security.scram.internals;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -50,7 +49,7 @@ public class ScramServerCallbackHandler implements AuthenticateCallbackHandler {
     }
 
     @Override
-    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
         String username = null;
         for (Callback callback : callbacks) {
             if (callback instanceof NameCallback)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 61f2f0d..b1f7df8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -447,7 +447,7 @@ public class SslFactory implements Reconfigurable {
         private final Principal subjectPrincipal;
         private final Set<List<?>> subjectAltNames;
 
-        static List<CertificateEntries> create(KeyStore keystore) throws GeneralSecurityException, IOException {
+        static List<CertificateEntries> create(KeyStore keystore) throws GeneralSecurityException {
             Enumeration<String> aliases = keystore.aliases();
             List<CertificateEntries> entries = new ArrayList<>();
             while (aliases.hasMoreElements()) {
@@ -463,7 +463,7 @@ public class SslFactory implements Reconfigurable {
             this.subjectPrincipal = cert.getSubjectX500Principal();
             Collection<List<?>> altNames = cert.getSubjectAlternativeNames();
             // use a set for comparison
-            this.subjectAltNames = altNames != null ? new HashSet<>(altNames) : Collections.<List<?>>emptySet();
+            this.subjectAltNames = altNames != null ? new HashSet<>(altNames) : Collections.emptySet();
         }
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
index ffd2af3..8f360ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -87,9 +87,7 @@ public class TokenInformation {
     }
 
     public boolean ownerOrRenewer(KafkaPrincipal principal) {
-        if (owner.equals(principal) || renewers.contains(principal))
-            return true;
-        return false;
+        return owner.equals(principal) || renewers.contains(principal);
     }
 
     @Override
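
The ownerOrRenewer() change returns the condition directly instead of branching to boolean literals. The general shape of this simplification:

    class BooleanSimplification {
        // Before: branching just to return a boolean literal.
        static boolean containsVerbose(java.util.List<String> xs, String x) {
            if (xs.contains(x))
                return true;
            return false;
        }

        // After: return the condition itself.
        static boolean contains(java.util.List<String> xs, String x) {
            return xs.contains(x);
        }
    }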
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 0a17ecd..17c4ba1 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -99,8 +99,8 @@ public class AppInfoParser {
     }
 
     public interface AppInfoMBean {
-        public String getVersion();
-        public String getCommitId();
+        String getVersion();
+        String getCommitId();
     }
 
     public static class AppInfo implements AppInfoMBean {
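
The AppInfoMBean change (and the ClientQuotaEntity change further below) rely on the same rule: methods of an interface are implicitly public, and types nested in an interface are implicitly public and static, so the modifiers are redundant. For example:

    interface AppInfoLike {
        String getVersion();               // implicitly public abstract
        String getCommitId();

        enum Kind { CLIENT, SERVER }       // nested types are implicitly public static
    }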
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a6d3e2c..f94858c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -772,7 +772,7 @@ public final class Utils {
      * @return
      */
     public static <T> List<T> safe(List<T> other) {
-        return other == null ? Collections.<T>emptyList() : other;
+        return other == null ? Collections.emptyList() : other;
     }
 
    /**
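
Besides the inference cleanup, safe() is the usual null-to-empty guard; a typical call site iterates without a null check. A sketch (process() and maybeNullTopics are hypothetical names for illustration):

    import java.util.List;
    import org.apache.kafka.common.utils.Utils;

    static void processAll(List<String> maybeNullTopics) {
        for (String topic : Utils.safe(maybeNullTopics))  // empty list when null, so no NPE
            process(topic);                               // process() is a hypothetical consumer
    }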
diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
index a5ff082..0f0eb62 100644
--- a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
@@ -28,7 +28,7 @@ public interface ClientQuotaEntity {
     /**
      * Entity type of a {@link ConfigEntity}
      */
-    public enum ConfigEntityType {
+    enum ConfigEntityType {
         USER,
         CLIENT_ID,
         DEFAULT_USER,
@@ -41,7 +41,7 @@ public interface ClientQuotaEntity {
      * For example, {user, client-id} quota is represented using two
      * instances of ConfigEntity with entity types USER and CLIENT_ID.
      */
-    public interface ConfigEntity {
+    interface ConfigEntity {
         /**
          * Returns the name of this entity. For default quotas, an empty string is returned.
          */
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
index 8e07e73..8eafba7 100644
--- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -46,7 +46,7 @@ public class ResourceTypeTest {
     };
 
     @Test
-    public void testIsUnknown() throws Exception {
+    public void testIsUnknown() {
         for (AclResourceTypeTestInfo info : INFOS) {
             assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown,
                 info.unknown, info.resourceType.isUnknown());
@@ -54,7 +54,7 @@ public class ResourceTypeTest {
     }
 
     @Test
-    public void testCode() throws Exception {
+    public void testCode() {
         assertEquals(ResourceType.values().length, INFOS.length);
         for (AclResourceTypeTestInfo info : INFOS) {
             assertEquals(info.resourceType + " was supposed to have code == " + info.code,
@@ -66,7 +66,7 @@ public class ResourceTypeTest {
     }
 
     @Test
-    public void testName() throws Exception {
+    public void testName() {
         for (AclResourceTypeTestInfo info : INFOS) {
             assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " +
                 info.resourceType, info.resourceType, ResourceType.fromString(info.name));
@@ -75,7 +75,7 @@ public class ResourceTypeTest {
     }
 
     @Test
-    public void testExhaustive() throws Exception {
+    public void testExhaustive() {
         assertEquals(INFOS.length, ResourceType.values().length);
         for (int i = 0; i < INFOS.length; i++) {
             assertEquals(INFOS[i].resourceType, ResourceType.values()[i]);
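
The test-signature changes in this and the following test files drop 'throws Exception' where the body no longer throws a checked exception; JUnit only needs the clause when the test body can actually throw one. For example:

    import org.junit.Test;
    import static org.junit.Assert.assertEquals;

    public class ExampleTest {
        @Test
        public void addsUp() {   // no 'throws Exception': nothing checked is thrown
            assertEquals(4, 2 + 2);
        }
    }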
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
index d96b359..8b8c251 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -190,38 +190,38 @@ public class JaasContextTest {
     @Test
     public void testLoadForServerWithListenerNameOverride() throws IOException {
         writeConfiguration(Arrays.asList(
-                "KafkaServer { test.LoginModuleDefault required; };",
-                "plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
+            "KafkaServer { test.LoginModuleDefault required; };",
+            "plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
         ));
         JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
-                "SOME-MECHANISM", Collections.<String, Object>emptyMap());
+            "SOME-MECHANISM", Collections.emptyMap());
         assertEquals("plaintext.KafkaServer", context.name());
         assertEquals(JaasContext.Type.SERVER, context.type());
         assertEquals(1, context.configurationEntries().size());
         checkEntry(context.configurationEntries().get(0), "test.LoginModuleOverride",
-                LoginModuleControlFlag.REQUISITE, Collections.<String, Object>emptyMap());
+            LoginModuleControlFlag.REQUISITE, Collections.emptyMap());
     }
 
     @Test
     public void testLoadForServerWithListenerNameAndFallback() throws IOException {
         writeConfiguration(Arrays.asList(
-                "KafkaServer { test.LoginModule required; };",
-                "other.KafkaServer { test.LoginModuleOther requisite; };"
+            "KafkaServer { test.LoginModule required; };",
+            "other.KafkaServer { test.LoginModuleOther requisite; };"
         ));
         JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
-                "SOME-MECHANISM", Collections.<String, Object>emptyMap());
+            "SOME-MECHANISM", Collections.emptyMap());
         assertEquals("KafkaServer", context.name());
         assertEquals(JaasContext.Type.SERVER, context.type());
         assertEquals(1, context.configurationEntries().size());
         checkEntry(context.configurationEntries().get(0), "test.LoginModule", LoginModuleControlFlag.REQUIRED,
-                Collections.<String, Object>emptyMap());
+            Collections.emptyMap());
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testLoadForServerWithWrongListenerName() throws IOException {
         writeConfiguration("Server", "test.LoginModule required;");
         JaasContext.loadServerContext(new ListenerName("plaintext"), "SOME-MECHANISM",
-                Collections.<String, Object>emptyMap());
+            Collections.emptyMap());
     }
 
     private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 2fce4c5..938fe94 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -128,7 +128,7 @@ public class ClientAuthenticationFailureTest {
     }
 
     @Test
-    public void testTransactionalProducerWithInvalidCredentials() throws Exception {
+    public void testTransactionalProducerWithInvalidCredentials() {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1");
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
index 5436b2a..34ff881 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
@@ -56,7 +56,7 @@ public class LoginManagerTest {
     public void testClientLoginManager() throws Exception {
         Map<String, ?> configs = Collections.singletonMap("sasl.jaas.config", dynamicPlainContext);
         JaasContext dynamicContext = JaasContext.loadClientContext(configs);
-        JaasContext staticContext = JaasContext.loadClientContext(Collections.<String, Object>emptyMap());
+        JaasContext staticContext = JaasContext.loadClientContext(Collections.emptyMap());
 
         LoginManager dynamicLogin = LoginManager.acquireLoginManager(dynamicContext, "PLAIN",
                 DefaultLogin.class, configs);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 74058eb..dfefabb 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -830,7 +830,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
         jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, TestPlainLoginModule.class.getName(),
-                Collections.<String, Object>emptyMap());
+                Collections.emptyMap());
         server = createEchoServer(securityProtocol);
 
         // Connection should succeed using login callback override that sets correct username/password
@@ -854,7 +854,7 @@ public class SaslAuthenticatorTest {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
         jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, TestPlainLoginModule.class.getName(),
-                Collections.<String, Object>emptyMap());
+                Collections.emptyMap());
         jaasConfig.setClientOptions("PLAIN", TestServerCallbackHandler.USERNAME, TestServerCallbackHandler.PASSWORD);
         ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
         String prefix = listenerName.saslMechanismConfigPrefix("PLAIN");
@@ -996,7 +996,7 @@ public class SaslAuthenticatorTest {
                 TestJaasConfig.jaasConfigProperty("PLAIN", serverOptions));
         TestJaasConfig staticJaasConfig = new TestJaasConfig();
         staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
-                Collections.<String, Object>emptyMap());
+                Collections.emptyMap());
         staticJaasConfig.setClientOptions("PLAIN", "user1", "user1-secret");
         Configuration.setConfiguration(staticJaasConfig);
         server = createEchoServer(securityProtocol);
@@ -1544,7 +1544,7 @@ public class SaslAuthenticatorTest {
         }
 
         @Override
-        protected boolean authenticate(String username, char[] password) throws IOException {
+        protected boolean authenticate(String username, char[] password) {
             if (!configured)
                 throw new IllegalStateException("Server callback handler not configured");
             return USERNAME.equals(username) && new String(password).equals(PASSWORD);
@@ -1594,7 +1594,7 @@ public class SaslAuthenticatorTest {
         }
 
         @Override
-        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
             if (!configured)
                 throw new IllegalStateException("Client callback handler not configured");
             for (Callback callback : callbacks) {
@@ -1665,7 +1665,7 @@ public class SaslAuthenticatorTest {
         }
 
         @Override
-        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        public void handle(Callback[] callbacks) {
             if (!configured)
                 throw new IllegalStateException("Login callback handler not configured");
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index b70d592..1ae83ee 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -55,7 +55,7 @@ public class SaslServerAuthenticatorTest {
         final Capture<ByteBuffer> size = EasyMock.newCapture();
         EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
             @Override
-            public Integer answer() throws Throwable {
+            public Integer answer() {
                 size.getValue().putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
                 return 4;
             }
@@ -79,7 +79,7 @@ public class SaslServerAuthenticatorTest {
         final Capture<ByteBuffer> size = EasyMock.newCapture();
         EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
             @Override
-            public Integer answer() throws Throwable {
+            public Integer answer() {
                 size.getValue().putInt(headerStruct.sizeOf());
                 return 4;
             }
@@ -88,7 +88,7 @@ public class SaslServerAuthenticatorTest {
         final Capture<ByteBuffer> payload = EasyMock.newCapture();
         EasyMock.expect(transportLayer.read(EasyMock.capture(payload))).andAnswer(new IAnswer<Integer>() {
             @Override
-            public Integer answer() throws Throwable {
+            public Integer answer() {
                 // serialize only the request header. the authenticator should not parse beyond this
                 headerStruct.writeTo(payload.getValue());
                 return headerStruct.sizeOf();
@@ -111,7 +111,7 @@ public class SaslServerAuthenticatorTest {
         Map<String, JaasContext> jaasContexts = Collections.singletonMap(mechanism,
                 new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null));
         Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
-        Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.<String, AuthenticateCallbackHandler>singletonMap(
+        Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
                 mechanism, new SaslServerCallbackHandler());
         return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null,
                 new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
index 97b0b27..c27e853 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
@@ -16,14 +16,12 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
@@ -45,7 +43,7 @@ public class TestDigestLoginModule extends PlainLoginModule {
         }
 
         @Override
-        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        public void handle(Callback[] callbacks) {
             String username = null;
             for (Callback callback : callbacks) {
                 if (callback instanceof NameCallback) {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
index 1acd80c..687f937 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
@@ -87,7 +87,7 @@ public class KerberosNameTest {
     }
 
     @Test
-    public void testInvalidRules() throws Exception {
+    public void testInvalidRules() {
         testInvalidRule(Arrays.asList("default"));
         testInvalidRule(Arrays.asList("DEFAUL"));
         testInvalidRule(Arrays.asList("DEFAULT/L"));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
index a9620fa..51d6012 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
@@ -134,14 +134,14 @@ public class OAuthBearerLoginModuleTest {
 
         // Create login modules
         OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
-        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
         OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
-        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
         OAuthBearerLoginModule loginModule3 = new OAuthBearerLoginModule();
-        loginModule3.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule3.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
 
         // Should start with nothing
         assertEquals(0, privateCredentials.size());
@@ -230,11 +230,11 @@ public class OAuthBearerLoginModuleTest {
 
         // Create login modules
         OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
-        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
         OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
-        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
 
         // Should start with nothing
         assertEquals(0, privateCredentials.size());
@@ -290,8 +290,8 @@ public class OAuthBearerLoginModuleTest {
 
         // Create login module
         OAuthBearerLoginModule loginModule = new OAuthBearerLoginModule();
-        loginModule.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
 
         // Should start with nothing
         assertEquals(0, privateCredentials.size());
@@ -342,14 +342,14 @@ public class OAuthBearerLoginModuleTest {
 
         // Create login modules
         OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
-        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule1.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
         OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
-        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule2.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
         OAuthBearerLoginModule loginModule3 = new OAuthBearerLoginModule();
-        loginModule3.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
-                Collections.<String, Object>emptyMap());
+        loginModule3.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+                Collections.emptyMap());
 
         // Should start with nothing
         assertEquals(0, privateCredentials.size());
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index 5df7968..d656608 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -194,7 +194,7 @@ public class OAuthBearerSaslServerTest {
     }
 
     private byte[] clientInitialResponse(String authorizationId, boolean illegalToken)
-            throws OAuthBearerConfigException, IOException, UnsupportedCallbackException, LoginException {
+            throws OAuthBearerConfigException, IOException, UnsupportedCallbackException {
         return clientInitialResponse(authorizationId, false, Collections.emptyMap());
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
index be01fe3..34b8209 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
@@ -29,7 +29,6 @@ import java.util.Map;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginException;
 
 import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
 import org.apache.kafka.common.security.authenticator.TestJaasConfig;
@@ -91,7 +90,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandlerTest {
     @SuppressWarnings("unchecked")
     @Test
     public void validOptionsWithExplicitOptionValues()
-            throws IOException, UnsupportedCallbackException, LoginException {
+            throws IOException, UnsupportedCallbackException {
         String explicitScope1 = "scope1";
         String explicitScope2 = "scope2";
         String explicitScopeClaimName = "putScopeInHere";
@@ -142,7 +141,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandlerTest {
                 (Map) options);
         OAuthBearerUnsecuredLoginCallbackHandler callbackHandler = new OAuthBearerUnsecuredLoginCallbackHandler();
         callbackHandler.time(mockTime);
-        callbackHandler.configure(Collections.<String, Object>emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+        callbackHandler.configure(Collections.emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
                 Arrays.asList(config.getAppConfigurationEntry("KafkaClient")[0]));
         return callbackHandler;
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
index c945af0..2449a83 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
@@ -73,7 +73,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
     }
 
     @Test
-    public void validToken() throws IOException, UnsupportedCallbackException {
+    public void validToken() {
         for (final boolean includeOptionalIssuedAtClaim : new boolean[] {true, false}) {
             String claimsJson = "{" + PRINCIPAL_CLAIM_TEXT + comma(EXPIRATION_TIME_CLAIM_TEXT)
                     + (includeOptionalIssuedAtClaim ? comma(ISSUED_AT_CLAIM_TEXT) : "") + "}";
@@ -101,7 +101,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
     }
 
     @Test
-    public void includesRequiredScope() throws IOException, UnsupportedCallbackException {
+    public void includesRequiredScope() {
         String claimsJson = "{" + SUB_CLAIM_TEXT + comma(EXPIRATION_TIME_CLAIM_TEXT) + comma(SCOPE_CLAIM_TEXT) + "}";
         Object validationResult = validationResult(UNSECURED_JWT_HEADER_JSON, claimsJson,
                 MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE);
@@ -124,7 +124,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
 
     private static void confirmFailsValidation(String headerJson, String claimsJson,
             Map<String, String> moduleOptionsMap, String optionalFailureScope) throws OAuthBearerConfigException,
-            OAuthBearerIllegalTokenException, IOException, UnsupportedCallbackException {
+            OAuthBearerIllegalTokenException {
         Object validationResultObj = validationResult(headerJson, claimsJson, moduleOptionsMap);
         assertTrue(validationResultObj instanceof OAuthBearerValidatorCallback);
         OAuthBearerValidatorCallback callback = (OAuthBearerValidatorCallback) validationResultObj;
@@ -159,7 +159,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
         config.createOrUpdateEntry("KafkaClient", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule",
                 (Map) options);
         OAuthBearerUnsecuredValidatorCallbackHandler callbackHandler = new OAuthBearerUnsecuredValidatorCallbackHandler();
-        callbackHandler.configure(Collections.<String, Object>emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+        callbackHandler.configure(Collections.emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
                 Arrays.asList(config.getAppConfigurationEntry("KafkaClient")[0]));
         return callbackHandler;
     }
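
The Collections.<String, Object>emptyMap() rewrites above (and the unused
imports dropped earlier in the patch) are enabled by Java 8's improved
target typing: the compiler now infers a generic method's type arguments
from the parameter type at the call site, so the explicit type witness
that older javac sometimes required is redundant. A small illustration,
where configure is a stand-in signature assumed for the example, not the
real callback-handler API:

    import java.util.Collections;
    import java.util.Map;

    public class TypeWitnessSketch {
        // Stand-in for a handler method that accepts arbitrary configs.
        static void configure(Map<String, ?> configs, String mechanism) {
            System.out.println(mechanism + ": " + configs.size() + " entries");
        }

        public static void main(String[] args) {
            // Pre-Java 8 style: explicit type witness at the call site.
            configure(Collections.<String, Object>emptyMap(), "OAUTHBEARER");
            // Java 8 infers the type arguments from the parameter type.
            configure(Collections.emptyMap(), "OAUTHBEARER");
        }
    }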
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java
index 2a21864..7477dbc 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.security.oauthbearer.internals.unsecured;
 
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Base64;
@@ -160,18 +159,18 @@ public class OAuthBearerValidationUtilsTest {
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Test
-    public void validateScope() throws IOException {
+    public void validateScope() {
         long nowMs = TIME.milliseconds();
         double nowClaimValue = ((double) nowMs) / 1000;
         final List<String> noScope = Collections.emptyList();
         final List<String> scope1 = Arrays.asList("scope1");
         final List<String> scope1And2 = Arrays.asList("scope1", "scope2");
         for (boolean actualScopeExists : new boolean[] {true, false}) {
-            List<? extends List> scopes = !actualScopeExists ? Arrays.<List>asList((List) null)
+            List<? extends List> scopes = !actualScopeExists ? Arrays.asList((List) null)
                     : Arrays.asList(noScope, scope1, scope1And2);
             for (List<String> actualScope : scopes) {
                 for (boolean requiredScopeExists : new boolean[] {true, false}) {
-                    List<? extends List> requiredScopes = !requiredScopeExists ? Arrays.<List>asList((List) null)
+                    List<? extends List> requiredScopes = !requiredScopeExists ? Arrays.asList((List) null)
                             : Arrays.asList(noScope, scope1, scope1And2);
                     for (List<String> requiredScope : requiredScopes) {
                         StringBuilder sb = new StringBuilder("{");
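
The Arrays.<List>asList((List) null) change above is the same inference
cleanup in a trickier spot: the type witness can go, but the (List) cast
must stay, because a bare null would bind to the varargs array itself
rather than becoming a single null element. A self-contained sketch
(compiles with the same rawtypes warnings the test suppresses):

    import java.util.Arrays;
    import java.util.List;

    public class VarargsNullSketch {
        @SuppressWarnings({"unchecked", "rawtypes"})
        public static void main(String[] args) {
            // The cast makes null a single element; the list has size 1.
            List<? extends List> oneNullElement = Arrays.asList((List) null);
            System.out.println(oneNullElement.size()); // prints 1

            // Arrays.asList(null) would instead pass null as the whole
            // varargs array and throw NullPointerException when wrapped.
        }
    }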
diff --git a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
index bc73e9f..b8c77df 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
@@ -40,7 +40,7 @@ public class PlainSaslServerTest {
     private PlainSaslServer saslServer;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         Map<String, Object> options = new HashMap<>();
         options.put("user_" + USER_A, PASSWORD_A);
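
Dropping "throws Exception" from @Before and @Test methods, as above, is
pure cleanup rather than a behavior change: JUnit invokes these methods
reflectively and reports anything thrown as a failure, so the clause is
only ever needed for checked exceptions the body actually raises. A
minimal JUnit 4 sketch of the tidied form:

    import org.junit.Before;
    import org.junit.Test;
    import static org.junit.Assert.assertEquals;

    public class LifecycleSketchTest {
        private String fixture;

        @Before
        public void setUp() { // nothing checked is thrown, none declared
            fixture = "ready";
        }

        @Test
        public void fixtureIsReady() {
            assertEquals("ready", fixture);
        }
    }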
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java
index f171624..1f60efa 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java
@@ -71,7 +71,7 @@ public class ScramMessagesTest {
     @Test
     public void validClientFirstMessage() throws SaslException {
         String nonce = formatter.secureRandomString();
-        ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.<String, String>emptyMap());
+        ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.emptyMap());
         checkClientFirstMessage(m, "someuser", nonce, "");
 
         // Default format used by Kafka client: only user and nonce are specified
@@ -116,7 +116,7 @@ public class ScramMessagesTest {
     }
 
     @Test
-    public void invalidClientFirstMessage() throws SaslException {
+    public void invalidClientFirstMessage() {
         String nonce = formatter.secureRandomString();
         // Invalid entry in gs2-header
         String invalid = String.format("n,x=something,n=testuser,r=%s", nonce);
@@ -166,7 +166,7 @@ public class ScramMessagesTest {
     }
 
     @Test
-    public void invalidServerFirstMessage() throws SaslException {
+    public void invalidServerFirstMessage() {
         String nonce = formatter.secureRandomString();
         String salt = randomBytesAsString();
 
@@ -221,7 +221,7 @@ public class ScramMessagesTest {
     }
 
     @Test
-    public void invalidClientFinalMessage() throws SaslException {
+    public void invalidClientFinalMessage() {
         String nonce = formatter.secureRandomString();
         String channelBinding = randomBytesAsString();
         String proof = randomBytesAsString();
@@ -272,7 +272,7 @@ public class ScramMessagesTest {
     }
 
     @Test
-    public void invalidServerFinalMessage() throws SaslException {
+    public void invalidServerFinalMessage() {
         String serverSignature = randomBytesAsString();
 
         // Invalid error
@@ -299,7 +299,7 @@ public class ScramMessagesTest {
 
     private byte[] toBytes(String base64Str) {
         return Base64.getDecoder().decode(base64Str);
-    };
+    }
 
     private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) {
         assertEquals(saslName, message.saslName());
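
The "};" fix above is worth a remark because it looks like a syntax error
but is not: at class level, a stray semicolon after a method body is an
empty declaration, which javac accepts silently while style checkers
rightly flag it as noise. A tiny sketch showing both forms compile:

    public class StraySemicolonSketch {
        // Legal but pointless: the trailing ';' is an empty declaration.
        private static int before(int x) {
            return x + 1;
        };

        // After cleanup the closing brace alone ends the method.
        private static int after(int x) {
            return x + 1;
        }

        public static void main(String[] args) {
            System.out.println(before(1) + after(1)); // prints 4
        }
    }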
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 134882f..38bdbfe 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -38,7 +38,7 @@ public class SerializationTest {
         {
             put(String.class, Arrays.asList("my string"));
             put(Short.class, Arrays.asList((short) 32767, (short) -32768));
-            put(Integer.class, Arrays.asList((int) 423412424, (int) -41243432));
+            put(Integer.class, Arrays.asList(423412424, -41243432));
             put(Long.class, Arrays.asList(922337203685477580L, -922337203685477581L));
             put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f));
             put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d));
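
The "(int)" casts removed above were no-ops: an int literal such as
423412424 already has type int, so casting it to int changes nothing
before autoboxing to Integer. Contrast the short entries on the
neighboring line, where the cast genuinely narrows the literal and must
stay. A compilable sketch mirroring the test data:

    import java.util.Arrays;
    import java.util.List;

    public class RedundantCastSketch {
        public static void main(String[] args) {
            // No cast needed: int literals autobox straight to Integer.
            List<Integer> ints = Arrays.asList(423412424, -41243432);
            // Cast required: Java has no short literals.
            List<Short> shorts = Arrays.asList((short) 32767, (short) -32768);
            System.out.println(ints + " " + shorts);
        }
    }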
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
index 20084a2..249f6f8 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
@@ -87,7 +87,7 @@ public class ImplicitLinkedHashSetTest {
     }
 
     @Test
-    public void testInsertDelete() throws Exception {
+    public void testInsertDelete() {
         ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(100);
         assertTrue(set.add(new TestElement(1)));
         TestElement second = new TestElement(2);
@@ -137,7 +137,7 @@ public class ImplicitLinkedHashSetTest {
     }
 
     @Test
-    public void testTraversal() throws Exception {
+    public void testTraversal() {
         ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(100);
         expectTraversal(set.iterator());
         assertTrue(set.add(new TestElement(2)));
@@ -168,7 +168,7 @@ public class ImplicitLinkedHashSetTest {
     }
 
     @Test
-    public void testCollisions() throws Exception {
+    public void testCollisions() {
         ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(5);
         assertEquals(11, set.numSlots());
         assertTrue(set.add(new TestElement(11)));
@@ -184,7 +184,7 @@ public class ImplicitLinkedHashSetTest {
     }
 
     @Test
-    public void testEnlargement() throws Exception {
+    public void testEnlargement() {
         ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(5);
         assertEquals(11, set.numSlots());
         for (int i = 0; i < 6; i++) {
@@ -203,7 +203,7 @@ public class ImplicitLinkedHashSetTest {
     }
 
     @Test
-    public void testManyInsertsAndDeletes() throws Exception {
+    public void testManyInsertsAndDeletes() {
         Random random = new Random(123);
         LinkedHashSet<Integer> existing = new LinkedHashSet<>();
         ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>();
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index 7134f0a..89ece60 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -92,7 +92,7 @@ public class MockScheduler implements Scheduler, MockTime.MockTimeListener {
             public Void apply(final Long now) {
                 executor.submit(new Callable<Void>() {
                     @Override
-                    public Void call() throws Exception {
+                    public Void call() {
                         // Note: it is possible that we'll execute Callable#call right after
                         // the future is cancelled.  This is a valid sequence of events
                         // that the author of the Callable needs to be able to handle.
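
The MockScheduler change leans on an override rule that is easy to
forget: an implementing method may declare fewer checked exceptions than
the interface it overrides. Callable.call() is declared "throws
Exception", but an implementation whose body throws nothing checked can
drop the clause entirely; only calls made through the Callable interface
still see the broader contract. A short sketch:

    import java.util.concurrent.Callable;

    public class NarrowedThrowsSketch {
        public static void main(String[] args) throws Exception {
            Callable<Void> task = new Callable<Void>() {
                @Override
                public Void call() { // narrower than "throws Exception": allowed
                    System.out.println("ran");
                    return null;
                }
            };
            // Through the interface type the compiler still assumes
            // Exception may be thrown, hence main's throws clause.
            task.call();
        }
    }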
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
index 58bcb19..7bd302b 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
@@ -30,7 +30,7 @@ public class MockTimeTest {
     final public Timeout globalTimeout = Timeout.millis(120000);
 
     @Test
-    public void testAdvanceClock() throws Exception {
+    public void testAdvanceClock() {
         MockTime time = new MockTime(0, 100, 200);
         Assert.assertEquals(100, time.milliseconds());
         Assert.assertEquals(200, time.nanoseconds());
@@ -40,7 +40,7 @@ public class MockTimeTest {
     }
 
     @Test
-    public void testAutoTickMs() throws Exception {
+    public void testAutoTickMs() {
         MockTime time = new MockTime(1, 100, 200);
         Assert.assertEquals(101, time.milliseconds());
         Assert.assertEquals(2000200, time.nanoseconds());
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
index 59ac6c0..bba19a8 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.UnsupportedEncodingException;
 import java.lang.management.ManagementFactory;
 
 import javax.management.MBeanException;
@@ -34,7 +33,7 @@ import org.junit.Test;
 public class SanitizerTest {
 
     @Test
-    public void testSanitize() throws UnsupportedEncodingException {
+    public void testSanitize() {
         String principal = "CN=Some characters !@#$%&*()_-+=';:,/~";
         String sanitizedPrincipal = Sanitizer.sanitize(principal);
         assertTrue(sanitizedPrincipal.replace('%', '_').matches("[a-zA-Z0-9\\._\\-]+"));
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java
index d77f4d0..7bd96f0 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java
@@ -49,7 +49,7 @@ public class ShellTest {
     private final static String NONEXISTENT_PATH = "/dev/a/path/that/does/not/exist/in/the/filesystem";
 
     @Test
-    public void testAttemptToRunNonExistentProgram() throws Exception {
+    public void testAttemptToRunNonExistentProgram() {
         assumeTrue(!OperatingSystem.IS_WINDOWS);
         try {
             Shell.execCommand(NONEXISTENT_PATH);
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 4d1d830..b258a34 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -106,7 +106,7 @@ public class UtilsTest {
         assertEquals("1", Utils.join(Arrays.asList("1"), ","));
         assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
     }
-    
+
     @Test
     public void testAbs() {
         assertEquals(0, Utils.abs(Integer.MIN_VALUE));
@@ -365,7 +365,7 @@ public class UtilsTest {
         EasyMock.expect(channelMock.size()).andReturn((long) fileChannelContent.length());
         EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
             @Override
-            public Integer answer() throws Throwable {
+            public Integer answer() {
                 ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
                 buffer.put(fileChannelContent.getBytes());
                 return -1;
@@ -399,7 +399,7 @@ public class UtilsTest {
             final StringBuilder sb = new StringBuilder();
             EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
                 @Override
-                public Integer answer() throws Throwable {
+                public Integer answer() {
                     ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
                     for (int i = 0; i < mockedBytesRead; i++)
                         sb.append("a");