Posted to commits@kafka.apache.org by ij...@apache.org on 2020/04/23 07:44:47 UTC

[kafka] branch trunk updated: MINOR: Enable fatal warnings with scala 2.13 (#8429)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5ae154  MINOR: Enable fatal warnings with scala 2.13 (#8429)
c5ae154 is described below

commit c5ae154a3fdcf20d388405361da37feed0c64ad9
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Apr 23 00:44:03 2020 -0700

    MINOR: Enable fatal warnings with scala 2.13 (#8429)
    
    * Upgrade to Scala 2.13.2, which introduces the ability to suppress
    compiler warnings.
    * Upgrade to scala-collection-compat 2.1.6, which provides the
    @nowarn annotation for Scala 2.12.
    * While at it, also update scala-java8-compat to 0.9.1.
    * Fix compiler warnings and add @nowarn for the ones that could not
    be fixed (a usage sketch follows).
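
    A minimal sketch of the suppression mechanism (the deprecated
    method below is hypothetical, not part of this patch):

        import scala.annotation.nowarn

        object NowarnSketch {
          @deprecated("use newOp instead", "1.0")
          def oldOp(x: Int): Int = x + 1

          // Suppresses only deprecation warnings for this definition,
          // so -Xfatal-warnings does not fail the build here.
          @nowarn("cat=deprecation")
          def callsOldOp(x: Int): Int = oldOp(x)
        }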
    
    Scala 2.13.2 highlights (besides @nowarn):
    
    * Vector has been rewritten (using "radix-balanced finger tree
    vectors") for performance. Small vectors are now more compactly
    represented, some operations are drastically faster on large
    vectors, and a few operations may be a little slower.
    * Matching on literal strings now compiles to switches in bytecode
    (sketched below).
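
    As a sketch of the string-matching change, a match on literal
    strings like the one below is now emitted as a switch on the
    string's hash code rather than a chain of equality tests:

        def kind(s: String): Int = s match {
          case "get"    => 0
          case "put"    => 1
          case "delete" => 2
          case _        => -1  // emitted as a switch in 2.13.2 bytecode
        }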
    
    https://github.com/scala/scala/releases/tag/v2.13.2
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 build.gradle                                                      | 6 ++++++
 core/src/main/scala/kafka/admin/ConfigCommand.scala               | 2 ++
 core/src/main/scala/kafka/log/Log.scala                           | 2 +-
 core/src/main/scala/kafka/network/RequestChannel.scala            | 3 ++-
 core/src/main/scala/kafka/network/SocketServer.scala              | 2 +-
 core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala | 7 +++++--
 .../main/scala/kafka/security/authorizer/AuthorizerUtils.scala    | 3 +++
 core/src/main/scala/kafka/server/KafkaApis.scala                  | 1 -
 core/src/main/scala/kafka/server/ReplicationQuotaManager.scala    | 2 +-
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala             | 2 ++
 core/src/main/scala/kafka/tools/MirrorMaker.scala                 | 4 ++++
 core/src/main/scala/kafka/utils/Pool.scala                        | 2 +-
 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala         | 2 +-
 .../kafka/api/AdminClientWithPoliciesIntegrationTest.scala        | 2 ++
 .../scala/integration/kafka/api/AuthorizerIntegrationTest.scala   | 4 ++++
 .../test/scala/integration/kafka/api/BaseProducerSendTest.scala   | 2 ++
 .../src/test/scala/integration/kafka/api/ConsumerBounceTest.scala | 3 +++
 .../integration/kafka/api/PlaintextAdminIntegrationTest.scala     | 8 ++++++--
 .../scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala | 2 ++
 .../kafka/server/DynamicBrokerReconfigurationTest.scala           | 4 ++++
 .../integration/MetricsDuringTopicCreationDeletionTest.scala      | 2 +-
 .../scala/unit/kafka/integration/UncleanLeaderElectionTest.scala  | 3 +++
 core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala          | 5 ++++-
 core/src/test/scala/unit/kafka/server/BaseRequestTest.scala       | 3 ++-
 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala    | 2 +-
 core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala      | 2 +-
 .../scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala  | 6 +++---
 core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala        | 4 ++++
 core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala | 2 +-
 gradle/dependencies.gradle                                        | 6 +++---
 30 files changed, 75 insertions(+), 23 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7675b81..644d3eb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -484,6 +484,12 @@ subprojects {
     scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
     scalaCompileOptions.additionalParameters += inlineFrom
 
+    if (versions.baseScala != '2.12') {
+      scalaCompileOptions.additionalParameters += ["-opt-warnings"]
+      // Scala 2.13.2 introduces compiler warning suppression, which is a prerequisite for -Xfatal-warnings
+      scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
+    }
+
     // these options are valid for Scala versions < 2.13 only
     // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
     if (versions.baseScala == '2.12') {
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 3be24aa..a43fe9e 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
 import org.apache.zookeeper.client.ZKClientConfig
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection._
 
@@ -297,6 +298,7 @@ object ConfigCommand extends Config {
     }
   }
 
+  @nowarn("cat=deprecation")
   private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
     val entityTypes = opts.entityTypes
     val entityNames = opts.entityNames
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ec24987..3d6d803 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -498,7 +498,7 @@ class Log(@volatile private var _dir: File,
 
     def newLeaderEpochFileCache(): LeaderEpochFileCache = {
       val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
-      new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
+      new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile)
     }
 
     if (recordVersion.precedes(RecordVersion.V2)) {
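
The change above sidesteps a Scala 2.13 deprecation: eta-expanding a
parameterless member with a trailing underscore (logEndOffset _) now
warns, so the commit spells out the function literal instead. A minimal
sketch (names hypothetical):

    class EtaSketch {
      def logEndOffset: Long = 42L

      // Deprecated in 2.13: val f: () => Long = logEndOffset _
      // The explicit function literal is warning-free:
      val f: () => Long = () => logEndOffset
    }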
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index faa20f0..17ea3f3 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
 
+import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
@@ -106,7 +107,7 @@ object RequestChannel extends Logging {
 
     def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
 
-    def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+    def body[T <: AbstractRequest](implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
       bodyAndSize.request match {
         case r: T => r
         case r =>
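
The @nowarn("cat=unused") above covers an evidence-only implicit: the
NotNothing parameter exists solely to force callers to pin down T at
compile time and is never referenced in the body, so the unused-parameter
lint would otherwise trip -Xfatal-warnings. A hedged sketch of the
pattern (not Kafka's actual NotNothing definition):

    import scala.annotation.nowarn

    trait Evidence[T]
    object Evidence {
      implicit def ev[T]: Evidence[T] = new Evidence[T] {}
    }

    object EvidenceSketch {
      // `ev` constrains T at the call site but is unused in the body.
      def first[T](xs: List[T])(implicit @nowarn("cat=unused") ev: Evidence[T]): T =
        xs.head
    }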
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 77c47f0..350e58d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -700,7 +700,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   * Wake up the thread for selection.
    */
   @Override
-  def wakeup = nioSelector.wakeup()
+  def wakeup(): Unit = nioSelector.wakeup()
 
 }
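
The wakeup change above addresses a compiler warning: a parameterless
definition that overrides a method declared with an empty parameter
list is flagged, which -Xfatal-warnings would turn into an error, and
the commit adds an explicit Unit result type while it is at it. A
minimal sketch:

    trait Waker {
      def wakeup(): Unit
    }

    class WakerSketch extends Waker {
      // Matching parens plus an explicit result type keep 2.13
      // (and -Xfatal-warnings) quiet.
      override def wakeup(): Unit = println("woken up")
    }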
 
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index bbb2fc8..5f2be90 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -39,7 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
 import org.apache.kafka.server.authorizer._
 import org.apache.zookeeper.client.ZKClientConfig
 
-import scala.collection.{mutable, Seq}
+import scala.annotation.nowarn
+import scala.collection.{Seq, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Random, Success, Try}
 
@@ -249,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging {
           }
         } catch {
           case e: Exception =>
-            resourceBindingsBeingDeleted.foreach { case (binding, index) =>
+            resourceBindingsBeingDeleted.keys.foreach { binding =>
                 deleteExceptions.getOrElseUpdate(binding, apiException(e))
             }
         }
@@ -263,6 +264,7 @@ class AclAuthorizer extends Authorizer with Logging {
     }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
   }
 
+  @nowarn("cat=optimizer")
   override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
       val aclBindings = new util.ArrayList[AclBinding]()
       aclCache.foreach { case (resource, versionedAcls) =>
@@ -342,6 +344,7 @@ class AclAuthorizer extends Authorizer with Logging {
     } else false
   }
 
+  @nowarn("cat=deprecation")
   private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
     // save aclCache reference to a local val to get a consistent view of the cache during acl updates.
     val aclCacheSnapshot = aclCache
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
index 4620c93..0d670be 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
@@ -28,9 +28,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
 
+import scala.annotation.nowarn
+
 
 object AuthorizerUtils {
 
+  @nowarn("cat=deprecation")
   def createAuthorizer(className: String): Authorizer = {
     Utils.newInstance(className, classOf[Object]) match {
       case auth: Authorizer => auth
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0d54919..b69db17 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.ElectLeadersRequestOps
-import kafka.api.LeaderAndIsr
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index df1f946..375cd48 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -133,7 +133,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
     * @param value
     */
   def record(value: Long): Unit = {
-    sensor().record(value, time.milliseconds(), false)
+    sensor().record(value.toDouble, time.milliseconds(), false)
   }
 
   /**
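
The value.toDouble change above addresses another 2.13 deprecation: the
implicit widening of Long to Double can lose precision and now warns,
so the conversion is made explicit. A minimal sketch:

    object WidenSketch {
      def record(value: Double): Unit = println(value)

      val v: Long = 9007199254740993L  // 2^53 + 1, not exact as a Double
      // record(v) would trip the lossy-widening deprecation in 2.13;
      // the explicit conversion is warning-free:
      record(v.toDouble)
    }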
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index bc69fc2..5f7b9ab 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
 import org.apache.kafka.common.utils.Utils
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 /**
@@ -575,6 +576,7 @@ class ChecksumMessageFormatter extends MessageFormatter {
       topicStr = ""
   }
 
+  @nowarn("cat=deprecation")
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
     output.println(topicStr + "checksum:" + consumerRecord.checksum)
   }
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 70e3e02..970dbd3 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.HashMap
 import scala.util.control.ControlThrowable
@@ -190,6 +191,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     setName(threadName)
 
+    @nowarn("cat=deprecation")
     private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
       BaseConsumerRecord(record.topic,
         record.partition,
@@ -412,10 +414,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   * If message.handler.args is specified, a constructor that takes a String argument must exist.
    */
   trait MirrorMakerMessageHandler {
+    @nowarn("cat=deprecation")
     def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
   }
 
   private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
+    @nowarn("cat=deprecation")
     override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
       val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
       Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers))
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 964de7e..93fd97c 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -83,7 +83,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
     
     def hasNext: Boolean = iter.hasNext
     
-    def next: (K, V) = {
+    def next(): (K, V) = {
       val n = iter.next
       (n.getKey, n.getValue)
     }
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 02a533d..0a91ece 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -198,7 +198,7 @@ class ZooKeeperClient(connectString: String,
       case GetDataRequest(path, ctx) =>
         zooKeeper.getData(path, shouldWatch(request), new DataCallback {
           def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
-            callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))),
+            callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs)))
         }, ctx.orNull)
       case GetChildrenRequest(path, _, ctx) =>
         zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 3326003..900fc81 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -31,6 +31,7 @@ import org.junit.{After, Before, Rule, Test}
 import org.junit.rules.Timeout
 import org.scalatest.Assertions.intercept
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 /**
@@ -92,6 +93,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client)
   }
 
+  @nowarn("cat=deprecation")
   @Test
   def testInvalidAlterConfigsDueToPolicy(): Unit = {
     client = Admin.create(createConfig)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0cbad3b..904a2be 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -59,6 +59,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.Assertions.intercept
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
@@ -949,6 +950,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     consumeRecords(consumer)
   }
 
+  @nowarn("cat=deprecation")
   @Test
   def testPatternSubscriptionWithNoTopicAccess(): Unit = {
     createTopic(topic)
@@ -985,6 +987,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
   }
 
+  @nowarn("cat=deprecation")
   @Test
   def testPatternSubscriptionWithTopicAndGroupRead(): Unit = {
     createTopic(topic)
@@ -1016,6 +1019,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertTrue(consumer.assignment().isEmpty)
   }
 
+  @nowarn("cat=deprecation")
   @Test
   def testPatternSubscriptionMatchingInternalTopic(): Unit = {
     createTopic(topic)
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 9797e09..cec3e8d 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -36,6 +36,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.Assertions.fail
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.Buffer
 import scala.concurrent.ExecutionException
@@ -102,6 +103,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
    * 2. Last message of the non-blocking send should return the correct offset metadata
    */
+  @nowarn("cat=deprecation")
   @Test
   def testSendOffset(): Unit = {
     val producer = createProducer(brokerList)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 24942fa..51c308d 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinator
 import org.junit.Assert._
 import org.junit.{After, Ignore, Test}
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, mutable}
 
@@ -83,6 +84,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
    * 1. Produce a bunch of messages
    * 2. Then consume the messages while killing and restarting brokers at random
    */
+  @nowarn("cat=deprecation")
   def consumeWithBrokerFailures(numIters: Int): Unit = {
     val numRecords = 1000
     val producer = createProducer()
@@ -379,6 +381,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     checkCloseDuringRebalance("group1", topic, executor, true)
   }
 
+  @nowarn("cat=deprecation")
   private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = {
 
     def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 6cb33fa..f014481 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -45,6 +45,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Ignore, Test}
 import org.scalatest.Assertions.intercept
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.concurrent.duration.Duration
@@ -2176,6 +2177,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   /**
     * The AlterConfigs API is deprecated and should not support altering log levels
     */
+  @nowarn("cat=deprecation")
   @Test
   @Ignore // To be re-enabled once KAFKA-8779 is resolved
   def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
@@ -2227,6 +2229,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
 object PlaintextAdminIntegrationTest {
 
+  @nowarn("cat=deprecation")
   def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
     // Alter topics
     var topicConfigEntries1 = Seq(
@@ -2289,6 +2292,7 @@ object PlaintextAdminIntegrationTest {
     assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
   }
 
+  @nowarn("cat=deprecation")
   def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = {
     // Create topics
     val topic1 = "invalid-alter-configs-topic-1"
@@ -2356,12 +2360,12 @@ object PlaintextAdminIntegrationTest {
 
     assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
       configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
-    assertEquals(Defaults.CompressionType.toString,
+    assertEquals(Defaults.CompressionType,
       configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
 
     assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
 
-    assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
+    assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
   }
 
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index c0319f0e..3ef00f1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{After, Assert, Before, Test}
 
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
@@ -46,6 +47,7 @@ abstract class AuthorizationAdmin {
 // Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
 // It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
 class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
+  @nowarn("cat=deprecation")
   val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
 
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 12548a1..b610db2 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -63,6 +63,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Ignore, Test}
 import org.scalatest.Assertions.intercept
 
+import scala.annotation.nowarn
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
@@ -1324,6 +1325,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }.mkString(",")
   }
 
+  @nowarn("cat=deprecation")
   private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
     val configs = servers.map { server =>
       val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
@@ -1350,6 +1352,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
   }
 
+  @nowarn("cat=deprecation")
   private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -1358,6 +1361,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) }
   }
 
+  @nowarn("cat=deprecation")
   private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
                    perBrokerConfig: Boolean): AlterConfigsResult = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 11f1488..066725c 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -53,7 +53,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
     .map(KafkaConfig.fromProps(_, overridingProps))
 
   @Before
-  override def setUp: Unit = {
+  override def setUp(): Unit = {
     // Do some Metrics Registry cleanup by removing the metrics that this test checks.
     // This is a test workaround to the issue that prior harness runs may have left a populated registry.
     // see https://issues.apache.org/jira/browse/KAFKA-4605
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 6fcdd4c..2f16aba 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -40,6 +40,8 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsRes
 import org.junit.Assert._
 import org.scalatest.Assertions.intercept
 
+import scala.annotation.nowarn
+
 class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
@@ -347,6 +349,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
   }
 
+  @nowarn("cat=deprecation")
   private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = {
     val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index b8bc006..7a4b328 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -31,6 +31,8 @@ import scala.util.Random
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.InvalidOffsetException
 
+import scala.annotation.nowarn
+
 class OffsetIndexTest {
   
   var idx: OffsetIndex = null
@@ -47,7 +49,8 @@ class OffsetIndexTest {
     if(this.idx != null)
       this.idx.file.delete()
   }
-  
+
+  @nowarn("cat=deprecation")
   @Test
   def randomLookupTest(): Unit = {
     assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L))
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index dfd425e..bf54cdb 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
 
+import scala.annotation.nowarn
 import scala.collection.Seq
 import scala.reflect.ClassTag
 
@@ -86,7 +87,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
   }
 
   def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short)
-                                    (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+                                    (implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
     val incoming = new DataInputStream(socket.getInputStream)
     val len = incoming.readInt()
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 4dde2d3..835c113 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -35,7 +35,7 @@ import kafka.utils.timer.MockTimer
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
-import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
+import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a3b59f4..6dd1518 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -16,7 +16,7 @@ package kafka.server
 
 import java.util
 import java.util.concurrent.{Executors, Future, TimeUnit}
-import java.util.{Collections, LinkedHashMap, Optional, Properties}
+import java.util.{Collections, Optional, Properties}
 
 import kafka.api.LeaderAndIsr
 import kafka.log.LogConfig
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index af98b47..0ddc996 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -40,7 +40,7 @@ class LeaderEpochFileCacheTest {
     override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs
     override def read(): Seq[EpochEntry] = this.epochs
   }
-  private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
+  private val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint)
 
   @Test
   def shouldAddEpochAndMessageOffsetToCache() = {
@@ -231,12 +231,12 @@ class LeaderEpochFileCacheTest {
     val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
 
     //Given
-    val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
+    val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint)
     cache.assign(epoch = 2, startOffset = 6)
 
     //When
     val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
-    val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2)
+    val cache2 = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint2)
 
     //Then
     assertEquals(1, cache2.epochEntries.size)
diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index 0afdf09..413b5ba 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -19,10 +19,14 @@ package kafka.tools
 
 import kafka.consumer.BaseConsumerRecord
 import org.apache.kafka.common.record.{RecordBatch, TimestampType}
+
 import scala.jdk.CollectionConverters._
 import org.junit.Assert._
 import org.junit.Test
 
+import scala.annotation.nowarn
+
+@nowarn("cat=deprecation")
 class MirrorMakerTest {
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
index 31f7efb..672ac87 100644
--- a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
@@ -38,7 +38,7 @@ class ShutdownableThreadTest {
     }
     val latch = new CountDownLatch(1)
     val thread = new ShutdownableThread("shutdownable-thread-test") {
-      override def doWork: Unit = {
+      override def doWork(): Unit = {
         latch.countDown()
         throw new FatalExitError
       }
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 67d85d6..e38b144 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -28,7 +28,7 @@ ext {
 
 // Add Scala version
 def defaultScala212Version = '2.12.11'
-def defaultScala213Version = '2.13.1'
+def defaultScala213Version = '2.13.2'
 if (hasProperty('scalaVersion')) {
   if (scalaVersion == '2.12') {
     versions["scala"] = defaultScala212Version
@@ -102,9 +102,9 @@ versions += [
   powermock: "2.0.7",
   reflections: "0.9.12",
   rocksDB: "5.18.4",
-  scalaCollectionCompat: "2.1.4",
+  scalaCollectionCompat: "2.1.6",
   scalafmt: "1.5.1",
-  scalaJava8Compat : "0.9.0",
+  scalaJava8Compat : "0.9.1",
   scalatest: "3.0.8",
   scoverage: "1.4.1",
   scoveragePlugin: "4.0.1",