Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/25 02:17:47 UTC
[4/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables
MINOR: A bunch of clean-ups related to usage of unused variables
There should be only one case where these clean-ups have a functional impact: replaced repeated identical logs with a single log for the stale controller epoch case.
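For context, a minimal sketch of the shape of that logging change (illustrative only: StaleEpochLogging, handleBefore and handleAfter are invented names, not the actual KafkaController code):

// Illustrative sketch, not the actual controller code.
object StaleEpochLogging {
  def warn(msg: String): Unit = println(s"WARN $msg")

  // Before: the same warning was emitted once per partition in the request.
  def handleBefore(partitions: Seq[Int], requestEpoch: Int, currentEpoch: Int): Unit =
    if (requestEpoch < currentEpoch)
      partitions.foreach(_ =>
        warn(s"Ignoring request with stale controller epoch $requestEpoch (current: $currentEpoch)"))

  // After: a single warning covers the whole request.
  def handleAfter(partitions: Seq[Int], requestEpoch: Int, currentEpoch: Int): Unit =
    if (requestEpoch < currentEpoch)
      warn(s"Ignoring request for ${partitions.size} partitions with stale controller epoch $requestEpoch (current: $currentEpoch)")
}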
The rest should just make the code easier to read and a bit less wasteful. I did this exercise because unused variables sometimes mask bugs.
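To illustrate that last point, a hypothetical Scala sketch (ReportSender and its members are invented for illustration, loosely mirroring the clientId removal in Sender below): a constructor parameter that is assigned to a field but never read again silently stops affecting behaviour, and the dead assignment suppresses any unused-parameter warning that would otherwise flag it.

// Hypothetical example, not taken from this patch.
class ReportSender(clientId: String, timeoutMs: Int) {
  // Assigned but never read: callers can tune clientId and see no effect.
  // The assignment also masks the unused parameter from compiler warnings.
  private val id = clientId

  def describe(): String = s"sender(timeout=${timeoutMs}ms)" // id never used
}

object UnusedFieldDemo extends App {
  // Both instances behave identically even though clientId differs.
  println(new ReportSender("client-a", 500).describe())
  println(new ReportSender("client-b", 500).describe())
}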
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #1985 from ijuma/remove-unused
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0926738
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0926738
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0926738
Branch: refs/heads/trunk
Commit: d092673838173d9dedbf5acf3f4e2cd8c736294f
Parents: 1fc450f
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Oct 25 02:55:55 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Oct 25 02:55:55 2016 +0100
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 2 -
.../consumer/internals/ConsumerCoordinator.java | 2 -
.../consumer/internals/SubscriptionState.java | 7 --
.../kafka/clients/producer/KafkaProducer.java | 1 -
.../clients/producer/internals/Sender.java | 10 +--
.../apache/kafka/common/network/MultiSend.java | 6 +-
.../common/network/PlaintextChannelBuilder.java | 4 +-
.../kafka/common/network/SslChannelBuilder.java | 4 +-
.../kafka/common/network/SslTransportLayer.java | 5 +-
.../authenticator/SaslServerAuthenticator.java | 5 +-
.../apache/kafka/clients/NetworkClientTest.java | 3 +-
.../producer/internals/BufferPoolTest.java | 2 +-
.../clients/producer/internals/SenderTest.java | 3 -
.../apache/kafka/connect/data/Timestamp.java | 4 --
.../runtime/distributed/WorkerCoordinator.java | 5 +-
.../runtime/rest/entities/ConnectorInfo.java | 9 ---
.../src/main/scala/kafka/admin/AclCommand.scala | 1 -
.../src/main/scala/kafka/admin/AdminUtils.scala | 6 +-
.../main/scala/kafka/admin/ConfigCommand.scala | 10 +--
.../kafka/admin/ConsumerGroupCommand.scala | 2 +-
.../PreferredReplicaLeaderElectionCommand.scala | 2 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 4 +-
.../main/scala/kafka/admin/TopicCommand.scala | 4 +-
.../scala/kafka/admin/ZkSecurityMigrator.scala | 1 -
.../kafka/api/ControlledShutdownResponse.scala | 2 +-
.../src/main/scala/kafka/api/FetchRequest.scala | 7 +-
.../src/main/scala/kafka/api/LeaderAndIsr.scala | 4 +-
.../scala/kafka/api/OffsetCommitResponse.scala | 2 +-
.../main/scala/kafka/api/OffsetRequest.scala | 5 +-
.../main/scala/kafka/api/ProducerRequest.scala | 5 +-
.../main/scala/kafka/client/ClientUtils.scala | 6 +-
.../scala/kafka/cluster/BrokerEndPoint.scala | 2 +-
.../main/scala/kafka/cluster/Partition.scala | 3 +-
core/src/main/scala/kafka/cluster/Replica.scala | 7 +-
.../ZkNodeChangeNotificationListener.scala | 2 +-
.../kafka/consumer/PartitionAssignor.scala | 4 +-
.../main/scala/kafka/consumer/TopicCount.scala | 4 +-
.../main/scala/kafka/consumer/TopicFilter.scala | 2 +-
.../consumer/ZookeeperConsumerConnector.scala | 24 +++----
.../kafka/controller/KafkaController.scala | 74 +++++++++-----------
.../controller/PartitionStateMachine.scala | 12 ++--
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../kafka/controller/TopicDeletionManager.scala | 4 +-
.../coordinator/GroupMetadataManager.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 4 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
.../scala/kafka/log/LogCleanerManager.scala | 10 ++-
.../kafka/message/ByteBufferMessageSet.scala | 2 +-
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 19 ++---
.../scala/kafka/network/BlockingChannel.scala | 2 +-
.../kafka/producer/DefaultPartitioner.scala | 2 -
.../main/scala/kafka/producer/Producer.scala | 2 +-
.../scala/kafka/security/auth/Resource.scala | 2 +-
.../security/auth/SimpleAclAuthorizer.scala | 4 +-
.../kafka/server/AbstractFetcherManager.scala | 2 +-
.../main/scala/kafka/server/AdminManager.scala | 2 +-
.../kafka/server/BrokerMetadataCheckpoint.scala | 2 +-
.../scala/kafka/server/ClientQuotaManager.scala | 8 +--
.../main/scala/kafka/server/DelayedFetch.scala | 4 +-
.../kafka/server/DynamicConfigManager.scala | 3 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/kafka/server/KafkaHealthcheck.scala | 1 -
.../scala/kafka/server/OffsetCheckpoint.scala | 2 +-
.../scala/kafka/server/ReplicaManager.scala | 21 +++---
.../kafka/server/ZookeeperLeaderElector.scala | 2 +-
.../scala/kafka/tools/ConsoleConsumer.scala | 6 +-
.../scala/kafka/tools/ConsumerPerformance.scala | 2 +-
.../scala/kafka/tools/DumpLogSegments.scala | 10 ++-
.../scala/kafka/tools/EndToEndLatency.scala | 3 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 10 +--
.../scala/kafka/tools/ReplayLogProducer.scala | 5 +-
.../kafka/tools/ReplicaVerificationTool.scala | 21 +++---
core/src/main/scala/kafka/utils/CoreUtils.scala | 4 +-
core/src/main/scala/kafka/utils/FileLock.scala | 4 +-
.../src/main/scala/kafka/utils/Mx4jLoader.scala | 6 +-
.../scala/kafka/utils/ReplicationUtils.scala | 4 +-
.../kafka/utils/VerifiableProperties.scala | 4 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 30 ++++----
.../integration/kafka/api/AdminClientTest.scala | 1 -
.../kafka/api/AuthorizerIntegrationTest.scala | 15 ++--
.../kafka/api/BaseConsumerTest.scala | 2 +-
.../kafka/api/BaseProducerSendTest.scala | 21 +++---
.../kafka/api/EndToEndAuthorizationTest.scala | 2 +-
.../kafka/api/FixedPortTestUtils.scala | 2 +-
.../kafka/api/IntegrationTestHarness.scala | 4 +-
.../kafka/api/PlaintextConsumerTest.scala | 13 ++--
.../kafka/api/PlaintextProducerSendTest.scala | 4 +-
.../kafka/api/ProducerBounceTest.scala | 4 +-
.../scala/kafka/security/minikdc/MiniKdc.scala | 5 +-
.../scala/kafka/tools/TestLogCleaning.scala | 2 +-
.../test/scala/other/kafka/StressTestLog.scala | 3 +-
.../scala/other/kafka/TestCrcPerformance.scala | 14 ++--
.../scala/other/kafka/TestKafkaAppender.scala | 9 +--
.../other/kafka/TestLinearWriteSpeed.scala | 3 +-
.../scala/other/kafka/TestOffsetManager.scala | 6 +-
.../other/kafka/TestPurgatoryPerformance.scala | 2 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 6 +-
.../unit/kafka/admin/ConfigCommandTest.scala | 4 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 7 +-
.../kafka/admin/DescribeConsumerGroupTest.scala | 4 +-
.../admin/ReassignPartitionsCommandTest.scala | 2 +-
.../scala/unit/kafka/api/ApiUtilsTest.scala | 12 ++--
.../api/RequestResponseSerializationTest.scala | 36 ----------
.../scala/unit/kafka/common/ConfigTest.scala | 8 +--
.../scala/unit/kafka/common/TopicTest.scala | 6 +-
.../kafka/consumer/ConsumerIteratorTest.scala | 13 ++--
.../kafka/consumer/PartitionAssignorTest.scala | 26 +++----
.../ZookeeperConsumerConnectorTest.scala | 19 +++--
.../controller/ControllerFailoverTest.scala | 4 +-
.../coordinator/GroupMetadataManagerTest.scala | 4 +-
.../kafka/coordinator/GroupMetadataTest.scala | 2 +-
.../kafka/integration/AutoOffsetResetTest.scala | 6 +-
.../unit/kafka/integration/FetcherTest.scala | 9 +--
.../kafka/integration/PrimitiveApiTest.scala | 6 +-
.../ZookeeperConsumerConnectorTest.scala | 4 +-
.../unit/kafka/log/FileMessageSetTest.scala | 9 ++-
.../log/LogCleanerLagIntegrationTest.scala | 6 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 9 ---
.../scala/unit/kafka/log/LogCleanerTest.scala | 5 +-
.../scala/unit/kafka/log/LogConfigTest.scala | 6 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 24 +++----
.../scala/unit/kafka/log/LogSegmentTest.scala | 6 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 62 ++++++++--------
.../scala/unit/kafka/log/OffsetIndexTest.scala | 2 +-
.../scala/unit/kafka/log/OffsetMapTest.scala | 3 +-
.../message/ByteBufferMessageSetTest.scala | 4 +-
.../kafka/message/MessageCompressionTest.scala | 5 +-
.../unit/kafka/network/SocketServerTest.scala | 6 +-
.../unit/kafka/producer/AsyncProducerTest.scala | 20 +++---
.../unit/kafka/producer/ProducerTest.scala | 66 ++++++++---------
.../unit/kafka/producer/SyncProducerTest.scala | 6 +-
.../kafka/security/auth/OperationTest.scala | 2 +-
.../security/auth/PermissionTypeTest.scala | 2 +-
.../kafka/security/auth/ResourceTypeTest.scala | 2 +-
.../security/auth/SimpleAclAuthorizerTest.scala | 2 +-
.../security/auth/ZkAuthorizationTest.scala | 8 +--
.../kafka/server/ClientQuotaManagerTest.scala | 4 +-
.../kafka/server/DynamicConfigChangeTest.scala | 8 +--
.../unit/kafka/server/ISRExpirationTest.scala | 25 +++----
.../unit/kafka/server/KafkaConfigTest.scala | 13 ++--
.../scala/unit/kafka/server/LogOffsetTest.scala | 8 +--
.../unit/kafka/server/MetadataCacheTest.scala | 2 +-
.../unit/kafka/server/OffsetCommitTest.scala | 1 -
.../kafka/server/ReplicationQuotasTest.scala | 4 +-
.../server/SaslApiVersionsRequestTest.scala | 2 +-
.../server/ServerGenerateBrokerIdTest.scala | 2 +-
.../unit/kafka/server/ServerShutdownTest.scala | 8 +--
.../unit/kafka/server/ServerStartupTest.scala | 2 +-
.../unit/kafka/tools/ConsoleProducerTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 8 +--
.../kafka/utils/timer/TimerTaskListTest.scala | 4 +-
.../scala/unit/kafka/zk/ZKEphemeralTest.scala | 7 +-
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 50 ++++---------
.../scala/unit/kafka/zk/ZkFourLetterWords.scala | 2 +-
.../internals/RocksDBKeyValueStoreTest.java | 2 +-
155 files changed, 477 insertions(+), 704 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 73543ad..59319ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -749,7 +749,6 @@ public abstract class AbstractCoordinator implements Closeable {
}
private class GroupCoordinatorMetrics {
- public final Metrics metrics;
public final String metricGrpName;
public final Sensor heartbeatLatency;
@@ -757,7 +756,6 @@ public abstract class AbstractCoordinator implements Closeable {
public final Sensor syncLatency;
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
- this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.heartbeatLatency = metrics.sensor("heartbeat-latency");
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bd95409..a8d94fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,13 +757,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
private class ConsumerCoordinatorMetrics {
- public final Metrics metrics;
public final String metricGrpName;
public final Sensor commitLatency;
public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
- this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.commitLatency = metrics.sensor("commit-latency");
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6dc2060..003d1a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -65,9 +65,6 @@ public class SubscriptionState {
/* the list of topics the user has requested */
private Set<String> subscription;
- /* the list of partitions the user has requested */
- private Set<TopicPartition> userAssignment;
-
/* the list of topics the group has subscribed to (set only for the leader on join group completion) */
private final Set<String> groupSubscription;
@@ -86,7 +83,6 @@ public class SubscriptionState {
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
this.defaultResetStrategy = defaultResetStrategy;
this.subscription = Collections.emptySet();
- this.userAssignment = Collections.emptySet();
this.assignment = new PartitionStates<>();
this.groupSubscription = new HashSet<>();
this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
@@ -160,8 +156,6 @@ public class SubscriptionState {
setSubscriptionType(SubscriptionType.USER_ASSIGNED);
if (!this.assignment.partitionSet().equals(partitions)) {
- this.userAssignment = partitions;
-
Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
for (TopicPartition partition : partitions) {
TopicPartitionState state = assignment.stateValue(partition);
@@ -218,7 +212,6 @@ public class SubscriptionState {
public void unsubscribe() {
this.subscription = Collections.emptySet();
- this.userAssignment = Collections.emptySet();
this.assignment.clear();
this.subscribedPattern = null;
this.subscriptionType = SubscriptionType.NONE;
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3632384..489c762 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -318,7 +318,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
- clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8fc7f2c..c71bb67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -92,9 +92,6 @@ public class Sender implements Runnable {
/* metrics */
private final SenderMetrics sensors;
- /* param clientId of the client */
- private String clientId;
-
/* the max time to wait for the server to respond to the request*/
private final int requestTimeout;
@@ -107,7 +104,6 @@ public class Sender implements Runnable {
int retries,
Metrics metrics,
Time time,
- String clientId,
int requestTimeout) {
this.client = client;
this.accumulator = accumulator;
@@ -118,7 +114,6 @@ public class Sender implements Runnable {
this.acks = acks;
this.retries = retries;
this.time = time;
- this.clientId = clientId;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
}
@@ -281,8 +276,7 @@ public class Sender implements Runnable {
completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
}
this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
- this.sensors.recordThrottleTime(response.request().request().destination(),
- produceResponse.getThrottleTime());
+ this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
} else {
// this is the acks = 0 case, just complete all requests
for (RecordBatch batch : batches.values())
@@ -564,7 +558,7 @@ public class Sender implements Runnable {
}
}
- public void recordThrottleTime(String node, long throttleTimeMs) {
+ public void recordThrottleTime(long throttleTimeMs) {
this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
index 0e14a39..11f5e07 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
@@ -34,7 +34,6 @@ public class MultiSend implements Send {
private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
private String dest;
private long totalWritten = 0;
- private List<Send> sends;
private Iterator<Send> sendsIterator;
private Send current;
private boolean doneSends = false;
@@ -42,7 +41,6 @@ public class MultiSend implements Send {
public MultiSend(String dest, List<Send> sends) {
this.dest = dest;
- this.sends = sends;
this.sendsIterator = sends.iterator();
nextSendOrDone();
for (Send send: sends)
@@ -76,7 +74,7 @@ public class MultiSend implements Send {
throw new KafkaException("This operation cannot be completed on a complete request.");
int totalWrittenPerCall = 0;
- boolean sendComplete = false;
+ boolean sendComplete;
do {
long written = current.writeTo(channel);
totalWritten += written;
@@ -97,4 +95,4 @@ public class MultiSend implements Send {
else
doneSends = true;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index f0af935..c573672 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -36,17 +36,15 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
}
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
- KafkaChannel channel = null;
try {
PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
- channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
} catch (Exception e) {
log.warn("Failed to create channel due to ", e);
throw new KafkaException(e);
}
- return channel;
}
public void close() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b546174..1d612bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -46,17 +46,15 @@ public class SslChannelBuilder implements ChannelBuilder {
}
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
- KafkaChannel channel = null;
try {
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
- channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
throw new KafkaException(e);
}
- return channel;
}
public void close() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index f8926f3..7ce59f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -397,12 +397,11 @@ public class SslTransportLayer implements TransportLayer {
private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
log.trace("SSLHandshake handshakeUnwrap {}", channelId);
SSLEngineResult result;
- boolean cont = false;
- int read = 0;
if (doRead) {
- read = socketChannel.read(netReadBuffer);
+ int read = socketChannel.read(netReadBuffer);
if (read == -1) throw new EOFException("EOF during handshake.");
}
+ boolean cont;
do {
//prepare the buffer with the incoming data
netReadBuffer.flip();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index e1074a1..206fe39 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -63,7 +63,6 @@ import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractRequestResponse;
-import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
@@ -310,7 +309,7 @@ public class SaslServerAuthenticator implements Authenticator {
LOG.debug("Handle Kafka request {}", apiKey);
switch (apiKey) {
case API_VERSIONS:
- handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+ handleApiVersionsRequest(requestHeader);
break;
case SASL_HANDSHAKE:
clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
@@ -361,7 +360,7 @@ public class SaslServerAuthenticator implements Authenticator {
}
}
- private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest versionRequest) throws IOException, UnsupportedSaslMechanismException {
+ private void handleApiVersionsRequest(RequestHeader requestHeader) throws IOException, UnsupportedSaslMechanismException {
sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index b556240..d305e8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -153,14 +153,13 @@ public class NetworkClientTest {
@Test
public void testLeastLoadedNode() {
- Node leastNode = null;
client.ready(node, time.milliseconds());
awaitReady(client, node);
client.poll(1, time.milliseconds());
assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
// leastloadednode should be our single node
- leastNode = client.leastLoadedNode(time.milliseconds());
+ Node leastNode = client.leastLoadedNode(time.milliseconds());
assertEquals("There should be one leastloadednode", leastNode.id(), node.id());
// sleep for longer than reconnect backoff
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 48682b1..3756d8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -88,7 +88,7 @@ public class BufferPoolTest {
ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
assertEquals(1024, buffer.limit());
pool.deallocate(buffer);
- buffer = pool.allocate(1025, maxBlockTimeMs);
+ pool.allocate(1025, maxBlockTimeMs);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b7f9e74..b7645dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -83,7 +83,6 @@ public class SenderTest {
MAX_RETRIES,
metrics,
time,
- CLIENT_ID,
REQUEST_TIMEOUT);
metadata.update(cluster, time.milliseconds());
@@ -143,7 +142,6 @@ public class SenderTest {
maxRetries,
m,
time,
- "clientId",
REQUEST_TIMEOUT);
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
@@ -196,7 +194,6 @@ public class SenderTest {
maxRetries,
m,
time,
- "clientId",
REQUEST_TIMEOUT);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
index c447f6d..cd7ed4a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.data;
import org.apache.kafka.connect.errors.DataException;
-import java.util.TimeZone;
-
/**
* <p>
* A timestamp representing an absolute time, without timezone information. The corresponding Java type is a
@@ -30,8 +28,6 @@ import java.util.TimeZone;
public class Timestamp {
public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp";
- private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
/**
* Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder you can override additional schema settings such
* as required/optional, default value, and documentation.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 8a065f1..88a0a8d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -52,7 +52,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
private final String restUrl;
private final ConfigBackingStore configStorage;
private ConnectProtocol.Assignment assignmentSnapshot;
- private final WorkerCoordinatorMetrics sensors;
private ClusterConfigState configSnapshot;
private final WorkerRebalanceListener listener;
private LeaderState leaderState;
@@ -86,7 +85,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
this.restUrl = restUrl;
this.configStorage = configStorage;
this.assignmentSnapshot = null;
- this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
+ new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
this.listener = listener;
this.rejoinRequested = false;
}
@@ -306,11 +305,9 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
private class WorkerCoordinatorMetrics {
- public final Metrics metrics;
public final String metricGrpName;
public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
- this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
Measurable numConnectors = new Measurable() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 9567ef9..3faff65 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -22,8 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -74,11 +72,4 @@ public class ConnectorInfo {
return Objects.hash(name, config, tasks);
}
-
- private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.connect.util.ConnectorTaskId> tasks) {
- List<ConnectorTaskId> jsonTasks = new ArrayList<>();
- for (ConnectorTaskId task : tasks)
- jsonTasks.add(task);
- return jsonTasks;
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index a098535..58c966d 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -84,7 +84,6 @@ object AclCommand {
CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
for ((resource, acls) <- resourceToAcl) {
- val acls = resourceToAcl(resource)
println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
authorizer.addAcls(acls, resource)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index aa38f69..d3ce217 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -320,7 +320,7 @@ object AdminUtils extends Logging with AdminUtilities {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
} catch {
- case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+ case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
case e2: Throwable => throw new AdminOperationException(e2)
}
@@ -471,7 +471,7 @@ object AdminUtils extends Logging with AdminUtilities {
}
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
- case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
+ case _: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
@@ -593,7 +593,7 @@ object AdminUtils extends Logging with AdminUtilities {
case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
}
- case o => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
+ case _ => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
}
}
props
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 20048ec..34df6b0 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -99,7 +99,7 @@ object ConfigCommand extends Config {
private def parseBroker(broker: String): Int = {
try broker.toInt
catch {
- case e: NumberFormatException =>
+ case _: NumberFormatException =>
throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
}
}
@@ -190,20 +190,20 @@ object ConfigCommand extends Config {
val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
.map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
child match {
- case Some (s) =>
+ case Some(s) =>
rootEntities.flatMap(rootEntity =>
ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
case None => rootEntities
}
- case (rootName, Some(childEntity)) =>
+ case (_, Some(childEntity)) =>
childEntity.sanitizedName match {
- case Some(subName) => Seq(this)
+ case Some(_) => Seq(this)
case None =>
zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
.map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
}
- case (rootName, None) =>
+ case (_, None) =>
Seq(this)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 354e6a2..6300d76 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -253,7 +253,7 @@ object ConsumerGroupCommand extends Logging {
}
assignmentRows ++= groupConsumerIds.sortBy(- consumerTopicPartitions.get(_).size).flatMap { consumerId =>
- topicsByConsumerId(consumerId).flatMap { topic =>
+ topicsByConsumerId(consumerId).flatMap { _ =>
// since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets
// since consumer id is repeated in client id, leave host and client id empty
collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d194eca..81014b1 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -109,7 +109,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
zkUtils.createPersistentPath(zkPath, jsonData)
info("Created preferred replica election path with %s".format(jsonData))
} catch {
- case nee: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
val partitionsUndergoingPreferredReplicaElection =
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
throw new AdminOperationException("Preferred replica leader election currently in progress for " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index a037fd4..709b365 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -218,7 +218,7 @@ object ReassignPartitionsCommand extends Logging {
partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
val newReplicas = partitionsToBeReassigned(topicAndPartition)
partitionsBeingReassigned.get(topicAndPartition) match {
- case Some(partition) => ReassignmentInProgress
+ case Some(_) => ReassignmentInProgress
case None =>
// check if the current replica assignment matches the expected one after reassignment
val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
@@ -394,7 +394,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
true
}
} catch {
- case ze: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
throw new AdminCommandFailedException("Partition reassignment currently in " +
"progress for %s. Aborting operation".format(partitionsBeingReassigned))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index a35a989..2fcc2ce 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -176,11 +176,11 @@ object TopicCommand extends Logging {
println("Note: This will have no impact if delete.topic.enable is not set to true.")
}
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
println("Topic %s is already marked for deletion.".format(topic))
case e: AdminOperationException =>
throw e
- case e: Throwable =>
+ case _: Throwable =>
throw new AdminOperationException("Error while deleting topic %s".format(topic))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a87e5b7..9ffee86 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -123,7 +123,6 @@ object ZkSecurityMigrator extends Logging {
}
class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
- private val workQueue = new LinkedBlockingQueue[Runnable]
private val futures = new Queue[Future[String]]
private def setAcl(path: String, setPromise: Promise[String]) = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 02eeae1..1ba5cfa 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -30,7 +30,7 @@ object ControlledShutdownResponse {
val numEntries = buffer.getInt
var partitionsRemaining = Set[TopicAndPartition]()
- for (i<- 0 until numEntries){
+ for (_ <- 0 until numEntries){
val topic = readShortString(buffer)
val partition = buffer.getInt
partitionsRemaining += new TopicAndPartition(topic, partition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 3c380c9..57e99c1 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -69,7 +69,7 @@ object FetchRequest {
val groupedByTopic = requestInfo.groupBy { case (tp, _) => tp.topic }.map { case (topic, values) =>
topic -> random.shuffle(values)
}
- random.shuffle(groupedByTopic.toSeq).flatMap { case (topic, partitions) =>
+ random.shuffle(groupedByTopic.toSeq).flatMap { case (_, partitions) =>
partitions.map { case (tp, fetchInfo) => tp -> fetchInfo }
}
}
@@ -196,9 +196,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val fetchResponsePartitionData = requestInfo.map {
- case (topicAndPartition, data) =>
- (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
+ val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, _) =>
+ (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
}
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index e5813a5..9123788 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -46,10 +46,10 @@ object PartitionStateInfo {
val leader = buffer.getInt
val leaderEpoch = buffer.getInt
val isrSize = buffer.getInt
- val isr = for(i <- 0 until isrSize) yield buffer.getInt
+ val isr = for (_ <- 0 until isrSize) yield buffer.getInt
val zkVersion = buffer.getInt
val replicationFactor = buffer.getInt
- val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
+ val replicas = for (_ <- 0 until replicationFactor) yield buffer.getInt
PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
replicas.toSet)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index d4f6158..94223c7 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -48,7 +48,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
- def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != Errors.NONE.code }
+ def hasError = commitStatus.values.exists(_ != Errors.NONE.code)
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index b15cf5a..416dd73 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -114,8 +114,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val partitionOffsetResponseMap = requestInfo.map {
- case (topicAndPartition, partitionOffsetRequest) =>
+ val partitionOffsetResponseMap = requestInfo.map { case (topicAndPartition, _) =>
(topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
}
val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
@@ -133,4 +132,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
offsetRequest.toString()
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index aad2fa5..3ca7bd7 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -133,9 +133,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
requestChannel.closeConnection(request.processor, request)
}
else {
- val producerResponseStatus = data.map {
- case (topicAndPartition, data) =>
- (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
+ val producerResponseStatus = data.map { case (topicAndPartition, _) =>
+ (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
}
val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fda8b0b..8893697 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -121,7 +121,7 @@ object ClientUtils extends Logging{
debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
true
} catch {
- case e: Exception =>
+ case _: Exception =>
if (channel != null) channel.disconnect()
channel = null
info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
@@ -164,7 +164,7 @@ object ClientUtils extends Logging{
}
}
catch {
- case ioe: IOException =>
+ case _: IOException =>
info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
queryChannel.disconnect()
}
@@ -187,7 +187,7 @@ object ClientUtils extends Logging{
queryChannel.disconnect()
}
catch {
- case ioe: IOException => // offsets manager may have moved
+ case _: IOException => // offsets manager may have moved
info("Error while connecting to %s.".format(connectString))
if (offsetManagerChannel != null) offsetManagerChannel.disconnect()
Thread.sleep(retryBackOffMs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
index 91823f0..847e959 100644
--- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -32,7 +32,7 @@ object BrokerEndPoint {
*/
def parseHostPort(connectionString: String): Option[(String, Int)] = {
connectionString match {
- case uriParseExp(host, port) => try Some(host, port.toInt) catch { case e: NumberFormatException => None }
+ case uriParseExp(host, port) => try Some(host, port.toInt) catch { case _: NumberFormatException => None }
case _ => None
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 28d3c8d..4d3fb56 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -415,11 +415,10 @@ class Partition(val topic: String,
* is violated, that replica is considered to be out of sync
*
**/
- val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
- if(laggingReplicas.nonEmpty)
+ if (laggingReplicas.nonEmpty)
debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
laggingReplicas
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index dfb203a..13c1921 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -38,12 +38,7 @@ class Replica(val brokerId: Int,
val topic = partition.topic
val partitionId = partition.partitionId
- def isLocal: Boolean = {
- log match {
- case Some(l) => true
- case None => false
- }
- }
+ def isLocal: Boolean = log.isDefined
private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 580ae33..ef8190c 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -91,7 +91,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
val changeId = changeNumber(notification)
if (changeId > lastExecutedChange) {
val changeZnode = seqNodeRoot + "/" + notification
- val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
+ val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
data.map(notificationHandler.processNotification(_)).getOrElse {
logger.warn(s"read null data from $changeZnode when processing notification $notification")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 96fe690..900a4b6 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -72,7 +72,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
def assign(ctx: AssignmentContext) = {
- val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+ val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
@@ -131,7 +131,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
class RangeAssignor() extends PartitionAssignor with Logging {
def assign(ctx: AssignmentContext) = {
- val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+ val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
val partitionAssignment =
new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
for (topic <- ctx.myTopicThreadIds.keySet) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 5706d3c..eb035f2 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -140,8 +140,8 @@ private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
def pattern: String = {
topicFilter match {
- case wl: Whitelist => TopicCount.whiteListPattern
- case bl: Blacklist => TopicCount.blackListPattern
+ case _: Whitelist => TopicCount.whiteListPattern
+ case _: Blacklist => TopicCount.blackListPattern
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 1ab4e5c..914e9b9 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -34,7 +34,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
Pattern.compile(regex)
}
catch {
- case e: PatternSyntaxException =>
+ case _: PatternSyntaxException =>
throw new RuntimeException(regex + " is an invalid regex.")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578..81b6264 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -322,8 +322,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def commitOffsets(isAutoCommit: Boolean) {
val offsetsToCommit =
- immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
- partitionTopicInfos.map { case (partition, info) =>
+ immutable.Map(topicRegistry.values.flatMap { partitionTopicInfos =>
+ partitionTopicInfos.values.map { info =>
TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
}
}.toSeq: _*)
@@ -442,7 +442,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
trace("Offset fetch response: %s.".format(offsetFetchResponse))
val (leaderChanged, loadInProgress) =
- offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
+ offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
(folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
}
@@ -706,7 +706,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
- valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
+ valueFactory = Some((_: String) => new Pool[Int, PartitionTopicInfo]))
// fetch current offsets for all topic-partitions
val topicPartitions = partitionAssignment.keySet.toSeq
@@ -731,7 +731,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if(reflectPartitionOwnershipDecision(partitionAssignment)) {
allTopicsOwnedPartitionsCount = partitionAssignment.size
- partitionAssignment.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
+ partitionAssignment.view.groupBy { case (topicPartition, _) => topicPartition.topic }
.foreach { case (topic, partitionThreadPairs) =>
newGauge("OwnedPartitionsCount",
new Gauge[Int] {
@@ -851,11 +851,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
successfullyOwnedPartitions ::= (topic, partition)
true
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
// The node hasn't been deleted by the original owner. So wait a bit and retry.
info("waiting for the partition ownership to be deleted: " + partition)
false
- case e2: Throwable => throw e2
}
}
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
@@ -918,19 +917,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicCount.getConsumerThreadIdsPerTopic
val allQueuesAndStreams = topicCount match {
- case wildTopicCount: WildcardTopicCount =>
+ case _: WildcardTopicCount =>
/*
* Wild-card consumption streams share the same queues, so we need to
* duplicate the list for the subsequent zip operation.
*/
(1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
- case statTopicCount: StaticTopicCount =>
+ case _: StaticTopicCount =>
queuesAndStreams
}
- val topicThreadIds = consumerThreadIdsPerTopic.map {
- case(topic, threadIds) =>
- threadIds.map((topic, _))
+ val topicThreadIds = consumerThreadIdsPerTopic.map { case (topic, threadIds) =>
+ threadIds.map((topic, _))
}.flatten
require(topicThreadIds.size == allQueuesAndStreams.size,
@@ -988,7 +986,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
"message streams by filter at most once.")
private val wildcardQueuesAndStreams = (1 to numStreams)
- .map(e => {
+ .map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](queue,
config.consumerTimeoutMs,
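
Two things worth noting in the ZookeeperConsumerConnector hunks. First, when a map is traversed only for its values, `.values` removes the dead key bindings, as in the commitOffsets rewrite. Second, the deleted `case e2: Throwable => throw e2` arm was a no-op: an exception that matches no earlier case propagates unchanged anyway. A self-contained sketch of the values-only traversal (data invented):

object ValuesOnlySketch extends App {
  val registry: Map[String, Map[Int, Long]] =
    Map("topicA" -> Map(0 -> 10L, 1 -> 20L), "topicB" -> Map(0 -> 5L))

  // Before: destructures keys that are never used.
  val before = registry.flatMap { case (topic, parts) =>
    parts.map { case (partition, offset) => offset }
  }

  // After: iterates values directly; no dead bindings.
  val after = registry.values.flatMap(_.values)

  println(before.toList.sorted == after.toList.sorted) // true
}
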
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 063ea6f..b7b4d71 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -77,25 +77,23 @@ class ControllerContext(val zkUtils: ZkUtils,
def liveOrShuttingDownBrokers = liveBrokersUnderlying
def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
- .map { case(topicAndPartition, replicas) => topicAndPartition }
- .toSet
+ partitionReplicaAssignment.collect {
+ case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
+ }.toSet
}
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
brokerIds.flatMap { brokerId =>
- partitionReplicaAssignment
- .filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }
- .map { case (topicAndPartition, replicas) =>
+ partitionReplicaAssignment.collect {
+ case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
- }
+ }
}.toSet
}
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
partitionReplicaAssignment
- .filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+ .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
.flatMap { case (topicAndPartition, replicas) =>
replicas.map { r =>
new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
@@ -103,10 +101,8 @@ class ControllerContext(val zkUtils: ZkUtils,
}.toSet
}
- def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
- partitionReplicaAssignment
- .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
- }
+ def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] =
+ partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)
def allLiveReplicas(): Set[PartitionAndReplica] = {
replicasOnBrokers(liveBrokerIds)
@@ -144,7 +140,7 @@ object KafkaController extends Logging {
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
- case t: Throwable =>
+ case _: Throwable =>
// It may be due to an incompatible controller register version
warn("Failed to parse the controller info as json. "
+ "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
@@ -433,7 +429,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
partitionStateMachine.triggerOnlinePartitionStateChange()
// check if reassignment of some partitions need to be restarted
val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
- case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+ case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
}
partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
// check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
@@ -622,12 +618,11 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
- val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
try {
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
- if(assignedReplicas == newReplicas) {
+ if (assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
@@ -706,7 +701,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
controllerContext.epoch = newControllerEpoch
}
} catch {
- case nne: ZkNoNodeException =>
+ case _: ZkNoNodeException =>
// if path doesn't exist, this is the first controller whose epoch should be 1
// the following call can still fail if another controller gets elected between checking if the path exists and
// trying to create the controller epoch path
@@ -715,7 +710,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
controllerContext.epoch = KafkaController.InitialControllerEpoch
controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
} catch {
- case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+ case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
"Aborting controller startup procedure")
case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
@@ -788,7 +783,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
private def initializeTopicDeletion() {
val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
- val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
+ val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case (_, replicas) =>
replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
@@ -989,7 +984,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
- case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
+ case _: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
case e2: Throwable => throw new KafkaException(e2.toString)
}
}
@@ -1182,7 +1177,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
inLock(controllerContext.controllerLock) {
preferredReplicasForTopicsByBrokers =
controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
- case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+ case (_, assignedReplicas) => assignedReplicas.head
}
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
@@ -1192,13 +1187,10 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
inLock(controllerContext.controllerLock) {
- topicsNotInPreferredReplica =
- topicAndPartitionsForBroker.filter {
- case(topicPartition, replicas) => {
- controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
- controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
- }
- }
+ topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
+ controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
+ controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+ }
debug("topics not in preferred replica " + topicsNotInPreferredReplica)
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
@@ -1208,18 +1200,16 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
- topicsNotInPreferredReplica.foreach {
- case(topicPartition, replicas) => {
- inLock(controllerContext.controllerLock) {
- // do this check only if the broker is live and there are no partitions being reassigned currently
- // and preferred replica election is not in progress
- if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
- controllerContext.partitionsBeingReassigned.isEmpty &&
- controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
- !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
- controllerContext.allTopics.contains(topicPartition.topic)) {
- onPreferredReplicaElection(Set(topicPartition), true)
- }
+ topicsNotInPreferredReplica.keys.foreach { topicPartition =>
+ inLock(controllerContext.controllerLock) {
+ // do this check only if the broker is live and there are no partitions being reassigned currently
+ // and preferred replica election is not in progress
+ if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+ controllerContext.partitionsBeingReassigned.isEmpty &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
+ !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+ controllerContext.allTopics.contains(topicPartition.topic)) {
+ onPreferredReplicaElection(Set(topicPartition), true)
}
}
}
@@ -1373,7 +1363,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
- val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
+ val (jsonOpt, _) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
if (jsonOpt.isDefined) {
val json = Json.parseFull(jsonOpt.get)
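
The KafkaController hunks show the clearest idiom upgrade in this commit: a `filter` followed by a `map` over the same pairs collapses into a single `collect`, whose guard and projection live in one partial function and whose traversal happens once. (The same file also drops the dead local aliveNewReplicas, which nothing read.) A standalone sketch of the collect rewrite (the assignment literal is invented):

object CollectSketch extends App {
  val assignment: Map[String, Seq[Int]] =
    Map("t0-0" -> Seq(1, 2), "t0-1" -> Seq(2, 3), "t1-0" -> Seq(1))
  val brokerId = 1

  // Before: two passes; each closure ignores half of the pair.
  val twoPasses = assignment
    .filter { case (_, replicas) => replicas.contains(brokerId) }
    .map { case (partition, _) => partition }
    .toSet

  // After: one pass; guard and projection in one partial function.
  val onePass = assignment.collect {
    case (partition, replicas) if replicas.contains(brokerId) => partition
  }.toSet

  println(twoPasses == onePass) // true: Set(t0-0, t1-0)
}
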
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 32bf4da..ee94b46 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -238,7 +238,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* zookeeper
*/
private def initializePartitionState() {
- for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
+ for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
// check if leader and isr path exists for partition. If not, then it is in NEW state
controllerContext.partitionLeadershipInfo.get(topicPartition) match {
case Some(currentLeaderIsrAndEpoch) =>
@@ -297,7 +297,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
} catch {
- case e: ZkNodeExistsException =>
+ case _: ZkNodeExistsException =>
// read the controller epoch
val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
topicAndPartition.partition).get
@@ -357,7 +357,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, replicas)
} catch {
- case lenne: LeaderElectionNotNeededException => // swallow
+ case _: LeaderElectionNotNeededException => // swallow
case nroe: NoReplicaOnlineException => throw nroe
case sce: Throwable =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
@@ -430,8 +430,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
deletedTopics, addedPartitionReplicaAssignment))
- if(newTopics.nonEmpty)
- controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
+ if (newTopics.nonEmpty)
+ controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
} catch {
case e: Throwable => error("Error while handling new topic", e )
}
@@ -522,7 +522,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if (partitionsToBeAdded.nonEmpty) {
info("New partitions to be added %s".format(partitionsToBeAdded))
controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
- controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
+ controller.onNewPartitionCreation(partitionsToBeAdded.keySet)
}
}
} catch {
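
In initializePartitionState the for-comprehension now walks `map.keys` instead of destructuring pairs whose value half was never consulted. Dropping `.toSet` after `keySet` also skips a copy; that type-checks as long as the callee accepts the general `collection.Set` rather than demanding an immutable one. A minimal sketch of the keys-only loop (data invented):

object KeysOnlySketch extends App {
  val replicaAssignment: Map[String, Seq[Int]] =
    Map("t0-0" -> Seq(1, 2), "t0-1" -> Seq(2, 3))

  // Before: for ((partition, replicas) <- replicaAssignment) ...
  // binds replicas but never reads it.

  // After: traverses only the keys.
  for (partition <- replicaAssignment.keys)
    println(s"initializing state for $partition")
}
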
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index d4e9bb4..03887ae 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -246,7 +246,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean =
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
- case Some(currLeaderIsrAndControllerEpoch) =>
+ case Some(_) =>
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
// send the shrunk ISR state change request to all the remaining alive replicas of the partition.
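
`case Some(_)` is the Option flavour of the same convention: the arm only asserts presence, so the wrapped value stays unbound. A small sketch (the map literal is invented):

object OptionPresenceSketch extends App {
  val leadership: Map[String, Int] = Map("t0-0" -> 1)

  def hasLeader(tp: String): Boolean = leadership.get(tp) match {
    case Some(_) => true // presence is all that matters here
    case None    => false
  }

  println(hasLeader("t0-0")) // true
  println(hasLeader("t9-9")) // false
}

In fresh code `leadership.contains(tp)` would say the same thing; the match form earns its keep when the Some arm does real work, as the removeReplicaFromIsr call above does.
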
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 98057dd..8e5f3a1 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -342,8 +342,8 @@ class TopicDeletionManager(controller: KafkaController,
*@param replicasForTopicsToBeDeleted
*/
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
- replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
- var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
+ replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
+ val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
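
The startReplicaDeletion hunk bundles three micro-fixes: iterate only the grouped keys, turn a never-reassigned `var` into a `val`, and use Scala's `==` (null-safe; it delegates to `equals`) rather than calling `.equals` directly. A sketch with an invented Replica stand-in:

object GroupedKeysSketch extends App {
  final case class Replica(topic: String, broker: Int)

  val toDelete = Set(Replica("a", 1), Replica("a", 2), Replica("b", 1))
  val live     = Set(Replica("a", 1), Replica("b", 1))

  toDelete.groupBy(_.topic).keys.foreach { topic =>
    // val, not var: never reassigned. == instead of .equals: null-safe.
    val aliveForTopic = live.filter(_.topic == topic)
    println(s"$topic -> $aliveForTopic")
  }
}
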
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 79d4411..e0c8e65 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -265,7 +265,7 @@ class GroupMetadataManager(val brokerId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
// first filter out partitions with offset metadata size exceeding limit
- val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
+ val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f29cde7..6b57696 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -290,7 +290,7 @@ class Log(val dir: File,
try {
curr.recover(config.maxMessageSize)
} catch {
- case e: InvalidOffsetException =>
+ case _: InvalidOffsetException =>
val startOffset = curr.baseOffset
warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
"creating an empty one with starting offset " + startOffset)
@@ -627,7 +627,7 @@ class Log(val dir: File,
val fetchDataInfo = read(offset, 1)
fetchDataInfo.fetchOffsetMetadata
} catch {
- case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+ case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bb8a89a..34b0dbf 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -240,7 +240,7 @@ class LogCleaner(val config: CleanerConfig,
recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
endOffset = nextDirtyOffset
} catch {
- case pe: LogCleaningAbortedException => // task can be aborted, let it go.
+ case _: LogCleaningAbortedException => // task can be aborted, let it go.
} finally {
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b3e6e72..b808348 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -112,12 +112,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
*/
def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
inLock(lock) {
- val toClean = logs.filterNot {
- case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
- }.filter {
- case (topicAndPartition, log) => isCompactAndDelete(log)
+ val toClean = logs.filter { case (topicAndPartition, log) =>
+ !inProgress.contains(topicAndPartition) && isCompactAndDelete(log)
}
- toClean.foreach{x => inProgress.put(x._1, LogCleaningInProgress)}
+ toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
toClean
}
@@ -317,4 +315,4 @@ private[log] object LogCleanerManager extends Logging {
(firstDirtyOffset, firstUncleanableDirtyOffset)
}
-}
\ No newline at end of file
+}
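
Finally, the deletableLogs change fuses a `filterNot` feeding a `filter` into one predicate: one traversal, one closure, and each pair half is bound only where it is used. (The last hunk simply adds the newline the file was missing at EOF.) A standalone sketch, with strings standing in for TopicAndPartition and Log:

object FusedFilterSketch extends App {
  val logs       = Map("t0-0" -> "log0", "t0-1" -> "log1", "t1-0" -> "log2")
  val inProgress = Set("t0-1")
  def isCompactAndDelete(log: String): Boolean = log != "log2"

  // Before: two traversals, each ignoring half of the pair.
  val before = logs
    .filterNot { case (tp, _) => inProgress.contains(tp) }
    .filter { case (_, log) => isCompactAndDelete(log) }

  // After: one traversal, one predicate.
  val after = logs.filter { case (tp, log) =>
    !inProgress.contains(tp) && isCompactAndDelete(log)
  }

  println(before == after) // true: Map(t0-0 -> log0)
}
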