You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/09/09 21:25:40 UTC
[kafka] branch trunk updated: MINOR: Code cleanup of 'clients' module (#5427)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8651d4 MINOR: Code cleanup of 'clients' module (#5427)
b8651d4 is described below
commit b8651d4e82d45463d2c71798bd5852f8a605b440
Author: Vahid Hashemian <va...@gmail.com>
AuthorDate: Sun Sep 9 14:25:30 2018 -0700
MINOR: Code cleanup of 'clients' module (#5427)
Cleanup involves:
* Refactoring to use Java 8 constructs (lambdas,
diamond for `empty` collection methods) and library
methods (`computeIfAbsent`)
* Simplifying code (including unnecessarily complex
`equals` and `hashCode` implementations)
* Removing redundant code
* Fixing typos
Reviewers: Ryanne Dolan, Ismael Juma <is...@juma.me.uk>
---
.../apache/kafka/clients/FetchSessionHandler.java | 6 +-
.../org/apache/kafka/clients/InFlightRequests.java | 8 +-
.../clients/admin/ConsumerGroupDescription.java | 2 +-
.../kafka/clients/admin/DeleteAclsResult.java | 45 ++---
.../admin/internals/AdminMetadataManager.java | 7 +-
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../kafka/clients/consumer/MockConsumer.java | 15 +-
.../kafka/clients/consumer/RangeAssignor.java | 2 +-
.../kafka/clients/consumer/RoundRobinAssignor.java | 3 +-
.../internals/AbstractPartitionAssignor.java | 6 +-
.../main/java/org/apache/kafka/common/Cluster.java | 22 +--
.../java/org/apache/kafka/common/MetricName.java | 23 +--
.../org/apache/kafka/common/TopicPartition.java | 12 +-
.../apache/kafka/common/TopicPartitionReplica.java | 19 +-
.../kafka/common/acl/AccessControlEntryFilter.java | 4 +-
.../apache/kafka/common/config/AbstractConfig.java | 4 +-
.../org/apache/kafka/common/config/ConfigDef.java | 69 +++----
.../kafka/common/config/ConfigTransformer.java | 4 +-
.../common/header/internals/RecordHeaders.java | 28 +--
.../kafka/common/internals/KafkaFutureImpl.java | 10 +-
.../kafka/common/internals/PartitionStates.java | 6 +-
.../common/memory/GarbageCollectedMemoryPool.java | 2 +-
.../apache/kafka/common/network/ChannelState.java | 3 +-
.../apache/kafka/common/network/KafkaChannel.java | 6 +-
.../common/network/PlaintextChannelBuilder.java | 3 +-
.../common/network/PlaintextTransportLayer.java | 13 +-
.../kafka/common/network/SaslChannelBuilder.java | 2 +-
.../apache/kafka/common/network/Selectable.java | 32 ++--
.../org/apache/kafka/common/network/Selector.java | 14 +-
.../kafka/common/network/SslChannelBuilder.java | 2 +-
.../kafka/common/network/SslTransportLayer.java | 6 +-
.../org/apache/kafka/common/protocol/Errors.java | 14 +-
.../kafka/common/record/AbstractRecords.java | 7 +-
.../common/record/ByteBufferLogInputStream.java | 3 +-
.../kafka/common/record/CompressionType.java | 2 +-
.../apache/kafka/common/record/FileRecords.java | 8 +-
.../common/record/KafkaLZ4BlockInputStream.java | 6 +-
.../apache/kafka/common/record/MemoryRecords.java | 7 +-
.../kafka/common/record/MemoryRecordsBuilder.java | 2 +-
.../requests/AlterReplicaLogDirsRequest.java | 2 +-
.../kafka/common/requests/ApiVersionsRequest.java | 4 +-
.../common/requests/ControlledShutdownRequest.java | 5 +-
.../kafka/common/requests/CreateTopicsRequest.java | 6 +-
.../kafka/common/requests/DeleteAclsRequest.java | 2 +-
.../requests/DescribeDelegationTokenResponse.java | 2 +-
.../common/requests/DescribeGroupsResponse.java | 10 +-
.../kafka/common/requests/JoinGroupRequest.java | 26 +--
.../kafka/common/requests/ListGroupsRequest.java | 4 +-
.../kafka/common/resource/ResourceFilter.java | 4 +-
.../apache/kafka/common/security/JaasContext.java | 2 +-
.../security/authenticator/CredentialCache.java | 2 +-
.../security/authenticator/LoginManager.java | 21 +-
.../authenticator/SaslClientAuthenticator.java | 20 +-
.../authenticator/SaslServerAuthenticator.java | 21 +-
.../common/security/kerberos/KerberosLogin.java | 212 ++++++++++-----------
.../oauthbearer/OAuthBearerLoginModule.java | 14 +-
.../internals/OAuthBearerSaslClient.java | 10 +-
.../OAuthBearerSaslClientCallbackHandler.java | 6 +-
.../internals/OAuthBearerSaslServer.java | 8 +-
.../unsecured/OAuthBearerUnsecuredJws.java | 34 ++--
.../OAuthBearerUnsecuredLoginCallbackHandler.java | 10 +-
...uthBearerUnsecuredValidatorCallbackHandler.java | 13 +-
.../common/security/plain/PlainLoginModule.java | 9 +-
.../security/plain/internals/PlainSaslServer.java | 6 +-
.../common/security/scram/ScramLoginModule.java | 9 +-
.../security/scram/internals/ScramExtensions.java | 2 +-
.../security/scram/internals/ScramSaslClient.java | 8 +-
.../security/scram/internals/ScramSaslServer.java | 8 +-
.../internals/ScramServerCallbackHandler.java | 3 +-
.../kafka/common/security/ssl/SslFactory.java | 4 +-
.../token/delegation/TokenInformation.java | 4 +-
.../apache/kafka/common/utils/AppInfoParser.java | 4 +-
.../java/org/apache/kafka/common/utils/Utils.java | 2 +-
.../kafka/server/quota/ClientQuotaEntity.java | 4 +-
.../kafka/common/resource/ResourceTypeTest.java | 8 +-
.../kafka/common/security/JaasContextTest.java | 18 +-
.../ClientAuthenticationFailureTest.java | 2 +-
.../security/authenticator/LoginManagerTest.java | 2 +-
.../authenticator/SaslAuthenticatorTest.java | 12 +-
.../authenticator/SaslServerAuthenticatorTest.java | 8 +-
.../authenticator/TestDigestLoginModule.java | 4 +-
.../common/security/kerberos/KerberosNameTest.java | 2 +-
.../oauthbearer/OAuthBearerLoginModuleTest.java | 36 ++--
.../internals/OAuthBearerSaslServerTest.java | 2 +-
...uthBearerUnsecuredLoginCallbackHandlerTest.java | 5 +-
...earerUnsecuredValidatorCallbackHandlerTest.java | 8 +-
.../unsecured/OAuthBearerValidationUtilsTest.java | 7 +-
.../plain/internals/PlainSaslServerTest.java | 2 +-
.../scram/internals/ScramMessagesTest.java | 12 +-
.../common/serialization/SerializationTest.java | 2 +-
.../common/utils/ImplicitLinkedHashSetTest.java | 10 +-
.../apache/kafka/common/utils/MockScheduler.java | 2 +-
.../apache/kafka/common/utils/MockTimeTest.java | 4 +-
.../apache/kafka/common/utils/SanitizerTest.java | 3 +-
.../org/apache/kafka/common/utils/ShellTest.java | 2 +-
.../org/apache/kafka/common/utils/UtilsTest.java | 6 +-
96 files changed, 472 insertions(+), 615 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
index 195324e..16990ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
@@ -202,7 +202,7 @@ public class FetchSessionHandler {
next = null;
Map<TopicPartition, PartitionData> toSend =
Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
- return new FetchRequestData(toSend, Collections.<TopicPartition>emptyList(), toSend, nextMetadata);
+ return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata);
}
List<TopicPartition> added = new ArrayList<>();
@@ -234,9 +234,7 @@ public class FetchSessionHandler {
}
}
// Add any new partitions to the session.
- for (Iterator<Entry<TopicPartition, PartitionData>> iter =
- next.entrySet().iterator(); iter.hasNext(); ) {
- Entry<TopicPartition, PartitionData> entry = iter.next();
+ for (Entry<TopicPartition, PartitionData> entry : next.entrySet()) {
TopicPartition topicPartition = entry.getKey();
PartitionData nextData = entry.getValue();
if (sessionPartitions.containsKey(topicPartition)) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 5b7ba61..efca453 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -153,12 +152,7 @@ final class InFlightRequests {
} else {
final Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
inFlightRequestCount.getAndAdd(-clearedRequests.size());
- return new Iterable<NetworkClient.InFlightRequest>() {
- @Override
- public Iterator<NetworkClient.InFlightRequest> iterator() {
- return clearedRequests.descendingIterator();
- }
- };
+ return () -> clearedRequests.descendingIterator();
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 933d66c..8947293 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -45,7 +45,7 @@ public class ConsumerGroupDescription {
Node coordinator) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
- this.members = members == null ? Collections.<MemberDescription>emptyList() :
+ this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.state = state;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 19df228..63310bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -101,29 +101,26 @@ public class DeleteAclsResult {
* Note that if the filters don't match any ACLs, this is not considered an error.
*/
public KafkaFuture<Collection<AclBinding>> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
- new KafkaFuture.BaseFunction<Void, Collection<AclBinding>>() {
- @Override
- public Collection<AclBinding> apply(Void v) {
- List<AclBinding> acls = new ArrayList<>();
- for (Map.Entry<AclBindingFilter, KafkaFuture<FilterResults>> entry : futures.entrySet()) {
- FilterResults results;
- try {
- results = entry.getValue().get();
- } catch (Throwable e) {
- // This should be unreachable, since the future returned by KafkaFuture#allOf should
- // have failed if any Future failed.
- throw new KafkaException("DeleteAclsResult#all: internal error", e);
- }
- for (FilterResult result : results.values()) {
- if (result.exception() != null) {
- throw result.exception();
- }
- acls.add(result.binding());
- }
- }
- return acls;
- }
- });
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(v -> getAclBindings(futures));
+ }
+
+ private List<AclBinding> getAclBindings(Map<AclBindingFilter, KafkaFuture<FilterResults>> futures) {
+ List<AclBinding> acls = new ArrayList<>();
+ for (KafkaFuture<FilterResults> value: futures.values()) {
+ FilterResults results;
+ try {
+ results = value.get();
+ } catch (Throwable e) {
+ // This should be unreachable, since the future returned by KafkaFuture#allOf should
+ // have failed if any Future failed.
+ throw new KafkaException("DeleteAclsResult#all: internal error", e);
+ }
+ for (FilterResult result : results.values()) {
+ if (result.exception() != null)
+ throw result.exception();
+ acls.add(result.binding());
+ }
+ }
+ return acls;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 1ad3991..3d9e5ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -20,7 +20,6 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
@@ -183,9 +182,9 @@ public class AdminMetadataManager {
log.trace("Clearing cached controller node {}.", cluster.controller());
this.cluster = new Cluster(cluster.clusterResource().clusterId(),
cluster.nodes(),
- Collections.<PartitionInfo>emptySet(),
- Collections.<String>emptySet(),
- Collections.<String>emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
null);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 33e037c..4ea3cfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -708,7 +708,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+ this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index cf1b07f..9eee6da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -184,7 +184,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
// update the consumed offset
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
for (final TopicPartition topicPartition : records.keySet()) {
- results.put(topicPartition, new ArrayList<ConsumerRecord<K, V>>());
+ results.put(topicPartition, new ArrayList<>());
}
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
@@ -208,11 +208,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
Set<TopicPartition> currentAssigned = new HashSet<>(this.subscriptions.assignedPartitions());
if (!currentAssigned.contains(tp))
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
- List<ConsumerRecord<K, V>> recs = this.records.get(tp);
- if (recs == null) {
- recs = new ArrayList<>();
- this.records.put(tp, recs);
- }
+ List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
recs.add(record);
}
@@ -439,12 +435,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
public synchronized void scheduleNopPollTask() {
- schedulePollTask(new Runnable() {
- @Override
- public void run() {
- // noop
- }
- });
+ schedulePollTask(() -> { });
}
public synchronized Set<TopicPartition> paused() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index 5d5a268..78956ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -64,7 +64,7 @@ public class RangeAssignor extends AbstractPartitionAssignor {
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<TopicPartition>());
+ assignment.put(memberId, new ArrayList<>());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index 3b543f7..6763b82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -60,7 +60,7 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<TopicPartition>());
+ assignment.put(memberId, new ArrayList<>());
CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
@@ -72,7 +72,6 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
return assignment;
}
-
public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
SortedSet<String> topics = new TreeSet<>();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 8ec887e..35eb8eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -80,11 +80,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
}
protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
- List<V> list = map.get(key);
- if (list == null) {
- list = new ArrayList<>();
- map.put(key, list);
- }
+ List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
list.add(value);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 33d3749..24a18db 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -56,7 +56,7 @@ public final class Cluster {
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, null);
+ this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, null);
}
/**
@@ -70,7 +70,7 @@ public final class Cluster {
Set<String> unauthorizedTopics,
Set<String> internalTopics,
Node controller) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), internalTopics, controller);
+ this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, controller);
}
/**
@@ -117,11 +117,11 @@ public final class Cluster {
HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<>();
HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
for (Node n : this.nodes) {
- partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
+ partsForNode.put(n.id(), new ArrayList<>());
}
for (PartitionInfo p : partitions) {
if (!partsForTopic.containsKey(p.topic()))
- partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
+ partsForTopic.put(p.topic(), new ArrayList<>());
List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
psTopic.add(p);
@@ -157,8 +157,8 @@ public final class Cluster {
* Create an empty cluster instance with no nodes and no topic-partitions.
*/
public static Cluster empty() {
- return new Cluster(null, new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
- Collections.<String>emptySet(), null);
+ return new Cluster(null, new ArrayList<>(0), new ArrayList<>(0), Collections.emptySet(),
+ Collections.emptySet(), null);
}
/**
@@ -171,8 +171,8 @@ public final class Cluster {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
- return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0),
- Collections.<String>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
+ return new Cluster(null, true, nodes, new ArrayList<>(0),
+ Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null);
}
/**
@@ -231,7 +231,7 @@ public final class Cluster {
*/
public List<PartitionInfo> partitionsForTopic(String topic) {
List<PartitionInfo> parts = this.partitionsByTopic.get(topic);
- return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
+ return (parts == null) ? Collections.emptyList() : parts;
}
/**
@@ -251,7 +251,7 @@ public final class Cluster {
*/
public List<PartitionInfo> availablePartitionsForTopic(String topic) {
List<PartitionInfo> parts = this.availablePartitionsByTopic.get(topic);
- return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
+ return (parts == null) ? Collections.emptyList() : parts;
}
/**
@@ -261,7 +261,7 @@ public final class Cluster {
*/
public List<PartitionInfo> partitionsForNode(int nodeId) {
List<PartitionInfo> parts = this.partitionsByNode.get(nodeId);
- return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
+ return (parts == null) ? Collections.emptyList() : parts;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 2136a72..0af77a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -106,9 +106,9 @@ public final class MetricName {
return hash;
final int prime = 31;
int result = 1;
- result = prime * result + ((group == null) ? 0 : group.hashCode());
- result = prime * result + ((name == null) ? 0 : name.hashCode());
- result = prime * result + ((tags == null) ? 0 : tags.hashCode());
+ result = prime * result + group.hashCode();
+ result = prime * result + name.hashCode();
+ result = prime * result + tags.hashCode();
this.hash = result;
return result;
}
@@ -122,22 +122,7 @@ public final class MetricName {
if (getClass() != obj.getClass())
return false;
MetricName other = (MetricName) obj;
- if (group == null) {
- if (other.group != null)
- return false;
- } else if (!group.equals(other.group))
- return false;
- if (name == null) {
- if (other.name != null)
- return false;
- } else if (!name.equals(other.name))
- return false;
- if (tags == null) {
- if (other.tags != null)
- return false;
- } else if (!tags.equals(other.tags))
- return false;
- return true;
+ return group.equals(other.group) && name.equals(other.name) && tags.equals(other.tags);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
index 08b2a51..2c6add7 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common;
import java.io.Serializable;
+import java.util.Objects;
/**
* A topic name and partition number
@@ -48,7 +49,7 @@ public final class TopicPartition implements Serializable {
final int prime = 31;
int result = 1;
result = prime * result + partition;
- result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ result = prime * result + Objects.hashCode(topic);
this.hash = result;
return result;
}
@@ -62,14 +63,7 @@ public final class TopicPartition implements Serializable {
if (getClass() != obj.getClass())
return false;
TopicPartition other = (TopicPartition) obj;
- if (partition != other.partition)
- return false;
- if (topic == null) {
- if (other.topic != null)
- return false;
- } else if (!topic.equals(other.topic))
- return false;
- return true;
+ return partition == other.partition && Objects.equals(topic, other.topic);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
index 2a10439..2442602 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionReplica.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common;
+import org.apache.kafka.common.utils.Utils;
+
import java.io.Serializable;
@@ -30,7 +32,7 @@ public final class TopicPartitionReplica implements Serializable {
private final String topic;
public TopicPartitionReplica(String topic, int partition, int brokerId) {
- this.topic = topic;
+ this.topic = Utils.notNull(topic);
this.partition = partition;
this.brokerId = brokerId;
}
@@ -54,7 +56,7 @@ public final class TopicPartitionReplica implements Serializable {
}
final int prime = 31;
int result = 1;
- result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ result = prime * result + topic.hashCode();
result = prime * result + partition;
result = prime * result + brokerId;
this.hash = result;
@@ -70,18 +72,7 @@ public final class TopicPartitionReplica implements Serializable {
if (getClass() != obj.getClass())
return false;
TopicPartitionReplica other = (TopicPartitionReplica) obj;
- if (partition != other.partition)
- return false;
- if (brokerId != other.brokerId)
- return false;
- if (topic == null) {
- if (other.topic != null) {
- return false;
- }
- } else if (!topic.equals(other.topic)) {
- return false;
- }
- return true;
+ return partition == other.partition && brokerId == other.brokerId && topic.equals(other.topic);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
index a95303e..225e73a 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
@@ -109,9 +109,7 @@ public class AccessControlEntryFilter {
return false;
if ((operation() != AclOperation.ANY) && (!operation().equals(other.operation())))
return false;
- if ((permissionType() != AclPermissionType.ANY) && (!permissionType().equals(other.permissionType())))
- return false;
- return true;
+ return (permissionType() == AclPermissionType.ANY) || (permissionType().equals(other.permissionType()));
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 4f8420b..9cf13dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -316,7 +316,7 @@ public class AbstractConfig {
* @return The list of configured instances
*/
public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
- return getConfiguredInstances(key, t, Collections.<String, Object>emptyMap());
+ return getConfiguredInstances(key, t, Collections.emptyMap());
}
/**
@@ -343,7 +343,7 @@ public class AbstractConfig {
* @return The list of configured instances
*/
public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, Map<String, Object> configOverrides) {
- List<T> objects = new ArrayList<T>();
+ List<T> objects = new ArrayList<>();
if (classNames == null)
return objects;
Map<String, Object> configPairs = originals();
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 662909a..3868ed5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -22,10 +22,8 @@ import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -187,7 +185,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
- return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+ return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender);
}
/**
@@ -264,7 +262,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
- return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+ return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender);
}
/**
@@ -337,7 +335,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
Width width, String displayName, Recommender recommender) {
- return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+ return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.emptyList(), recommender);
}
/**
@@ -607,13 +605,7 @@ public class ConfigDef {
List<Object> originalRecommendedValues = value.recommendedValues();
if (!originalRecommendedValues.isEmpty()) {
Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
- Iterator<Object> it = recommendedValues.iterator();
- while (it.hasNext()) {
- Object o = it.next();
- if (!originalRecommendedValueSet.contains(o)) {
- it.remove();
- }
- }
+ recommendedValues.removeIf(o -> !originalRecommendedValueSet.contains(o));
}
value.recommendedValues(recommendedValues);
value.visible(key.recommender.visible(name, parsed));
@@ -1271,32 +1263,30 @@ public class ConfigDef {
}
List<ConfigKey> configs = new ArrayList<>(configKeys.values());
- Collections.sort(configs, new Comparator<ConfigKey>() {
- @Override
- public int compare(ConfigKey k1, ConfigKey k2) {
- int cmp = k1.group == null
- ? (k2.group == null ? 0 : -1)
- : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group)));
- if (cmp == 0) {
- cmp = Integer.compare(k1.orderInGroup, k2.orderInGroup);
- if (cmp == 0) {
- // first take anything with no default value
- if (!k1.hasDefault() && k2.hasDefault()) {
- cmp = -1;
- } else if (!k2.hasDefault() && k1.hasDefault()) {
- cmp = 1;
- } else {
- cmp = k1.importance.compareTo(k2.importance);
- if (cmp == 0) {
- return k1.name.compareTo(k2.name);
- }
- }
- }
+ Collections.sort(configs, (k1, k2) -> compare(k1, k2, groupOrd));
+ return configs;
+ }
+
+ private int compare(ConfigKey k1, ConfigKey k2, Map<String, Integer> groupOrd) {
+ int cmp = k1.group == null
+ ? (k2.group == null ? 0 : -1)
+ : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group)));
+ if (cmp == 0) {
+ cmp = Integer.compare(k1.orderInGroup, k2.orderInGroup);
+ if (cmp == 0) {
+ // first take anything with no default value
+ if (!k1.hasDefault() && k2.hasDefault())
+ cmp = -1;
+ else if (!k2.hasDefault() && k1.hasDefault())
+ cmp = 1;
+ else {
+ cmp = k1.importance.compareTo(k2.importance);
+ if (cmp == 0)
+ return k1.name.compareTo(k2.name);
}
- return cmp;
}
- });
- return configs;
+ }
+ return cmp;
}
public void embed(final String keyPrefix, final String groupPrefix, final int startingOrd, final ConfigDef child) {
@@ -1324,12 +1314,7 @@ public class ConfigDef {
*/
private static Validator embeddedValidator(final String keyPrefix, final Validator base) {
if (base == null) return null;
- return new ConfigDef.Validator() {
- @Override
- public void ensureValid(String name, Object value) {
- base.ensureValid(name.substring(keyPrefix.length()), value);
- }
- };
+ return (name, value) -> base.ensureValid(name.substring(keyPrefix.length()), value);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index 6430ffd..a830f9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -81,7 +81,7 @@ public class ConfigTransformer {
// Collect the variables from the given configs that need transformation
for (Map.Entry<String, String> config : configs.entrySet()) {
if (config.getValue() != null) {
- List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), DEFAULT_PATTERN);
+ List<ConfigVariable> vars = getVars(config.getValue(), DEFAULT_PATTERN);
for (ConfigVariable var : vars) {
Map<String, Set<String>> keysByPath = keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>());
Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>());
@@ -121,7 +121,7 @@ public class ConfigTransformer {
return new ConfigTransformerResult(data, ttls);
}
- private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
+ private static List<ConfigVariable> getVars(String value, Pattern pattern) {
List<ConfigVariable> configVars = new ArrayList<>();
Matcher matcher = pattern.matcher(value);
while (matcher.find()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
index 141c972..577e758 100644
--- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.AbstractIterator;
public class RecordHeaders implements Headers {
-
+
private final List<Header> headers;
private volatile boolean isReadOnly;
@@ -43,7 +43,7 @@ public class RecordHeaders implements Headers {
this.headers = new ArrayList<>(Arrays.asList(headers));
}
}
-
+
public RecordHeaders(Iterable<Header> headers) {
//Use efficient copy constructor if possible, fallback to iteration otherwise
if (headers == null) {
@@ -99,12 +99,7 @@ public class RecordHeaders implements Headers {
@Override
public Iterable<Header> headers(final String key) {
checkKey(key);
- return new Iterable<Header>() {
- @Override
- public Iterator<Header> iterator() {
- return new FilterByKeyIterator(headers.iterator(), key);
- }
- };
+ return () -> new FilterByKeyIterator(headers.iterator(), key);
}
@Override
@@ -119,17 +114,15 @@ public class RecordHeaders implements Headers {
public Header[] toArray() {
return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[headers.size()]);
}
-
+
private void checkKey(String key) {
- if (key == null) {
+ if (key == null)
throw new IllegalArgumentException("key cannot be null.");
- }
}
-
+
private void canWrite() {
- if (isReadOnly) {
+ if (isReadOnly)
throw new IllegalStateException("RecordHeaders has been closed.");
- }
}
private Iterator<Header> closeAware(final Iterator<Header> original) {
@@ -177,7 +170,7 @@ public class RecordHeaders implements Headers {
", isReadOnly = " + isReadOnly +
')';
}
-
+
private static final class FilterByKeyIterator extends AbstractIterator<Header> {
private final Iterator<Header> original;
@@ -187,14 +180,13 @@ public class RecordHeaders implements Headers {
this.original = original;
this.key = key;
}
-
+
protected Header makeNext() {
while (true) {
if (original.hasNext()) {
Header header = original.next();
- if (!header.key().equals(key)) {
+ if (!header.key().equals(key))
continue;
- }
return header;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 33916ac..d610172 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -208,7 +208,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
@Override
public synchronized boolean complete(T newValue) {
- List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+ List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
synchronized (this) {
if (done)
return false;
@@ -225,7 +225,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
@Override
public boolean completeExceptionally(Throwable newException) {
- List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+ List<BiConsumer<? super T, ? super Throwable>> oldWaiters;
synchronized (this) {
if (done)
return false;
@@ -257,7 +257,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
*/
@Override
public T get() throws InterruptedException, ExecutionException {
- SingleWaiter<T> waiter = new SingleWaiter<T>();
+ SingleWaiter<T> waiter = new SingleWaiter<>();
addWaiter(waiter);
return waiter.await();
}
@@ -269,7 +269,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
- SingleWaiter<T> waiter = new SingleWaiter<T>();
+ SingleWaiter<T> waiter = new SingleWaiter<>();
addWaiter(waiter);
return waiter.await(timeout, unit);
}
@@ -292,7 +292,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
*/
@Override
public synchronized boolean isCancelled() {
- return (exception != null) && (exception instanceof CancellationException);
+ return exception instanceof CancellationException;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index ba65632..2918dd6 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -134,11 +134,7 @@ public class PartitionStates<S> {
private void update(Map<TopicPartition, S> partitionToState) {
LinkedHashMap<String, List<TopicPartition>> topicToPartitions = new LinkedHashMap<>();
for (TopicPartition tp : partitionToState.keySet()) {
- List<TopicPartition> partitions = topicToPartitions.get(tp.topic());
- if (partitions == null) {
- partitions = new ArrayList<>();
- topicToPartitions.put(tp.topic(), partitions);
- }
+ List<TopicPartition> partitions = topicToPartitions.computeIfAbsent(tp.topic(), k -> new ArrayList<>());
partitions.add(tp);
}
for (Map.Entry<String, List<TopicPartition>> entry : topicToPartitions.entrySet()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
index 041d1c2..18f8ffe 100644
--- a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
+++ b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
@@ -77,7 +77,7 @@ public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements Auto
}
@Override
- public void close() throws Exception {
+ public void close() {
alive = false;
gcListenerThread.interrupt();
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
index 08ed1a0..2d584bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -62,7 +62,8 @@ public class ChannelState {
FAILED_SEND,
AUTHENTICATION_FAILED,
LOCAL_CLOSE
- };
+ }
+
// AUTHENTICATION_FAILED has a custom exception. For other states,
// create a reusable `ChannelState` instance per-state.
public static final ChannelState NOT_CONNECTED = new ChannelState(State.NOT_CONNECTED);
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 17dc6a3..2895128 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -48,7 +48,7 @@ public class KafkaChannel {
MUTED_AND_RESPONSE_PENDING,
MUTED_AND_THROTTLED,
MUTED_AND_THROTTLED_AND_RESPONSE_PENDING
- };
+ }
/** Socket server events that will change the mute state:
* <ul>
@@ -72,7 +72,7 @@ public class KafkaChannel {
RESPONSE_SENT,
THROTTLE_STARTED,
THROTTLE_ENDED
- };
+ }
private final String id;
private final TransportLayer transportLayer;
@@ -90,7 +90,7 @@ public class KafkaChannel {
private ChannelMuteState muteState;
private ChannelState state;
- public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) throws IOException {
+ public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) {
this.id = id;
this.transportLayer = transportLayer;
this.authenticator = authenticator;
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index c7b80cb..e397f05 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -26,7 +26,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.IOException;
import java.net.InetAddress;
import java.nio.channels.SelectionKey;
import java.util.Map;
@@ -76,7 +75,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
}
@Override
- public void authenticate() throws IOException {}
+ public void authenticate() {}
@Override
public KafkaPrincipal principal() {
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index ccb9c60..845b147 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -87,10 +87,9 @@ public class PlaintextTransportLayer implements TransportLayer {
/**
* Performs SSL handshake hence is a no-op for the non-secure
* implementation
- * @throws IOException
- */
+ */
@Override
- public void handshake() throws IOException {}
+ public void handshake() {}
/**
* Reads a sequence of bytes from this channel into the given buffer.
@@ -121,7 +120,7 @@ public class PlaintextTransportLayer implements TransportLayer {
* @param dsts - The buffers into which bytes are to be transferred
* @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
* @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
- * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+ * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
* @throws IOException if some other I/O error occurs
*/
@Override
@@ -133,7 +132,7 @@ public class PlaintextTransportLayer implements TransportLayer {
* Writes a sequence of bytes to this channel from the given buffer.
*
* @param src The buffer from which bytes are to be retrieved
- * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+ * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
* @throws IOException If some other I/O error occurs
*/
@Override
@@ -145,7 +144,7 @@ public class PlaintextTransportLayer implements TransportLayer {
* Writes a sequence of bytes to this channel from the given buffer.
*
* @param srcs The buffer from which bytes are to be retrieved
- * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+ * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
* @throws IOException If some other I/O error occurs
*/
@Override
@@ -180,7 +179,7 @@ public class PlaintextTransportLayer implements TransportLayer {
* Returns ANONYMOUS as Principal.
*/
@Override
- public Principal peerPrincipal() throws IOException {
+ public Principal peerPrincipal() {
return principal;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index ca34b71..172e629 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -156,7 +156,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
@Override
public Set<String> reconfigurableConfigs() {
- return securityProtocol == SecurityProtocol.SASL_SSL ? SslConfigs.RECONFIGURABLE_CONFIGS : Collections.<String>emptySet();
+ return securityProtocol == SecurityProtocol.SASL_SSL ? SslConfigs.RECONFIGURABLE_CONFIGS : Collections.emptySet();
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index efb603c..8f81dbe 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -30,7 +30,7 @@ public interface Selectable {
/**
* See {@link #connect(String, InetSocketAddress, int, int) connect()}
*/
- public static final int USE_DEFAULT_BUFFER_SIZE = -1;
+ int USE_DEFAULT_BUFFER_SIZE = -1;
/**
* Begin establishing a socket connection to the given address identified by the given address
@@ -40,83 +40,83 @@ public interface Selectable {
* @param receiveBufferSize The receive buffer for the socket
* @throws IOException If we cannot begin connecting
*/
- public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
+ void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
/**
* Wakeup this selector if it is blocked on I/O
*/
- public void wakeup();
+ void wakeup();
/**
* Close this selector
*/
- public void close();
+ void close();
/**
* Close the connection identified by the given id
*/
- public void close(String id);
+ void close(String id);
/**
* Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
* @param send The request to send
*/
- public void send(Send send);
+ void send(Send send);
/**
* Do I/O. Reads, writes, connection establishment, etc.
* @param timeout The amount of time to block if there is nothing to do
* @throws IOException
*/
- public void poll(long timeout) throws IOException;
+ void poll(long timeout) throws IOException;
/**
* The list of sends that completed on the last {@link #poll(long) poll()} call.
*/
- public List<Send> completedSends();
+ List<Send> completedSends();
/**
* The list of receives that completed on the last {@link #poll(long) poll()} call.
*/
- public List<NetworkReceive> completedReceives();
+ List<NetworkReceive> completedReceives();
/**
* The connections that finished disconnecting on the last {@link #poll(long) poll()}
* call. Channel state indicates the local channel state at the time of disconnection.
*/
- public Map<String, ChannelState> disconnected();
+ Map<String, ChannelState> disconnected();
/**
* The list of connections that completed their connection on the last {@link #poll(long) poll()}
* call.
*/
- public List<String> connected();
+ List<String> connected();
/**
* Disable reads from the given connection
* @param id The id for the connection
*/
- public void mute(String id);
+ void mute(String id);
/**
* Re-enable reads from the given connection
* @param id The id for the connection
*/
- public void unmute(String id);
+ void unmute(String id);
/**
* Disable reads from all connections
*/
- public void muteAll();
+ void muteAll();
/**
* Re-enable reads from all connections
*/
- public void unmuteAll();
+ void unmuteAll();
/**
* returns true if a channel is ready
* @param id The id for the connection
*/
- public boolean isChannelReady(String id);
+ boolean isChannelReady(String id);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 806bda7..44223e7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -20,8 +20,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -227,7 +225,7 @@ public class Selector implements Selectable, AutoCloseable {
}
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
- this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder, logContext);
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
}
public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
@@ -552,7 +550,7 @@ public class Selector implements Selectable, AutoCloseable {
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
- Send send = null;
+ Send send;
try {
send = channel.write();
} catch (Exception e) {
@@ -919,7 +917,7 @@ public class Selector implements Selectable, AutoCloseable {
*/
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
if (!stagedReceives.containsKey(channel))
- stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
+ stagedReceives.put(channel, new ArrayDeque<>());
Deque<NetworkReceive> deque = stagedReceives.get(channel);
deque.add(receive);
@@ -1045,11 +1043,7 @@ public class Selector implements Selectable, AutoCloseable {
metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
topLevelMetricNames.add(metricName);
- this.metrics.addMetric(metricName, new Measurable() {
- public double measure(MetricConfig config, long now) {
- return channels.size();
- }
- });
+ this.metrics.addMetric(metricName, (config, now) -> channels.size());
}
private Meter createMeter(Metrics metrics, String groupName, Map<String, String> metricTags,
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b43e4c3..86d41d0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -163,7 +163,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
* No-Op for plaintext authenticator
*/
@Override
- public void authenticate() throws IOException {}
+ public void authenticate() {}
/**
* Constructs Principal using configured principalBuilder.
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 08a39e7..917e5a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -76,7 +76,7 @@ public class SslTransportLayer implements TransportLayer {
}
// Prefer `create`, only use this in tests
- SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+ SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) {
this.channelId = channelId;
this.key = key;
this.socketChannel = (SocketChannel) key.channel();
@@ -372,7 +372,7 @@ public class SslTransportLayer implements TransportLayer {
}
}
- private SSLHandshakeException renegotiationException() throws IOException {
+ private SSLHandshakeException renegotiationException() {
return new SSLHandshakeException("Renegotiation is not supported");
}
@@ -715,7 +715,7 @@ public class SslTransportLayer implements TransportLayer {
* SSLSession's peerPrincipal for the remote host.
* @return Principal
*/
- public Principal peerPrincipal() throws IOException {
+ public Principal peerPrincipal() {
try {
return sslEngine.getSession().getPeerPrincipal();
} catch (SSLPeerUnverifiedException se) {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d461060..a796ec9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -111,7 +111,7 @@ import java.util.function.Function;
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
- UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request,",
+ UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
UnknownServerException::new),
NONE(0, null, message -> null),
OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
@@ -159,8 +159,8 @@ public enum Errors {
ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
IllegalGenerationException::new),
INCONSISTENT_GROUP_PROTOCOL(23,
- "The group member's supported protocols are incompatible with those of existing members" +
- " or first group member tried to join with empty protocol type or empty protocol list.",
+ "The group member's supported protocols are incompatible with those of existing members " +
+ "or first group member tried to join with empty protocol type or empty protocol list.",
InconsistentGroupProtocolException::new),
INVALID_GROUP_ID(24, "The configured groupId is invalid.",
InvalidGroupIdException::new),
@@ -201,8 +201,8 @@ public enum Errors {
NOT_CONTROLLER(41, "This is not the correct controller for this cluster.",
NotControllerException::new),
INVALID_REQUEST(42, "This most likely occurs because of a request being malformed by the " +
- "client library or the message was sent to an incompatible broker. See the broker logs " +
- "for more details.",
+ "client library or the message was sent to an incompatible broker. See the broker logs " +
+ "for more details.",
InvalidRequestException::new),
UNSUPPORTED_FOR_MESSAGE_FORMAT(43, "The message format version on the broker does not support the request.",
UnsupportedForMessageFormatException::new),
@@ -221,10 +221,10 @@ public enum Errors {
"its transactional id.",
InvalidPidMappingException::new),
INVALID_TRANSACTION_TIMEOUT(50, "The transaction timeout is larger than the maximum value allowed by " +
- "the broker (as configured by transaction.max.timeout.ms).",
+ "the broker (as configured by transaction.max.timeout.ms).",
InvalidTxnTimeoutException::new),
CONCURRENT_TRANSACTIONS(51, "The producer attempted to update a transaction " +
- "while another concurrent operation on the same transaction was ongoing.",
+ "while another concurrent operation on the same transaction was ongoing.",
ConcurrentTransactionsException::new),
TRANSACTION_COORDINATOR_FENCED(52, "Indicates that the transaction coordinator sending a WriteTxnMarker " +
"is no longer the current coordinator for a given producer.",
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 5e41901..1994a71 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -25,12 +25,7 @@ import java.util.Iterator;
public abstract class AbstractRecords implements Records {
- private final Iterable<Record> records = new Iterable<Record>() {
- @Override
- public Iterator<Record> iterator() {
- return recordsIterator();
- }
- };
+ private final Iterable<Record> records = this::recordsIterator;
@Override
public boolean hasMatchingMagic(byte magic) {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index 7f91f26..572abd8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.errors.CorruptRecordException;
-import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
@@ -39,7 +38,7 @@ class ByteBufferLogInputStream implements LogInputStream<MutableRecordBatch> {
this.maxMessageSize = maxMessageSize;
}
- public MutableRecordBatch nextBatch() throws IOException {
+ public MutableRecordBatch nextBatch() {
int remaining = buffer.remaining();
Integer batchSize = nextBatchSize();
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 9b3bfc4..a333d2a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -128,7 +128,7 @@ public enum CompressionType {
/**
* Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
*
- * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@#link ByteBuffer}s directly.
+ * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly.
* Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()}
* write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
* In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index cebb5fa..3537fc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -32,7 +32,6 @@ import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
-import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -374,12 +373,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @return An iterator over batches starting from {@code start}
*/
public Iterable<FileChannelRecordBatch> batchesFrom(final int start) {
- return new Iterable<FileChannelRecordBatch>() {
- @Override
- public Iterator<FileChannelRecordBatch> iterator() {
- return batchIterator(start);
- }
- };
+ return () -> batchIterator(start);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 56f1058..850b1e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -264,12 +264,12 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
}
@Override
- public int available() throws IOException {
+ public int available() {
return decompressedBuffer == null ? 0 : decompressedBuffer.remaining();
}
@Override
- public void close() throws IOException {
+ public void close() {
bufferSupplier.release(decompressionBuffer);
}
@@ -279,7 +279,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
}
@Override
- public void reset() throws IOException {
+ public void reset() {
throw new RuntimeException("reset not supported");
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index af62e09..c6db18d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -47,12 +47,7 @@ public class MemoryRecords extends AbstractRecords {
private final ByteBuffer buffer;
- private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
- @Override
- public Iterator<MutableRecordBatch> iterator() {
- return batchIterator();
- }
- };
+ private final Iterable<MutableRecordBatch> batches = this::batchIterator;
private int validBytes = -1;
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 6f6404f..1c7a6c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -41,7 +41,7 @@ public class MemoryRecordsBuilder {
private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
@Override
- public void write(int b) throws IOException {
+ public void write(int b) {
throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
}
});
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
index 03bc098..b7cf1d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
@@ -117,7 +117,7 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest {
Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) {
if (!dirPartitions.containsKey(entry.getValue()))
- dirPartitions.put(entry.getValue(), new ArrayList<TopicPartition>());
+ dirPartitions.put(entry.getValue(), new ArrayList<>());
dirPartitions.get(entry.getValue()).add(entry.getKey());
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index e154ac9..03f385b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -95,10 +95,10 @@ public class ApiVersionsRequest extends AbstractRequest {
short version = version();
switch (version) {
case 0:
- return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+ return new ApiVersionsResponse(Errors.forException(e), Collections.emptyList());
case 1:
case 2:
- return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList());
+ return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.emptyList());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
version, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion()));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index d6db1e1..dc86dd7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
@@ -79,10 +78,10 @@ public class ControlledShutdownRequest extends AbstractRequest {
switch (versionId) {
case 0:
case 1:
- return new ControlledShutdownResponse(Errors.forException(e), Collections.<TopicPartition>emptySet());
+ return new ControlledShutdownResponse(Errors.forException(e), Collections.emptySet());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()));
+ versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()));
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 9f05c5a..16d4c97 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -120,12 +120,12 @@ public class CreateTopicsRequest extends AbstractRequest {
public TopicDetails(int partitions,
short replicationFactor,
Map<String, String> configs) {
- this(partitions, replicationFactor, Collections.<Integer, List<Integer>>emptyMap(), configs);
+ this(partitions, replicationFactor, Collections.emptyMap(), configs);
}
public TopicDetails(int partitions,
short replicationFactor) {
- this(partitions, replicationFactor, Collections.<String, String>emptyMap());
+ this(partitions, replicationFactor, Collections.emptyMap());
}
public TopicDetails(Map<Integer, List<Integer>> replicasAssignments,
@@ -134,7 +134,7 @@ public class CreateTopicsRequest extends AbstractRequest {
}
public TopicDetails(Map<Integer, List<Integer>> replicasAssignments) {
- this(replicasAssignments, Collections.<String, String>emptyMap());
+ this(replicasAssignments, Collections.emptyMap());
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index c3fc194..2ed7c24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -140,7 +140,7 @@ public class DeleteAclsRequest extends AbstractRequest {
List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
for (int i = 0; i < filters.size(); i++) {
responses.add(new DeleteAclsResponse.AclFilterResponse(
- ApiError.fromThrowable(throwable), Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
+ ApiError.fromThrowable(throwable), Collections.emptySet()));
}
return new DeleteAclsResponse(throttleTimeMs, responses);
default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index c86b141..64ed27b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -82,7 +82,7 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
}
public DescribeDelegationTokenResponse(int throttleTimeMs, Errors error) {
- this(throttleTimeMs, error, new ArrayList<DelegationToken>());
+ this(throttleTimeMs, error, new ArrayList<>());
}
public DescribeDelegationTokenResponse(Struct struct) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index fbdfaa3..06c2471 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -196,11 +196,11 @@ public class DescribeGroupsResponse extends AbstractResponse {
public static GroupMetadata forError(Errors error) {
return new DescribeGroupsResponse.GroupMetadata(
- error,
- DescribeGroupsResponse.UNKNOWN_STATE,
- DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
- DescribeGroupsResponse.UNKNOWN_PROTOCOL,
- Collections.<DescribeGroupsResponse.GroupMember>emptyList());
+ error,
+ DescribeGroupsResponse.UNKNOWN_STATE,
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL,
+ Collections.emptyList());
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 366509d..76cc1a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -201,22 +201,22 @@ public class JoinGroupRequest extends AbstractRequest {
case 0:
case 1:
return new JoinGroupResponse(
- Errors.forException(e),
- JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_PROTOCOL,
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
- Collections.<String, ByteBuffer>emptyMap());
+ Errors.forException(e),
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_PROTOCOL,
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+ Collections.emptyMap());
case 2:
case 3:
return new JoinGroupResponse(
- throttleTimeMs,
- Errors.forException(e),
- JoinGroupResponse.UNKNOWN_GENERATION_ID,
- JoinGroupResponse.UNKNOWN_PROTOCOL,
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
- JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
- Collections.<String, ByteBuffer>emptyMap());
+ throttleTimeMs,
+ Errors.forException(e),
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_PROTOCOL,
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+ Collections.emptyMap());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 27254cb..baab1e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -71,10 +71,10 @@ public class ListGroupsRequest extends AbstractRequest {
short versionId = version();
switch (versionId) {
case 0:
- return new ListGroupsResponse(Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
+ return new ListGroupsResponse(Errors.forException(e), Collections.emptyList());
case 1:
case 2:
- return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.<ListGroupsResponse.Group>emptyList());
+ return new ListGroupsResponse(throttleTimeMs, Errors.forException(e), Collections.emptyList());
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.LIST_GROUPS.latestVersion()));
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 0a4611f..2ea9032 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -93,9 +93,7 @@ public class ResourceFilter {
public boolean matches(Resource other) {
if ((name != null) && (!name.equals(other.name())))
return false;
- if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType())))
- return false;
- return true;
+ return (resourceType == ResourceType.ANY) || (resourceType.equals(other.resourceType()));
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
index 849a978..231ee16 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
@@ -140,7 +140,7 @@ public class JaasContext {
* The type of the SASL login context, it should be SERVER for the broker and CLIENT for the clients (consumer, producer,
* etc.). This is used to validate behaviour (e.g. some functionality is only available in the broker or clients).
*/
- public enum Type { CLIENT, SERVER; }
+ public enum Type { CLIENT, SERVER }
private final String name;
private final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
index 96e7426..28923ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/CredentialCache.java
@@ -23,7 +23,7 @@ public class CredentialCache {
private final ConcurrentHashMap<String, Cache<? extends Object>> cacheMap = new ConcurrentHashMap<>();
public <C> Cache<C> createCache(String mechanism, Class<C> credentialClass) {
- Cache<C> cache = new Cache<C>(credentialClass);
+ Cache<C> cache = new Cache<>(credentialClass);
Cache<C> oldCache = (Cache<C>) cacheMap.putIfAbsent(mechanism, cache);
return oldCache == null ? cache : oldCache;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 2bc44c5..7bb7be7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -16,16 +16,6 @@
*/
package org.apache.kafka.common.security.authenticator;
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
@@ -39,6 +29,13 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
public class LoginManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoginManager.class);
@@ -55,7 +52,7 @@ public class LoginManager {
private int refCount;
private LoginManager(JaasContext jaasContext, String saslMechanism, Map<String, ?> configs,
- LoginMetadata<?> loginMetadata) throws IOException, LoginException {
+ LoginMetadata<?> loginMetadata) throws LoginException {
this.loginMetadata = loginMetadata;
this.login = Utils.newInstance(loginMetadata.loginClass);
loginCallbackHandler = Utils.newInstance(loginMetadata.loginCallbackClass);
@@ -89,7 +86,7 @@ public class LoginManager {
*/
public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism,
Class<? extends Login> defaultLoginClass,
- Map<String, ?> configs) throws IOException, LoginException {
+ Map<String, ?> configs) throws LoginException {
Class<? extends Login> loginClass = configuredClassOrDefault(configs, jaasContext,
saslMechanism, SaslConfigs.SASL_LOGIN_CLASS, defaultLoginClass);
Class<? extends AuthenticateCallbackHandler> defaultLoginCallbackHandlerClass = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 8934e8e..fb74f5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -113,7 +113,7 @@ public class SaslClientAuthenticator implements Authenticator {
String host,
String mechanism,
boolean handshakeRequestEnable,
- TransportLayer transportLayer) throws IOException {
+ TransportLayer transportLayer) {
this.node = node;
this.subject = subject;
this.callbackHandler = callbackHandler;
@@ -144,13 +144,11 @@ public class SaslClientAuthenticator implements Authenticator {
private SaslClient createSaslClient() {
try {
- return Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
- public SaslClient run() throws SaslException {
- String[] mechs = {mechanism};
- LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
- clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
- return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
- }
+ return Subject.doAs(subject, (PrivilegedExceptionAction<SaslClient>) () -> {
+ String[] mechs = {mechanism};
+ LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
+ clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
+ return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
});
} catch (PrivilegedActionException e) {
throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + mechanism, e.getCause());
@@ -354,11 +352,7 @@ public class SaslClientAuthenticator implements Authenticator {
if (isInitial && !saslClient.hasInitialResponse())
return saslToken;
else
- return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
- public byte[] run() throws SaslException {
- return saslClient.evaluateChallenge(saslToken);
- }
- });
+ return Subject.doAs(subject, (PrivilegedExceptionAction<byte[]>) () -> saslClient.evaluateChallenge(saslToken));
} catch (PrivilegedActionException e) {
String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
KerberosError kerberosError = KerberosError.fromException(e);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 43cb0a4..48a49fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -129,7 +129,7 @@ public class SaslServerAuthenticator implements Authenticator {
KerberosShortNamer kerberosNameParser,
ListenerName listenerName,
SecurityProtocol securityProtocol,
- TransportLayer transportLayer) throws IOException {
+ TransportLayer transportLayer) {
this.callbackHandlers = callbackHandlers;
this.connectionId = connectionId;
this.subjects = subjects;
@@ -164,12 +164,8 @@ public class SaslServerAuthenticator implements Authenticator {
saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
} else {
try {
- saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
- public SaslServer run() throws SaslException {
- return Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(),
- configs, callbackHandler);
- }
- });
+ saslServer = Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
+ Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), configs, callbackHandler));
} catch (PrivilegedActionException e) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
}
@@ -212,11 +208,8 @@ public class SaslServerAuthenticator implements Authenticator {
}
try {
- return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
- public SaslServer run() throws SaslException {
- return Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler);
- }
- });
+ return Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
+ Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler));
} catch (PrivilegedActionException e) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
}
@@ -308,11 +301,11 @@ public class SaslServerAuthenticator implements Authenticator {
saslServer.dispose();
}
- private void setSaslState(SaslState saslState) throws IOException {
+ private void setSaslState(SaslState saslState) {
setSaslState(saslState, null);
}
- private void setSaslState(SaslState saslState, AuthenticationException exception) throws IOException {
+ private void setSaslState(SaslState saslState, AuthenticationException exception) {
if (netOutBuffer != null && !netOutBuffer.completed()) {
pendingSaslState = saslState;
pendingException = exception;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index ec996a8..2faf6be 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -134,127 +134,125 @@ public class KerberosLogin extends AbstractLogin {
// TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
// you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
// "modprinc -maxlife 3mins <principal>" in kadmin.
- t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), new Runnable() {
- public void run() {
- log.info("[Principal={}]: TGT refresh thread started.", principal);
- while (true) { // renewal thread's main loop. if it exits from here, thread will exit.
- KerberosTicket tgt = getTGT();
- long now = currentWallTime();
- long nextRefresh;
- Date nextRefreshDate;
- if (tgt == null) {
- nextRefresh = now + minTimeBeforeRelogin;
- nextRefreshDate = new Date(nextRefresh);
- log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate);
+ t = KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s", principal), () -> {
+ log.info("[Principal={}]: TGT refresh thread started.", principal);
+ while (true) { // renewal thread's main loop. if it exits from here, thread will exit.
+ KerberosTicket tgt = getTGT();
+ long now = currentWallTime();
+ long nextRefresh;
+ Date nextRefreshDate;
+ if (tgt == null) {
+ nextRefresh = now + minTimeBeforeRelogin;
+ nextRefreshDate = new Date(nextRefresh);
+ log.warn("[Principal={}]: No TGT found: will try again at {}", principal, nextRefreshDate);
+ } else {
+ nextRefresh = getRefreshTime(tgt);
+ long expiry = tgt.getEndTime().getTime();
+ Date expiryDate = new Date(expiry);
+ if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
+ log.warn("The TGT cannot be renewed beyond the next expiry date: {}." +
+ "This process will not be able to authenticate new SASL connections after that " +
+ "time (for example, it will not be able to authenticate a new connection with a Kafka " +
+ "Broker). Ask your system administrator to either increase the " +
+ "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
+ "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
+ "expiry cannot be further extended by refreshing, exiting refresh thread now.",
+ expiryDate, principal, principal);
+ return;
+ }
+ // determine how long to sleep from looking at ticket's expiry.
+ // We should not allow the ticket to expire, but we should take into consideration
+ // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
+ // would cause ticket expiration.
+ if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
+ // expiry is before next scheduled refresh).
+ log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal);
+ nextRefresh = now;
} else {
- nextRefresh = getRefreshTime(tgt);
- long expiry = tgt.getEndTime().getTime();
- Date expiryDate = new Date(expiry);
- if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
- log.warn("The TGT cannot be renewed beyond the next expiry date: {}." +
- "This process will not be able to authenticate new SASL connections after that " +
- "time (for example, it will not be able to authenticate a new connection with a Kafka " +
- "Broker). Ask your system administrator to either increase the " +
- "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
- "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
- "expiry cannot be further extended by refreshing, exiting refresh thread now.",
- expiryDate, principal, principal);
- return;
- }
- // determine how long to sleep from looking at ticket's expiry.
- // We should not allow the ticket to expire, but we should take into consideration
- // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
- // would cause ticket expiration.
- if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
- // expiry is before next scheduled refresh).
- log.info("[Principal={}]: Refreshing now because expiry is before next scheduled refresh time.", principal);
- nextRefresh = now;
- } else {
- if (nextRefresh < (now + minTimeBeforeRelogin)) {
- // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
- Date until = new Date(nextRefresh);
- Date newUntil = new Date(now + minTimeBeforeRelogin);
- log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " +
- "than the minimum refresh interval ({} seconds) from now.",
- principal, until, newUntil, minTimeBeforeRelogin / 1000);
- }
- nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
- }
- nextRefreshDate = new Date(nextRefresh);
- if (nextRefresh > expiry) {
- log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." +
- "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
- principal, nextRefreshDate, expiryDate);
- return;
+ if (nextRefresh < (now + minTimeBeforeRelogin)) {
+ // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+ Date until = new Date(nextRefresh);
+ Date newUntil = new Date(now + minTimeBeforeRelogin);
+ log.warn("[Principal={}]: TGT refresh thread time adjusted from {} to {} since the former is sooner " +
+ "than the minimum refresh interval ({} seconds) from now.",
+ principal, until, newUntil, minTimeBeforeRelogin / 1000);
}
+ nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
}
- if (now < nextRefresh) {
- Date until = new Date(nextRefresh);
- log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until);
- try {
- Thread.sleep(nextRefresh - now);
- } catch (InterruptedException ie) {
- log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal);
- return;
- }
- } else {
- log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check"
- + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
- + " Manual intervention will be required for this client to successfully authenticate."
- + " Exiting refresh thread.", principal, nextRefreshDate);
+ nextRefreshDate = new Date(nextRefresh);
+ if (nextRefresh > expiry) {
+ log.error("[Principal={}]: Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." +
+ "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.",
+ principal, nextRefreshDate, expiryDate);
+ return;
+ }
+ }
+ if (now < nextRefresh) {
+ Date until = new Date(nextRefresh);
+ log.info("[Principal={}]: TGT refresh sleeping until: {}", principal, until);
+ try {
+ Thread.sleep(nextRefresh - now);
+ } catch (InterruptedException ie) {
+ log.warn("[Principal={}]: TGT renewal thread has been interrupted and will exit.", principal);
return;
}
- if (isUsingTicketCache) {
- String kinitArgs = "-R";
- int retry = 1;
- while (retry >= 0) {
- try {
- log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
- Shell.execCommand(kinitCmd, kinitArgs);
- break;
- } catch (Exception e) {
- if (retry > 0) {
- --retry;
- // sleep for 10 seconds
- try {
- Thread.sleep(10 * 1000);
- } catch (InterruptedException ie) {
- log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal);
- return;
- }
- } else {
- log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
- "Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
+ } else {
+ log.error("[Principal={}]: NextRefresh: {} is in the past: exiting refresh thread. Check"
+ + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+ + " Manual intervention will be required for this client to successfully authenticate."
+ + " Exiting refresh thread.", principal, nextRefreshDate);
+ return;
+ }
+ if (isUsingTicketCache) {
+ String kinitArgs = "-R";
+ int retry = 1;
+ while (retry >= 0) {
+ try {
+ log.debug("[Principal={}]: Running ticket cache refresh command: {} {}", principal, kinitCmd, kinitArgs);
+ Shell.execCommand(kinitCmd, kinitArgs);
+ break;
+ } catch (Exception e) {
+ if (retry > 0) {
+ --retry;
+ // sleep for 10 seconds
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException ie) {
+ log.error("[Principal={}]: Interrupted while renewing TGT, exiting Login thread", principal);
return;
}
+ } else {
+ log.warn("[Principal={}]: Could not renew TGT due to problem running shell command: '{} {}'. " +
+ "Exiting refresh thread.", principal, kinitCmd, kinitArgs, e);
+ return;
}
}
}
- try {
- int retry = 1;
- while (retry >= 0) {
- try {
- reLogin();
- break;
- } catch (LoginException le) {
- if (retry > 0) {
- --retry;
- // sleep for 10 seconds.
- try {
- Thread.sleep(10 * 1000);
- } catch (InterruptedException e) {
- log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le);
- throw le;
- }
- } else {
- log.error("[Principal={}]: Could not refresh TGT.", principal, le);
+ }
+ try {
+ int retry = 1;
+ while (retry >= 0) {
+ try {
+ reLogin();
+ break;
+ } catch (LoginException le) {
+ if (retry > 0) {
+ --retry;
+ // sleep for 10 seconds.
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ log.error("[Principal={}]: Interrupted during login retry after LoginException:", principal, le);
+ throw le;
}
+ } else {
+ log.error("[Principal={}]: Could not refresh TGT.", principal, le);
}
}
- } catch (LoginException le) {
- log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le);
- return;
}
+ } catch (LoginException le) {
+ log.error("[Principal={}]: Failed to refresh TGT: refresh thread exiting now.", principal, le);
+ return;
}
}
});
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index ac273cc..1dcd199 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -121,14 +121,14 @@ import org.slf4j.LoggerFactory;
* <p>
* Here is a typical, basic JAAS configuration for a client leveraging unsecured
* SASL/OAUTHBEARER authentication:
- *
+ *
* <pre>
* KafkaClient {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
* unsecuredLoginStringClaim_sub="thePrincipalName";
* };
* </pre>
- *
+ *
* An implementation of the {@link Login} interface specific to the
* {@code OAUTHBEARER} mechanism is automatically applied; it periodically
* refreshes any token before it expires so that the client can continue to make
@@ -196,14 +196,14 @@ import org.slf4j.LoggerFactory;
* <p>
* Here is a typical, basic JAAS configuration for a broker leveraging unsecured
* SASL/OAUTHBEARER validation:
- *
+ *
* <pre>
* KafkaServer {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
* unsecuredLoginStringClaim_sub="thePrincipalName";
* };
* </pre>
- *
+ *
* Production use cases will require writing an implementation of
* {@link AuthenticateCallbackHandler} that can handle an instance of
* {@link OAuthBearerValidatorCallback} and declaring it via the
@@ -229,7 +229,7 @@ import org.slf4j.LoggerFactory;
* Better to mitigate this possibility by leaving the existing token (which
* still has some lifetime left) in place until a new replacement token is
* actually retrieved. This implementation supports this.
- *
+ *
* @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
* @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
* @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
@@ -350,7 +350,7 @@ public class OAuthBearerLoginModule implements LoginModule {
}
@Override
- public boolean commit() throws LoginException {
+ public boolean commit() {
if (tokenRequiringCommit == null) {
if (log.isDebugEnabled())
log.debug("Nothing here to commit");
@@ -371,7 +371,7 @@ public class OAuthBearerLoginModule implements LoginModule {
}
@Override
- public boolean abort() throws LoginException {
+ public boolean abort() {
if (tokenRequiringCommit != null) {
log.info("Login aborted");
tokenRequiringCommit = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
index 16db3c8..e32e16d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java
@@ -58,7 +58,7 @@ public class OAuthBearerSaslClient implements SaslClient {
enum State {
SEND_CLIENT_FIRST_MESSAGE, RECEIVE_SERVER_FIRST_MESSAGE, RECEIVE_SERVER_MESSAGE_AFTER_FAILURE, COMPLETE, FAILED
- };
+ }
private State state;
@@ -127,14 +127,14 @@ public class OAuthBearerSaslClient implements SaslClient {
}
@Override
- public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(incoming, offset, offset + len);
}
@Override
- public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(outgoing, offset, offset + len);
@@ -148,7 +148,7 @@ public class OAuthBearerSaslClient implements SaslClient {
}
@Override
- public void dispose() throws SaslException {
+ public void dispose() {
}
private void setState(State state) {
@@ -173,7 +173,7 @@ public class OAuthBearerSaslClient implements SaslClient {
public static class OAuthBearerSaslClientFactory implements SaslClientFactory {
@Override
public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
- String serverName, Map<String, ?> props, CallbackHandler callbackHandler) throws SaslException {
+ String serverName, Map<String, ?> props, CallbackHandler callbackHandler) {
String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
for (String mechanism : mechanisms) {
for (int i = 0; i < mechanismNamesCompatibleWithPolicy.length; i++) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
index ab2b716..352e6b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
@@ -53,7 +53,7 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
/**
* Return true if this instance has been configured, otherwise false
- *
+ *
* @return true if this instance has been configured, otherwise false
*/
public boolean configured() {
@@ -91,8 +91,8 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
throw new IllegalArgumentException("Callback had a token already");
Subject subject = Subject.getSubject(AccessController.getContext());
Set<OAuthBearerToken> privateCredentials = subject != null
- ? subject.getPrivateCredentials(OAuthBearerToken.class)
- : Collections.<OAuthBearerToken>emptySet();
+ ? subject.getPrivateCredentials(OAuthBearerToken.class)
+ : Collections.emptySet();
if (privateCredentials.size() != 1)
throw new IOException(
String.format("Unable to find OAuth Bearer token in Subject's private credentials (size=%d)",
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
index 8f89c14..db332b4 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
@@ -128,21 +128,21 @@ public class OAuthBearerSaslServer implements SaslServer {
}
@Override
- public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
if (!complete)
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(incoming, offset, offset + len);
}
@Override
- public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
if (!complete)
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(outgoing, offset, offset + len);
}
@Override
- public void dispose() throws SaslException {
+ public void dispose() {
complete = false;
tokenForNegotiatedProperty = null;
extensions = null;
@@ -225,7 +225,7 @@ public class OAuthBearerSaslServer implements SaslServer {
public static class OAuthBearerSaslServerFactory implements SaslServerFactory {
@Override
public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props,
- CallbackHandler callbackHandler) throws SaslException {
+ CallbackHandler callbackHandler) {
String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
for (int i = 0; i < mechanismNamesCompatibleWithPolicy.length; i++) {
if (mechanismNamesCompatibleWithPolicy[i].equals(mechanism)) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index b764e54..b5b3016 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -35,13 +35,12 @@ import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
-import com.fasterxml.jackson.databind.node.NumericNode;
/**
* A simple unsecured JWS implementation. The '{@code nbf}' claim is ignored if
* it is given because the related logic is not required for Kafka testing and
* development purposes.
- *
+ *
* @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>
*/
public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
@@ -58,7 +57,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Constructor with the given principal and scope claim names
- *
+ *
* @param compactSerialization
* the compact serialization to parse as an unsecured JWS
* @param principalClaimName
@@ -118,7 +117,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Return the 3 or 5 dot-separated sections of the JWT compact serialization
- *
+ *
* @return the 3 or 5 dot-separated sections of the JWT compact serialization
*/
public List<String> splits() {
@@ -127,7 +126,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Return the JOSE Header as a {@code Map}
- *
+ *
* @return the JOSE header
*/
public Map<String, Object> header() {
@@ -156,7 +155,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Return the JWT Claim Set as a {@code Map}
- *
+ *
* @return the (always non-null but possibly empty) claims
*/
public Map<String, Object> claims() {
@@ -165,7 +164,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Return the (always non-null/non-empty) principal claim name
- *
+ *
* @return the (always non-null/non-empty) principal claim name
*/
public String principalClaimName() {
@@ -174,7 +173,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Return the (always non-null/non-empty) scope claim name
- *
+ *
* @return the (always non-null/non-empty) scope claim name
*/
public String scopeClaimName() {
@@ -183,7 +182,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Indicate if the claim exists and is the given type
- *
+ *
* @param claimName
* the mandatory JWT claim name
* @param type
@@ -205,7 +204,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Extract a claim of the given type
- *
+ *
* @param claimName
* the mandatory JWT claim name
* @param type
@@ -228,7 +227,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Extract a claim in its raw form
- *
+ *
* @param claimName
* the mandatory JWT claim name
* @return the raw claim value, if it exists, otherwise null
@@ -241,7 +240,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
* Return the
* <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
* Time</a> claim
- *
+ *
* @return the <a href=
* "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
* Time</a> claim if available, otherwise null
@@ -255,7 +254,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
* At</a> claim
- *
+ *
* @return the
* <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
* At</a> claim if available, otherwise null
@@ -269,7 +268,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
/**
* Return the
* <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
- *
+ *
* @return the <a href=
* "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
* if available, otherwise null
@@ -284,7 +283,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
* Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
* object, and return the map of member names to their values (each value being
* represented as either a String, a Number, or a List of Strings).
- *
+ *
* @param split
* the value to decode and parse
* @return the map of JSON member names to their String, Number, or String List
@@ -326,12 +325,11 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
private static Object convert(JsonNode value) {
if (value.isArray()) {
List<String> retvalList = new ArrayList<>();
- for (JsonNode arrayElement : value) {
+ for (JsonNode arrayElement : value)
retvalList.add(arrayElement.asText());
- }
return retvalList;
}
- return value.getNodeType() == JsonNodeType.NUMBER ? ((NumericNode) value).numberValue() : value.asText();
+ return value.getNodeType() == JsonNodeType.NUMBER ? value.numberValue() : value.asText();
}
private Long calculateStartTimeMs() throws OAuthBearerIllegalTokenException {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 83b2602..8d259e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -79,7 +79,7 @@ import org.slf4j.LoggerFactory;
* be something other than '{@code scope}'</li>
* </ul>
* For example:
- *
+ *
* <pre>
* KafkaClient {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
@@ -89,7 +89,7 @@ import org.slf4j.LoggerFactory;
* unsecuredLoginExtension_traceId="123";
* };
* </pre>
- *
+ *
* This class is the default when the SASL mechanism is OAUTHBEARER and no value
* is explicitly set via either the {@code sasl.login.callback.handler.class}
* client configuration property or the
@@ -122,7 +122,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
/**
* For testing
- *
+ *
* @param time
* the mandatory time to set
*/
@@ -132,7 +132,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
/**
* Return true if this instance has been configured, otherwise false
- *
+ *
* @return true if this instance has been configured, otherwise false
*/
public boolean configured() {
@@ -179,7 +179,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
// empty
}
- private void handleTokenCallback(OAuthBearerTokenCallback callback) throws IOException {
+ private void handleTokenCallback(OAuthBearerTokenCallback callback) {
if (callback.token() != null)
throw new IllegalArgumentException("Callback had a token already");
String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
index 2e21cf4..54e289c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.security.oauthbearer.internals.unsecured;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -59,7 +58,7 @@ import org.slf4j.LoggerFactory;
* clock skew (the default is 0)</li>
* <ul>
* For example:
- *
+ *
* <pre>
* KafkaServer {
* org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
@@ -91,7 +90,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
/**
* For testing
- *
+ *
* @param time
* the mandatory time to set
*/
@@ -101,7 +100,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
/**
* Return true if this instance has been configured, otherwise false
- *
+ *
* @return true if this instance has been configured, otherwise false
*/
public boolean configured() {
@@ -124,7 +123,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
}
@Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
if (!configured())
throw new IllegalStateException("Callback handler not configured");
for (Callback callback : callbacks) {
@@ -195,8 +194,8 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
private List<String> requiredScope() {
String requiredSpaceDelimitedScope = option(REQUIRED_SCOPE_OPTION);
List<String> requiredScope = requiredSpaceDelimitedScope == null || requiredSpaceDelimitedScope.trim().isEmpty()
- ? Collections.<String>emptyList()
- : OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
+ ? Collections.emptyList()
+ : OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
return requiredScope;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
index 6c32ff1..4085168 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
@@ -22,7 +22,6 @@ import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
public class PlainLoginModule implements LoginModule {
@@ -45,22 +44,22 @@ public class PlainLoginModule implements LoginModule {
}
@Override
- public boolean login() throws LoginException {
+ public boolean login() {
return true;
}
@Override
- public boolean logout() throws LoginException {
+ public boolean logout() {
return true;
}
@Override
- public boolean commit() throws LoginException {
+ public boolean commit() {
return true;
}
@Override
- public boolean abort() throws LoginException {
+ public boolean abort() {
return false;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java
index 054d411..18df038 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/internals/PlainSaslServer.java
@@ -143,21 +143,21 @@ public class PlainSaslServer implements SaslServer {
}
@Override
- public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
if (!complete)
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(incoming, offset, offset + len);
}
@Override
- public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
if (!complete)
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(outgoing, offset, offset + len);
}
@Override
- public void dispose() throws SaslException {
+ public void dispose() {
}
public static class PlainSaslServerFactory implements SaslServerFactory {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index 3d7de58..104b3fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -24,7 +24,6 @@ import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
public class ScramLoginModule implements LoginModule {
@@ -55,22 +54,22 @@ public class ScramLoginModule implements LoginModule {
}
@Override
- public boolean login() throws LoginException {
+ public boolean login() {
return true;
}
@Override
- public boolean logout() throws LoginException {
+ public boolean logout() {
return true;
}
@Override
- public boolean commit() throws LoginException {
+ public boolean commit() {
return true;
}
@Override
- public boolean abort() throws LoginException {
+ public boolean abort() {
return false;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
index 7b51890..439bcfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
@@ -26,7 +26,7 @@ import java.util.Map;
public class ScramExtensions extends SaslExtensions {
public ScramExtensions() {
- this(Collections.<String, String>emptyMap());
+ this(Collections.emptyMap());
}
public ScramExtensions(String extensions) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
index bd3b704..c21a52e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslClient.java
@@ -59,7 +59,7 @@ public class ScramSaslClient implements SaslClient {
RECEIVE_SERVER_FINAL_MESSAGE,
COMPLETE,
FAILED
- };
+ }
private final ScramMechanism mechanism;
private final CallbackHandler callbackHandler;
@@ -157,14 +157,14 @@ public class ScramSaslClient implements SaslClient {
}
@Override
- public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(incoming, offset, offset + len);
}
@Override
- public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(outgoing, offset, offset + len);
@@ -178,7 +178,7 @@ public class ScramSaslClient implements SaslClient {
}
@Override
- public void dispose() throws SaslException {
+ public void dispose() {
}
private void setState(State state) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
index b11300a..217dab7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
@@ -62,7 +62,7 @@ public class ScramSaslServer implements SaslServer {
RECEIVE_CLIENT_FINAL_MESSAGE,
COMPLETE,
FAILED
- };
+ }
private final ScramMechanism mechanism;
private final ScramFormatter formatter;
@@ -194,21 +194,21 @@ public class ScramSaslServer implements SaslServer {
}
@Override
- public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+ public byte[] unwrap(byte[] incoming, int offset, int len) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(incoming, offset, offset + len);
}
@Override
- public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+ public byte[] wrap(byte[] outgoing, int offset, int len) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
return Arrays.copyOfRange(outgoing, offset, offset + len);
}
@Override
- public void dispose() throws SaslException {
+ public void dispose() {
}
private void setState(State state) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
index f6f5315..63c2b17 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.security.scram.internals;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -50,7 +49,7 @@ public class ScramServerCallbackHandler implements AuthenticateCallbackHandler {
}
@Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
String username = null;
for (Callback callback : callbacks) {
if (callback instanceof NameCallback)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 61f2f0d..b1f7df8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -447,7 +447,7 @@ public class SslFactory implements Reconfigurable {
private final Principal subjectPrincipal;
private final Set<List<?>> subjectAltNames;
- static List<CertificateEntries> create(KeyStore keystore) throws GeneralSecurityException, IOException {
+ static List<CertificateEntries> create(KeyStore keystore) throws GeneralSecurityException {
Enumeration<String> aliases = keystore.aliases();
List<CertificateEntries> entries = new ArrayList<>();
while (aliases.hasMoreElements()) {
@@ -463,7 +463,7 @@ public class SslFactory implements Reconfigurable {
this.subjectPrincipal = cert.getSubjectX500Principal();
Collection<List<?>> altNames = cert.getSubjectAlternativeNames();
// use a set for comparison
- this.subjectAltNames = altNames != null ? new HashSet<>(altNames) : Collections.<List<?>>emptySet();
+ this.subjectAltNames = altNames != null ? new HashSet<>(altNames) : Collections.emptySet();
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
index ffd2af3..8f360ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/TokenInformation.java
@@ -87,9 +87,7 @@ public class TokenInformation {
}
public boolean ownerOrRenewer(KafkaPrincipal principal) {
- if (owner.equals(principal) || renewers.contains(principal))
- return true;
- return false;
+ return owner.equals(principal) || renewers.contains(principal);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 0a17ecd..17c4ba1 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -99,8 +99,8 @@ public class AppInfoParser {
}
public interface AppInfoMBean {
- public String getVersion();
- public String getCommitId();
+ String getVersion();
+ String getCommitId();
}
public static class AppInfo implements AppInfoMBean {
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index a6d3e2c..f94858c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -772,7 +772,7 @@ public final class Utils {
* @return
*/
public static <T> List<T> safe(List<T> other) {
- return other == null ? Collections.<T>emptyList() : other;
+ return other == null ? Collections.emptyList() : other;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
index a5ff082..0f0eb62 100644
--- a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
+++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaEntity.java
@@ -28,7 +28,7 @@ public interface ClientQuotaEntity {
/**
* Entity type of a {@link ConfigEntity}
*/
- public enum ConfigEntityType {
+ enum ConfigEntityType {
USER,
CLIENT_ID,
DEFAULT_USER,
@@ -41,7 +41,7 @@ public interface ClientQuotaEntity {
* For example, {user, client-id} quota is represented using two
* instances of ConfigEntity with entity types USER and CLIENT_ID.
*/
- public interface ConfigEntity {
+ interface ConfigEntity {
/**
* Returns the name of this entity. For default quotas, an empty string is returned.
*/
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
index 8e07e73..8eafba7 100644
--- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -46,7 +46,7 @@ public class ResourceTypeTest {
};
@Test
- public void testIsUnknown() throws Exception {
+ public void testIsUnknown() {
for (AclResourceTypeTestInfo info : INFOS) {
assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown,
info.unknown, info.resourceType.isUnknown());
@@ -54,7 +54,7 @@ public class ResourceTypeTest {
}
@Test
- public void testCode() throws Exception {
+ public void testCode() {
assertEquals(ResourceType.values().length, INFOS.length);
for (AclResourceTypeTestInfo info : INFOS) {
assertEquals(info.resourceType + " was supposed to have code == " + info.code,
@@ -66,7 +66,7 @@ public class ResourceTypeTest {
}
@Test
- public void testName() throws Exception {
+ public void testName() {
for (AclResourceTypeTestInfo info : INFOS) {
assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " +
info.resourceType, info.resourceType, ResourceType.fromString(info.name));
@@ -75,7 +75,7 @@ public class ResourceTypeTest {
}
@Test
- public void testExhaustive() throws Exception {
+ public void testExhaustive() {
assertEquals(INFOS.length, ResourceType.values().length);
for (int i = 0; i < INFOS.length; i++) {
assertEquals(INFOS[i].resourceType, ResourceType.values()[i]);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
index d96b359..8b8c251 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -190,38 +190,38 @@ public class JaasContextTest {
@Test
public void testLoadForServerWithListenerNameOverride() throws IOException {
writeConfiguration(Arrays.asList(
- "KafkaServer { test.LoginModuleDefault required; };",
- "plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
+ "KafkaServer { test.LoginModuleDefault required; };",
+ "plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
));
JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
- "SOME-MECHANISM", Collections.<String, Object>emptyMap());
+ "SOME-MECHANISM", Collections.emptyMap());
assertEquals("plaintext.KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
checkEntry(context.configurationEntries().get(0), "test.LoginModuleOverride",
- LoginModuleControlFlag.REQUISITE, Collections.<String, Object>emptyMap());
+ LoginModuleControlFlag.REQUISITE, Collections.emptyMap());
}
@Test
public void testLoadForServerWithListenerNameAndFallback() throws IOException {
writeConfiguration(Arrays.asList(
- "KafkaServer { test.LoginModule required; };",
- "other.KafkaServer { test.LoginModuleOther requisite; };"
+ "KafkaServer { test.LoginModule required; };",
+ "other.KafkaServer { test.LoginModuleOther requisite; };"
));
JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
- "SOME-MECHANISM", Collections.<String, Object>emptyMap());
+ "SOME-MECHANISM", Collections.emptyMap());
assertEquals("KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
checkEntry(context.configurationEntries().get(0), "test.LoginModule", LoginModuleControlFlag.REQUIRED,
- Collections.<String, Object>emptyMap());
+ Collections.emptyMap());
}
@Test(expected = IllegalArgumentException.class)
public void testLoadForServerWithWrongListenerName() throws IOException {
writeConfiguration("Server", "test.LoginModule required;");
JaasContext.loadServerContext(new ListenerName("plaintext"), "SOME-MECHANISM",
- Collections.<String, Object>emptyMap());
+ Collections.emptyMap());
}
private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 2fce4c5..938fe94 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -128,7 +128,7 @@ public class ClientAuthenticationFailureTest {
}
@Test
- public void testTransactionalProducerWithInvalidCredentials() throws Exception {
+ public void testTransactionalProducerWithInvalidCredentials() {
Map<String, Object> props = new HashMap<>(saslClientConfigs);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txclient-1");
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
index 5436b2a..34ff881 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
@@ -56,7 +56,7 @@ public class LoginManagerTest {
public void testClientLoginManager() throws Exception {
Map<String, ?> configs = Collections.singletonMap("sasl.jaas.config", dynamicPlainContext);
JaasContext dynamicContext = JaasContext.loadClientContext(configs);
- JaasContext staticContext = JaasContext.loadClientContext(Collections.<String, Object>emptyMap());
+ JaasContext staticContext = JaasContext.loadClientContext(Collections.emptyMap());
LoginManager dynamicLogin = LoginManager.acquireLoginManager(dynamicContext, "PLAIN",
DefaultLogin.class, configs);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 74058eb..dfefabb 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -830,7 +830,7 @@ public class SaslAuthenticatorTest {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, TestPlainLoginModule.class.getName(),
- Collections.<String, Object>emptyMap());
+ Collections.emptyMap());
server = createEchoServer(securityProtocol);
// Connection should succeed using login callback override that sets correct username/password
@@ -854,7 +854,7 @@ public class SaslAuthenticatorTest {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, TestPlainLoginModule.class.getName(),
- Collections.<String, Object>emptyMap());
+ Collections.emptyMap());
jaasConfig.setClientOptions("PLAIN", TestServerCallbackHandler.USERNAME, TestServerCallbackHandler.PASSWORD);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
String prefix = listenerName.saslMechanismConfigPrefix("PLAIN");
@@ -996,7 +996,7 @@ public class SaslAuthenticatorTest {
TestJaasConfig.jaasConfigProperty("PLAIN", serverOptions));
TestJaasConfig staticJaasConfig = new TestJaasConfig();
staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
- Collections.<String, Object>emptyMap());
+ Collections.emptyMap());
staticJaasConfig.setClientOptions("PLAIN", "user1", "user1-secret");
Configuration.setConfiguration(staticJaasConfig);
server = createEchoServer(securityProtocol);
@@ -1544,7 +1544,7 @@ public class SaslAuthenticatorTest {
}
@Override
- protected boolean authenticate(String username, char[] password) throws IOException {
+ protected boolean authenticate(String username, char[] password) {
if (!configured)
throw new IllegalStateException("Server callback handler not configured");
return USERNAME.equals(username) && new String(password).equals(PASSWORD);
@@ -1594,7 +1594,7 @@ public class SaslAuthenticatorTest {
}
@Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
if (!configured)
throw new IllegalStateException("Client callback handler not configured");
for (Callback callback : callbacks) {
@@ -1665,7 +1665,7 @@ public class SaslAuthenticatorTest {
}
@Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ public void handle(Callback[] callbacks) {
if (!configured)
throw new IllegalStateException("Login callback handler not configured");
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index b70d592..1ae83ee 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -55,7 +55,7 @@ public class SaslServerAuthenticatorTest {
final Capture<ByteBuffer> size = EasyMock.newCapture();
EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
@Override
- public Integer answer() throws Throwable {
+ public Integer answer() {
size.getValue().putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
return 4;
}
@@ -79,7 +79,7 @@ public class SaslServerAuthenticatorTest {
final Capture<ByteBuffer> size = EasyMock.newCapture();
EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
@Override
- public Integer answer() throws Throwable {
+ public Integer answer() {
size.getValue().putInt(headerStruct.sizeOf());
return 4;
}
@@ -88,7 +88,7 @@ public class SaslServerAuthenticatorTest {
final Capture<ByteBuffer> payload = EasyMock.newCapture();
EasyMock.expect(transportLayer.read(EasyMock.capture(payload))).andAnswer(new IAnswer<Integer>() {
@Override
- public Integer answer() throws Throwable {
+ public Integer answer() {
// serialize only the request header. the authenticator should not parse beyond this
headerStruct.writeTo(payload.getValue());
return headerStruct.sizeOf();
@@ -111,7 +111,7 @@ public class SaslServerAuthenticatorTest {
Map<String, JaasContext> jaasContexts = Collections.singletonMap(mechanism,
new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null));
Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
- Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.<String, AuthenticateCallbackHandler>singletonMap(
+ Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
mechanism, new SaslServerCallbackHandler());
return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null,
new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
index 97b0b27..c27e853 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
@@ -16,14 +16,12 @@
*/
package org.apache.kafka.common.security.authenticator;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
@@ -45,7 +43,7 @@ public class TestDigestLoginModule extends PlainLoginModule {
}
@Override
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ public void handle(Callback[] callbacks) {
String username = null;
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
index 1acd80c..687f937 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/kerberos/KerberosNameTest.java
@@ -87,7 +87,7 @@ public class KerberosNameTest {
}
@Test
- public void testInvalidRules() throws Exception {
+ public void testInvalidRules() {
testInvalidRule(Arrays.asList("default"));
testInvalidRule(Arrays.asList("DEFAUL"));
testInvalidRule(Arrays.asList("DEFAULT/L"));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
index a9620fa..51d6012 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModuleTest.java
@@ -134,14 +134,14 @@ public class OAuthBearerLoginModuleTest {
// Create login modules
OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
- loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule1.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
- loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule2.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
OAuthBearerLoginModule loginModule3 = new OAuthBearerLoginModule();
- loginModule3.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule3.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
// Should start with nothing
assertEquals(0, privateCredentials.size());
@@ -230,11 +230,11 @@ public class OAuthBearerLoginModuleTest {
// Create login modules
OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
- loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule1.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
- loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule2.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
// Should start with nothing
assertEquals(0, privateCredentials.size());
@@ -290,8 +290,8 @@ public class OAuthBearerLoginModuleTest {
// Create login module
OAuthBearerLoginModule loginModule = new OAuthBearerLoginModule();
- loginModule.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
// Should start with nothing
assertEquals(0, privateCredentials.size());
@@ -342,14 +342,14 @@ public class OAuthBearerLoginModuleTest {
// Create login modules
OAuthBearerLoginModule loginModule1 = new OAuthBearerLoginModule();
- loginModule1.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule1.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
OAuthBearerLoginModule loginModule2 = new OAuthBearerLoginModule();
- loginModule2.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule2.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
OAuthBearerLoginModule loginModule3 = new OAuthBearerLoginModule();
- loginModule3.initialize(subject, testTokenCallbackHandler, Collections.<String, Object>emptyMap(),
- Collections.<String, Object>emptyMap());
+ loginModule3.initialize(subject, testTokenCallbackHandler, Collections.emptyMap(),
+ Collections.emptyMap());
// Should start with nothing
assertEquals(0, privateCredentials.size());
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index 5df7968..d656608 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -194,7 +194,7 @@ public class OAuthBearerSaslServerTest {
}
private byte[] clientInitialResponse(String authorizationId, boolean illegalToken)
- throws OAuthBearerConfigException, IOException, UnsupportedCallbackException, LoginException {
+ throws OAuthBearerConfigException, IOException, UnsupportedCallbackException {
return clientInitialResponse(authorizationId, false, Collections.emptyMap());
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
index be01fe3..34b8209 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandlerTest.java
@@ -29,7 +29,6 @@ import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.LoginException;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
@@ -91,7 +90,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandlerTest {
@SuppressWarnings("unchecked")
@Test
public void validOptionsWithExplicitOptionValues()
- throws IOException, UnsupportedCallbackException, LoginException {
+ throws IOException, UnsupportedCallbackException {
String explicitScope1 = "scope1";
String explicitScope2 = "scope2";
String explicitScopeClaimName = "putScopeInHere";
@@ -142,7 +141,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandlerTest {
(Map) options);
OAuthBearerUnsecuredLoginCallbackHandler callbackHandler = new OAuthBearerUnsecuredLoginCallbackHandler();
callbackHandler.time(mockTime);
- callbackHandler.configure(Collections.<String, Object>emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+ callbackHandler.configure(Collections.emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
Arrays.asList(config.getAppConfigurationEntry("KafkaClient")[0]));
return callbackHandler;
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
index c945af0..2449a83 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandlerTest.java
@@ -73,7 +73,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
}
@Test
- public void validToken() throws IOException, UnsupportedCallbackException {
+ public void validToken() {
for (final boolean includeOptionalIssuedAtClaim : new boolean[] {true, false}) {
String claimsJson = "{" + PRINCIPAL_CLAIM_TEXT + comma(EXPIRATION_TIME_CLAIM_TEXT)
+ (includeOptionalIssuedAtClaim ? comma(ISSUED_AT_CLAIM_TEXT) : "") + "}";
@@ -101,7 +101,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
}
@Test
- public void includesRequiredScope() throws IOException, UnsupportedCallbackException {
+ public void includesRequiredScope() {
String claimsJson = "{" + SUB_CLAIM_TEXT + comma(EXPIRATION_TIME_CLAIM_TEXT) + comma(SCOPE_CLAIM_TEXT) + "}";
Object validationResult = validationResult(UNSECURED_JWT_HEADER_JSON, claimsJson,
MODULE_OPTIONS_MAP_REQUIRE_EXISTING_SCOPE);
@@ -124,7 +124,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
private static void confirmFailsValidation(String headerJson, String claimsJson,
Map<String, String> moduleOptionsMap, String optionalFailureScope) throws OAuthBearerConfigException,
- OAuthBearerIllegalTokenException, IOException, UnsupportedCallbackException {
+ OAuthBearerIllegalTokenException {
Object validationResultObj = validationResult(headerJson, claimsJson, moduleOptionsMap);
assertTrue(validationResultObj instanceof OAuthBearerValidatorCallback);
OAuthBearerValidatorCallback callback = (OAuthBearerValidatorCallback) validationResultObj;
@@ -159,7 +159,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandlerTest {
config.createOrUpdateEntry("KafkaClient", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule",
(Map) options);
OAuthBearerUnsecuredValidatorCallbackHandler callbackHandler = new OAuthBearerUnsecuredValidatorCallbackHandler();
- callbackHandler.configure(Collections.<String, Object>emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+ callbackHandler.configure(Collections.emptyMap(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
Arrays.asList(config.getAppConfigurationEntry("KafkaClient")[0]));
return callbackHandler;
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java
index 2a21864..7477dbc 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerValidationUtilsTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.security.oauthbearer.internals.unsecured;
import static org.junit.Assert.assertTrue;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
@@ -160,18 +159,18 @@ public class OAuthBearerValidationUtilsTest {
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
- public void validateScope() throws IOException {
+ public void validateScope() {
long nowMs = TIME.milliseconds();
double nowClaimValue = ((double) nowMs) / 1000;
final List<String> noScope = Collections.emptyList();
final List<String> scope1 = Arrays.asList("scope1");
final List<String> scope1And2 = Arrays.asList("scope1", "scope2");
for (boolean actualScopeExists : new boolean[] {true, false}) {
- List<? extends List> scopes = !actualScopeExists ? Arrays.<List>asList((List) null)
+ List<? extends List> scopes = !actualScopeExists ? Arrays.asList((List) null)
: Arrays.asList(noScope, scope1, scope1And2);
for (List<String> actualScope : scopes) {
for (boolean requiredScopeExists : new boolean[] {true, false}) {
- List<? extends List> requiredScopes = !requiredScopeExists ? Arrays.<List>asList((List) null)
+ List<? extends List> requiredScopes = !requiredScopeExists ? Arrays.asList((List) null)
: Arrays.asList(noScope, scope1, scope1And2);
for (List<String> requiredScope : requiredScopes) {
StringBuilder sb = new StringBuilder("{");
diff --git a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
index bc73e9f..b8c77df 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/plain/internals/PlainSaslServerTest.java
@@ -40,7 +40,7 @@ public class PlainSaslServerTest {
private PlainSaslServer saslServer;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
TestJaasConfig jaasConfig = new TestJaasConfig();
Map<String, Object> options = new HashMap<>();
options.put("user_" + USER_A, PASSWORD_A);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java
index f171624..1f60efa 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internals/ScramMessagesTest.java
@@ -71,7 +71,7 @@ public class ScramMessagesTest {
@Test
public void validClientFirstMessage() throws SaslException {
String nonce = formatter.secureRandomString();
- ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.<String, String>emptyMap());
+ ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.emptyMap());
checkClientFirstMessage(m, "someuser", nonce, "");
// Default format used by Kafka client: only user and nonce are specified
@@ -116,7 +116,7 @@ public class ScramMessagesTest {
}
@Test
- public void invalidClientFirstMessage() throws SaslException {
+ public void invalidClientFirstMessage() {
String nonce = formatter.secureRandomString();
// Invalid entry in gs2-header
String invalid = String.format("n,x=something,n=testuser,r=%s", nonce);
@@ -166,7 +166,7 @@ public class ScramMessagesTest {
}
@Test
- public void invalidServerFirstMessage() throws SaslException {
+ public void invalidServerFirstMessage() {
String nonce = formatter.secureRandomString();
String salt = randomBytesAsString();
@@ -221,7 +221,7 @@ public class ScramMessagesTest {
}
@Test
- public void invalidClientFinalMessage() throws SaslException {
+ public void invalidClientFinalMessage() {
String nonce = formatter.secureRandomString();
String channelBinding = randomBytesAsString();
String proof = randomBytesAsString();
@@ -272,7 +272,7 @@ public class ScramMessagesTest {
}
@Test
- public void invalidServerFinalMessage() throws SaslException {
+ public void invalidServerFinalMessage() {
String serverSignature = randomBytesAsString();
// Invalid error
@@ -299,7 +299,7 @@ public class ScramMessagesTest {
private byte[] toBytes(String base64Str) {
return Base64.getDecoder().decode(base64Str);
- };
+ }
private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) {
assertEquals(saslName, message.saslName());
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 134882f..38bdbfe 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -38,7 +38,7 @@ public class SerializationTest {
{
put(String.class, Arrays.asList("my string"));
put(Short.class, Arrays.asList((short) 32767, (short) -32768));
- put(Integer.class, Arrays.asList((int) 423412424, (int) -41243432));
+ put(Integer.class, Arrays.asList(423412424, -41243432));
put(Long.class, Arrays.asList(922337203685477580L, -922337203685477581L));
put(Float.class, Arrays.asList(5678567.12312f, -5678567.12341f));
put(Double.class, Arrays.asList(5678567.12312d, -5678567.12341d));
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
index 20084a2..249f6f8 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
@@ -87,7 +87,7 @@ public class ImplicitLinkedHashSetTest {
}
@Test
- public void testInsertDelete() throws Exception {
+ public void testInsertDelete() {
ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(100);
assertTrue(set.add(new TestElement(1)));
TestElement second = new TestElement(2);
@@ -137,7 +137,7 @@ public class ImplicitLinkedHashSetTest {
}
@Test
- public void testTraversal() throws Exception {
+ public void testTraversal() {
ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(100);
expectTraversal(set.iterator());
assertTrue(set.add(new TestElement(2)));
@@ -168,7 +168,7 @@ public class ImplicitLinkedHashSetTest {
}
@Test
- public void testCollisions() throws Exception {
+ public void testCollisions() {
ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(5);
assertEquals(11, set.numSlots());
assertTrue(set.add(new TestElement(11)));
@@ -184,7 +184,7 @@ public class ImplicitLinkedHashSetTest {
}
@Test
- public void testEnlargement() throws Exception {
+ public void testEnlargement() {
ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(5);
assertEquals(11, set.numSlots());
for (int i = 0; i < 6; i++) {
@@ -203,7 +203,7 @@ public class ImplicitLinkedHashSetTest {
}
@Test
- public void testManyInsertsAndDeletes() throws Exception {
+ public void testManyInsertsAndDeletes() {
Random random = new Random(123);
LinkedHashSet<Integer> existing = new LinkedHashSet<>();
ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>();
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
index 7134f0a..89ece60 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockScheduler.java
@@ -92,7 +92,7 @@ public class MockScheduler implements Scheduler, MockTime.MockTimeListener {
public Void apply(final Long now) {
executor.submit(new Callable<Void>() {
@Override
- public Void call() throws Exception {
+ public Void call() {
// Note: it is possible that we'll execute Callable#call right after
// the future is cancelled. This is a valid sequence of events
// that the author of the Callable needs to be able to handle.
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
index 58bcb19..7bd302b 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTimeTest.java
@@ -30,7 +30,7 @@ public class MockTimeTest {
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
- public void testAdvanceClock() throws Exception {
+ public void testAdvanceClock() {
MockTime time = new MockTime(0, 100, 200);
Assert.assertEquals(100, time.milliseconds());
Assert.assertEquals(200, time.nanoseconds());
@@ -40,7 +40,7 @@ public class MockTimeTest {
}
@Test
- public void testAutoTickMs() throws Exception {
+ public void testAutoTickMs() {
MockTime time = new MockTime(1, 100, 200);
Assert.assertEquals(101, time.milliseconds());
Assert.assertEquals(2000200, time.nanoseconds());
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
index 59ac6c0..bba19a8 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.UnsupportedEncodingException;
import java.lang.management.ManagementFactory;
import javax.management.MBeanException;
@@ -34,7 +33,7 @@ import org.junit.Test;
public class SanitizerTest {
@Test
- public void testSanitize() throws UnsupportedEncodingException {
+ public void testSanitize() {
String principal = "CN=Some characters !@#$%&*()_-+=';:,/~";
String sanitizedPrincipal = Sanitizer.sanitize(principal);
assertTrue(sanitizedPrincipal.replace('%', '_').matches("[a-zA-Z0-9\\._\\-]+"));
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java
index d77f4d0..7bd96f0 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ShellTest.java
@@ -49,7 +49,7 @@ public class ShellTest {
private final static String NONEXISTENT_PATH = "/dev/a/path/that/does/not/exist/in/the/filesystem";
@Test
- public void testAttemptToRunNonExistentProgram() throws Exception {
+ public void testAttemptToRunNonExistentProgram() {
assumeTrue(!OperatingSystem.IS_WINDOWS);
try {
Shell.execCommand(NONEXISTENT_PATH);
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 4d1d830..b258a34 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -106,7 +106,7 @@ public class UtilsTest {
assertEquals("1", Utils.join(Arrays.asList("1"), ","));
assertEquals("1,2,3", Utils.join(Arrays.asList(1, 2, 3), ","));
}
-
+
@Test
public void testAbs() {
assertEquals(0, Utils.abs(Integer.MIN_VALUE));
@@ -365,7 +365,7 @@ public class UtilsTest {
EasyMock.expect(channelMock.size()).andReturn((long) fileChannelContent.length());
EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
@Override
- public Integer answer() throws Throwable {
+ public Integer answer() {
ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
buffer.put(fileChannelContent.getBytes());
return -1;
@@ -399,7 +399,7 @@ public class UtilsTest {
final StringBuilder sb = new StringBuilder();
EasyMock.expect(channelMock.read(EasyMock.anyObject(ByteBuffer.class), EasyMock.anyInt())).andAnswer(new IAnswer<Integer>() {
@Override
- public Integer answer() throws Throwable {
+ public Integer answer() {
ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
for (int i = 0; i < mockedBytesRead; i++)
sb.append("a");