Posted to commits@kafka.apache.org by ca...@apache.org on 2021/11/03 14:00:37 UTC

[kafka] branch trunk updated: TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc (#10678)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22d056c  TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc (#10678)
22d056c is described below

commit 22d056c9b76c9bf8417d8701594d1fcee1c6a655
Author: Lee Dongjin <do...@apache.org>
AuthorDate: Wed Nov 3 22:58:42 2021 +0900

    TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc (#10678)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Bruno Cadonna <ca...@apache.org>
---
 .../org/apache/kafka/common/config/SslConfigs.java |  6 +-
 .../security/ssl/DefaultSslEngineFactory.java      |  2 +-
 .../apache/kafka/common/compress/KafkaLZ4Test.java |  4 +-
 .../kafka/common/requests/ProduceRequestTest.java  |  9 ++-
 .../security/ssl/DefaultSslEngineFactoryTest.java  |  2 +-
 .../coordinator/group/GroupMetadataManager.scala   |  1 -
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 74 +++++++++++-----------
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  2 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |  4 +-
 .../kafka/raft/internals/RecordsIteratorTest.java  |  2 +-
 11 files changed, 52 insertions(+), 56 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index d7ed803..5061ed5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -92,10 +92,10 @@ public class SslConfigs {
     public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
     public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. "
         + "This is optional for client and only needed if 'ssl.keystore.location' is configured. "
-        + " Key store password is not supported for PEM format.";
+        + "Key store password is not supported for PEM format.";
 
     public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
-    public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file or"
+    public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file or "
         + "the PEM key specified in `ssl.keystore.key'. This is required for clients only if two-way authentication is configured.";
 
     public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
@@ -103,7 +103,7 @@ public class SslConfigs {
     public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
 
     public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
-    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
+    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file.";
 
     public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
     public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. "
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
index 1528a4a..d27fb36 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
@@ -324,7 +324,7 @@ public final class DefaultSslEngineFactory implements SslEngineFactory {
             return null;
     }
 
-    static interface SecurityStore {
+    interface SecurityStore {
         KeyStore get();
         char[] keyPassword();
         boolean modified();
diff --git a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
index a03c830..c3692fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
@@ -93,7 +93,7 @@ public class KafkaLZ4Test {
     private static class Lz4ArgumentsProvider implements ArgumentsProvider {
 
         @Override
-        public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
+        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
             List<Payload> payloads = new ArrayList<>();
 
             payloads.add(new Payload("empty", new byte[0]));
@@ -175,8 +175,6 @@ public class KafkaLZ4Test {
         assertTrue(e.getMessage().contains("exceeded max"));
     }
 
-
-
     @ParameterizedTest
     @ArgumentsSource(Lz4ArgumentsProvider.class)
     public void testCompression(Args args) throws Exception {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
index fee026e..9a01600 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -47,7 +47,7 @@ public class ProduceRequestTest {
                                                                "value".getBytes());
 
     @Test
-    public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exception {
+    public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() {
         final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0, CompressionType.NONE, 1L,
                 (short) 1, 1, 1, simpleRecord);
 
@@ -65,19 +65,19 @@ public class ProduceRequestTest {
     }
 
     @Test
-    public void shouldNotBeFlaggedAsTransactionalWhenNoRecords() throws Exception {
+    public void shouldNotBeFlaggedAsTransactionalWhenNoRecords() {
         final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
         assertFalse(RequestUtils.hasTransactionalRecords(request));
     }
 
     @Test
-    public void shouldNotBeFlaggedAsIdempotentWhenRecordsNotIdempotent() throws Exception {
+    public void shouldNotBeFlaggedAsIdempotentWhenRecordsNotIdempotent() {
         final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
         assertFalse(RequestUtils.hasTransactionalRecords(request));
     }
 
     @Test
-    public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception {
+    public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() {
         final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1, CompressionType.NONE, 1L,
                 (short) 1, 1, 1, simpleRecord);
         final ProduceRequest request = ProduceRequest.forCurrentMagic(new ProduceRequestData()
@@ -244,7 +244,6 @@ public class ProduceRequestTest {
         final long producerId = 15L;
         final short producerEpoch = 5;
         final int sequence = 10;
-        final String transactionalId = "txnlId";
 
         final MemoryRecords nonTxnRecords = MemoryRecords.withRecords(CompressionType.NONE,
                 new SimpleRecord("foo".getBytes()));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
index be45729..0e494cc 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java
@@ -313,7 +313,7 @@ public class DefaultSslEngineFactoryTest {
         return TestUtils.tempFile(pem).getAbsolutePath();
     }
 
-    private Password pemAsConfigValue(String... pemValues)  throws Exception {
+    private Password pemAsConfigValue(String... pemValues) {
         StringBuilder builder = new StringBuilder();
         for (String pem : pemValues) {
             builder.append(pem);
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index ac3fc39..24f9ad5 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -318,7 +318,6 @@ class GroupMetadataManager(brokerId: Int,
 
       case None =>
         responseCallback(Errors.NOT_COORDINATOR)
-        None
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 029d1fb..7332549 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -59,7 +59,7 @@ object LogAppendInfo {
   /**
    * In ProduceResponse V8+, we add two new fields record_errors and error_message (see KIP-467).
    * For any record failures with InvalidTimestamp or InvalidRecordException, we construct a LogAppendInfo object like the one
-   * in unknownLogAppendInfoWithLogStartOffset, but with additiona fields recordErrors and errorMessage
+   * in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors and errorMessage
    */
   def unknownLogAppendInfoWithAdditionalInfo(logStartOffset: Long, recordErrors: Seq[RecordError], errorMessage: String): LogAppendInfo =
     LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 91eb54f..17da10e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -420,43 +420,43 @@ object KafkaConfig {
   val RackProp = "broker.rack"
   /** ********* Log Configuration ***********/
   val NumPartitionsProp = "num.partitions"
-  val LogDirsProp = "log.dirs"
-  val LogDirProp = "log.dir"
-  val LogSegmentBytesProp = "log.segment.bytes"
-
-  val LogRollTimeMillisProp = "log.roll.ms"
-  val LogRollTimeHoursProp = "log.roll.hours"
-
-  val LogRollTimeJitterMillisProp = "log.roll.jitter.ms"
-  val LogRollTimeJitterHoursProp = "log.roll.jitter.hours"
-
-  val LogRetentionTimeMillisProp = "log.retention.ms"
-  val LogRetentionTimeMinutesProp = "log.retention.minutes"
-  val LogRetentionTimeHoursProp = "log.retention.hours"
-
-  val LogRetentionBytesProp = "log.retention.bytes"
-  val LogCleanupIntervalMsProp = "log.retention.check.interval.ms"
-  val LogCleanupPolicyProp = "log.cleanup.policy"
-  val LogCleanerThreadsProp = "log.cleaner.threads"
-  val LogCleanerIoMaxBytesPerSecondProp = "log.cleaner.io.max.bytes.per.second"
-  val LogCleanerDedupeBufferSizeProp = "log.cleaner.dedupe.buffer.size"
-  val LogCleanerIoBufferSizeProp = "log.cleaner.io.buffer.size"
-  val LogCleanerDedupeBufferLoadFactorProp = "log.cleaner.io.buffer.load.factor"
-  val LogCleanerBackoffMsProp = "log.cleaner.backoff.ms"
-  val LogCleanerMinCleanRatioProp = "log.cleaner.min.cleanable.ratio"
-  val LogCleanerEnableProp = "log.cleaner.enable"
-  val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
-  val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
-  val LogCleanerMaxCompactionLagMsProp = "log.cleaner.max.compaction.lag.ms"
-  val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
-  val LogIndexIntervalBytesProp = "log.index.interval.bytes"
-  val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
-  val LogDeleteDelayMsProp = "log.segment.delete.delay.ms"
-  val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
-  val LogFlushIntervalMsProp = "log.flush.interval.ms"
-  val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
-  val LogFlushStartOffsetCheckpointIntervalMsProp = "log.flush.start.offset.checkpoint.interval.ms"
-  val LogPreAllocateProp = "log.preallocate"
+  val LogDirsProp = LogConfigPrefix + "dirs"
+  val LogDirProp = LogConfigPrefix + "dir"
+  val LogSegmentBytesProp = LogConfigPrefix + "segment.bytes"
+
+  val LogRollTimeMillisProp = LogConfigPrefix + "roll.ms"
+  val LogRollTimeHoursProp = LogConfigPrefix + "roll.hours"
+
+  val LogRollTimeJitterMillisProp = LogConfigPrefix + "roll.jitter.ms"
+  val LogRollTimeJitterHoursProp = LogConfigPrefix + "roll.jitter.hours"
+
+  val LogRetentionTimeMillisProp = LogConfigPrefix + "retention.ms"
+  val LogRetentionTimeMinutesProp = LogConfigPrefix + "retention.minutes"
+  val LogRetentionTimeHoursProp = LogConfigPrefix + "retention.hours"
+
+  val LogRetentionBytesProp = LogConfigPrefix + "retention.bytes"
+  val LogCleanupIntervalMsProp = LogConfigPrefix + "retention.check.interval.ms"
+  val LogCleanupPolicyProp = LogConfigPrefix + "cleanup.policy"
+  val LogCleanerThreadsProp = LogConfigPrefix + "cleaner.threads"
+  val LogCleanerIoMaxBytesPerSecondProp = LogConfigPrefix + "cleaner.io.max.bytes.per.second"
+  val LogCleanerDedupeBufferSizeProp = LogConfigPrefix + "cleaner.dedupe.buffer.size"
+  val LogCleanerIoBufferSizeProp = LogConfigPrefix + "cleaner.io.buffer.size"
+  val LogCleanerDedupeBufferLoadFactorProp = LogConfigPrefix + "cleaner.io.buffer.load.factor"
+  val LogCleanerBackoffMsProp = LogConfigPrefix + "cleaner.backoff.ms"
+  val LogCleanerMinCleanRatioProp = LogConfigPrefix + "cleaner.min.cleanable.ratio"
+  val LogCleanerEnableProp = LogConfigPrefix + "cleaner.enable"
+  val LogCleanerDeleteRetentionMsProp = LogConfigPrefix + "cleaner.delete.retention.ms"
+  val LogCleanerMinCompactionLagMsProp = LogConfigPrefix + "cleaner.min.compaction.lag.ms"
+  val LogCleanerMaxCompactionLagMsProp = LogConfigPrefix + "cleaner.max.compaction.lag.ms"
+  val LogIndexSizeMaxBytesProp = LogConfigPrefix + "index.size.max.bytes"
+  val LogIndexIntervalBytesProp = LogConfigPrefix + "index.interval.bytes"
+  val LogFlushIntervalMessagesProp = LogConfigPrefix + "flush.interval.messages"
+  val LogDeleteDelayMsProp = LogConfigPrefix + "segment.delete.delay.ms"
+  val LogFlushSchedulerIntervalMsProp = LogConfigPrefix + "flush.scheduler.interval.ms"
+  val LogFlushIntervalMsProp = LogConfigPrefix + "flush.interval.ms"
+  val LogFlushOffsetCheckpointIntervalMsProp = LogConfigPrefix + "flush.offset.checkpoint.interval.ms"
+  val LogFlushStartOffsetCheckpointIntervalMsProp = LogConfigPrefix + "flush.start.offset.checkpoint.interval.ms"
+  val LogPreAllocateProp = LogConfigPrefix + "preallocate"
 
   /* See `TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG` for details */
   @deprecated("3.0")
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 4390999..9cef54f 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -61,7 +61,7 @@ object ConsoleConsumer extends Logging {
   }
 
   def run(conf: ConsumerConfig): Unit = {
-    val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
+    val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs.toLong else Long.MaxValue
     val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)
 
     val consumerWrapper =
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 72d2866..0a884f1 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -190,7 +190,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
     producer = createBufferingProducer
 
     //Write 100 messages
-    (0 until 100).foreach { i =>
+    (0 until 100).foreach { _ =>
       producer.send(new ProducerRecord(topic, 0, null, msg))
       producer.flush()
     }
@@ -225,7 +225,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness wit
     printSegments()
 
     //Start broker 101. When it comes up it should read a whole batch of messages from the leader.
-    //As the chronology is lost we would end up with non-monatonic offsets (pre kip-101)
+    //As the chronology is lost we would end up with non-monotonic offsets (pre kip-101)
     brokers(1).startup()
 
     //Wait for replication to resync
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 4a10f57..7d98489 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -59,7 +59,7 @@ public final class RecordsIteratorTest {
 
     @ParameterizedTest
     @MethodSource("emptyRecords")
-    void testEmptyRecords(Records records) throws IOException {
+    void testEmptyRecords(Records records) {
         testIterator(Collections.emptyList(), records);
     }