You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/04/23 07:44:47 UTC
[kafka] branch trunk updated: MINOR: Enable fatal warnings with scala 2.13 (#8429)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c5ae154 MINOR: Enable fatal warnings with scala 2.13 (#8429)
c5ae154 is described below
commit c5ae154a3fdcf20d388405361da37feed0c64ad9
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Apr 23 00:44:03 2020 -0700
MINOR: Enable fatal warnings with scala 2.13 (#8429)
* Upgrade to Scala 2.13.2 which introduces the ability to suppress warnings.
* Upgrade to scala-collection-compat 2.1.6 as it introduces the
@nowarn annotation for Scala 2.12.
* While at it, also update scala-java8-compat to 0.9.1.
* Fix compiler warnings and add @nowarn for the unfixed ones.
Scala 2.13.2 highlights (besides @nowarn):
* Rewrite Vector (using "radix-balanced finger tree vectors"),
for performance. Small vectors are now more compactly
represented. Some operations are now drastically faster on
large vectors. A few operations may be a little slower.
* Pattern matching on strings now compiles to switches in bytecode.
https://github.com/scala/scala/releases/tag/v2.13.2
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
build.gradle | 6 ++++++
core/src/main/scala/kafka/admin/ConfigCommand.scala | 2 ++
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/network/RequestChannel.scala | 3 ++-
core/src/main/scala/kafka/network/SocketServer.scala | 2 +-
core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala | 7 +++++--
.../main/scala/kafka/security/authorizer/AuthorizerUtils.scala | 3 +++
core/src/main/scala/kafka/server/KafkaApis.scala | 1 -
core/src/main/scala/kafka/server/ReplicationQuotaManager.scala | 2 +-
core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 2 ++
core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 ++++
core/src/main/scala/kafka/utils/Pool.scala | 2 +-
core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala | 2 +-
.../kafka/api/AdminClientWithPoliciesIntegrationTest.scala | 2 ++
.../scala/integration/kafka/api/AuthorizerIntegrationTest.scala | 4 ++++
.../test/scala/integration/kafka/api/BaseProducerSendTest.scala | 2 ++
.../src/test/scala/integration/kafka/api/ConsumerBounceTest.scala | 3 +++
.../integration/kafka/api/PlaintextAdminIntegrationTest.scala | 8 ++++++--
.../scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala | 2 ++
.../kafka/server/DynamicBrokerReconfigurationTest.scala | 4 ++++
.../integration/MetricsDuringTopicCreationDeletionTest.scala | 2 +-
.../scala/unit/kafka/integration/UncleanLeaderElectionTest.scala | 3 +++
core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala | 5 ++++-
core/src/test/scala/unit/kafka/server/BaseRequestTest.scala | 3 ++-
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +-
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala | 6 +++---
core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala | 4 ++++
core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala | 2 +-
gradle/dependencies.gradle | 6 +++---
30 files changed, 75 insertions(+), 23 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7675b81..644d3eb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -484,6 +484,12 @@ subprojects {
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom
+ if (versions.baseScala != '2.12') {
+ scalaCompileOptions.additionalParameters += ["-opt-warnings"]
+ // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
+ scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
+ }
+
// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala == '2.12') {
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 3be24aa..a43fe9e 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.zookeeper.client.ZKClientConfig
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._
@@ -297,6 +298,7 @@ object ConfigCommand extends Config {
}
}
+ @nowarn("cat=deprecation")
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ec24987..3d6d803 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -498,7 +498,7 @@ class Log(@volatile private var _dir: File,
def newLeaderEpochFileCache(): LeaderEpochFileCache = {
val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel)
- new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile)
+ new LeaderEpochFileCache(topicPartition, () => logEndOffset, checkpointFile)
}
if (recordVersion.precedes(RecordVersion.V2)) {
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index faa20f0..17ea3f3 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
+import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@@ -106,7 +107,7 @@ object RequestChannel extends Logging {
def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
- def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+ def body[T <: AbstractRequest](implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
bodyAndSize.request match {
case r: T => r
case r =>
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 77c47f0..350e58d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -700,7 +700,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
* Wakeup the thread for selection.
*/
@Override
- def wakeup = nioSelector.wakeup()
+ def wakeup(): Unit = nioSelector.wakeup()
}
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index bbb2fc8..5f2be90 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -39,7 +39,8 @@ import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig
-import scala.collection.{mutable, Seq}
+import scala.annotation.nowarn
+import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}
@@ -249,7 +250,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
} catch {
case e: Exception =>
- resourceBindingsBeingDeleted.foreach { case (binding, index) =>
+ resourceBindingsBeingDeleted.keys.foreach { binding =>
deleteExceptions.getOrElseUpdate(binding, apiException(e))
}
}
@@ -263,6 +264,7 @@ class AclAuthorizer extends Authorizer with Logging {
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
}
+ @nowarn("cat=optimizer")
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
val aclBindings = new util.ArrayList[AclBinding]()
aclCache.foreach { case (resource, versionedAcls) =>
@@ -342,6 +344,7 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}
+ @nowarn("cat=deprecation")
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// save aclCache reference to a local val to get a consistent view of the cache during acl updates.
val aclCacheSnapshot = aclCache
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
index 4620c93..0d670be 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
@@ -28,9 +28,12 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
+import scala.annotation.nowarn
+
object AuthorizerUtils {
+ @nowarn("cat=deprecation")
def createAuthorizer(className: String): Authorizer = {
Utils.newInstance(className, classOf[Object]) match {
case auth: Authorizer => auth
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0d54919..b69db17 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.ElectLeadersRequestOps
-import kafka.api.LeaderAndIsr
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0}
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index df1f946..375cd48 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -133,7 +133,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
* @param value
*/
def record(value: Long): Unit = {
- sensor().record(value, time.milliseconds(), false)
+ sensor().record(value.toDouble, time.milliseconds(), false)
}
/**
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index bc69fc2..5f7b9ab 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Utils
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@@ -575,6 +576,7 @@ class ChecksumMessageFormatter extends MessageFormatter {
topicStr = ""
}
+ @nowarn("cat=deprecation")
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
output.println(topicStr + "checksum:" + consumerRecord.checksum)
}
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 70e3e02..970dbd3 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
@@ -190,6 +191,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
setName(threadName)
+ @nowarn("cat=deprecation")
private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
BaseConsumerRecord(record.topic,
record.partition,
@@ -412,10 +414,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
* If message.handler.args is specified. A constructor that takes in a String as argument must exist.
*/
trait MirrorMakerMessageHandler {
+ @nowarn("cat=deprecation")
def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]]
}
private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
+ @nowarn("cat=deprecation")
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers))
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 964de7e..93fd97c 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -83,7 +83,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
def hasNext: Boolean = iter.hasNext
- def next: (K, V) = {
+ def next(): (K, V) = {
val n = iter.next
(n.getKey, n.getValue)
}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 02a533d..0a91ece 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -198,7 +198,7 @@ class ZooKeeperClient(connectString: String,
case GetDataRequest(path, ctx) =>
zooKeeper.getData(path, shouldWatch(request), new DataCallback {
def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
- callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))),
+ callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs)))
}, ctx.orNull)
case GetChildrenRequest(path, _, ctx) =>
zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 3326003..900fc81 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -31,6 +31,7 @@ import org.junit.{After, Before, Rule, Test}
import org.junit.rules.Timeout
import org.scalatest.Assertions.intercept
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
/**
@@ -92,6 +93,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client)
}
+ @nowarn("cat=deprecation")
@Test
def testInvalidAlterConfigsDueToPolicy(): Unit = {
client = Admin.create(createConfig)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 0cbad3b..904a2be 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -59,6 +59,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.intercept
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
@@ -949,6 +950,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(consumer)
}
+ @nowarn("cat=deprecation")
@Test
def testPatternSubscriptionWithNoTopicAccess(): Unit = {
createTopic(topic)
@@ -985,6 +987,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}
+ @nowarn("cat=deprecation")
@Test
def testPatternSubscriptionWithTopicAndGroupRead(): Unit = {
createTopic(topic)
@@ -1016,6 +1019,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertTrue(consumer.assignment().isEmpty)
}
+ @nowarn("cat=deprecation")
@Test
def testPatternSubscriptionMatchingInternalTopic(): Unit = {
createTopic(topic)
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 9797e09..cec3e8d 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -36,6 +36,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.fail
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable.Buffer
import scala.concurrent.ExecutionException
@@ -102,6 +103,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
* 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
* 2. Last message of the non-blocking send should return the correct offset metadata
*/
+ @nowarn("cat=deprecation")
@Test
def testSendOffset(): Unit = {
val producer = createProducer(brokerList)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 24942fa..51c308d 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinator
import org.junit.Assert._
import org.junit.{After, Ignore, Test}
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}
@@ -83,6 +84,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
* 1. Produce a bunch of messages
* 2. Then consume the messages while killing and restarting brokers at random
*/
+ @nowarn("cat=deprecation")
def consumeWithBrokerFailures(numIters: Int): Unit = {
val numRecords = 1000
val producer = createProducer()
@@ -379,6 +381,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
checkCloseDuringRebalance("group1", topic, executor, true)
}
+ @nowarn("cat=deprecation")
private def checkCloseDuringRebalance(groupId: String, topic: String, executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = {
def subscribeAndPoll(consumer: KafkaConsumer[Array[Byte], Array[Byte]], revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 6cb33fa..f014481 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -45,6 +45,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.concurrent.duration.Duration
@@ -2176,6 +2177,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
/**
* The AlterConfigs API is deprecated and should not support altering log levels
*/
+ @nowarn("cat=deprecation")
@Test
@Ignore // To be re-enabled once KAFKA-8779 is resolved
def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
@@ -2227,6 +2229,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
object PlaintextAdminIntegrationTest {
+ @nowarn("cat=deprecation")
def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
// Alter topics
var topicConfigEntries1 = Seq(
@@ -2289,6 +2292,7 @@ object PlaintextAdminIntegrationTest {
assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value)
}
+ @nowarn("cat=deprecation")
def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
@@ -2356,12 +2360,12 @@ object PlaintextAdminIntegrationTest {
assertEquals(Defaults.LogCleanerMinCleanRatio.toString,
configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value)
- assertEquals(Defaults.CompressionType.toString,
+ assertEquals(Defaults.CompressionType,
configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value)
assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value)
- assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
+ assertEquals(Defaults.CompressionType, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index c0319f0e..3ef00f1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{After, Assert, Before, Test}
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
@@ -46,6 +47,7 @@ abstract class AuthorizationAdmin {
// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
+ @nowarn("cat=deprecation")
val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 12548a1..b610db2 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -63,6 +63,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept
+import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
@@ -1324,6 +1325,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}.mkString(",")
}
+ @nowarn("cat=deprecation")
private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
val configs = servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
@@ -1350,6 +1352,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
}
+ @nowarn("cat=deprecation")
private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
@@ -1358,6 +1361,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.asScala.foreach { case (k, v) => waitForConfigOnServer(server, k, v) }
}
+ @nowarn("cat=deprecation")
private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
perBrokerConfig: Boolean): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 11f1488..066725c 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -53,7 +53,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
.map(KafkaConfig.fromProps(_, overridingProps))
@Before
- override def setUp: Unit = {
+ override def setUp(): Unit = {
// Do some Metrics Registry cleanup by removing the metrics that this test checks.
// This is a test workaround to the issue that prior harness runs may have left a populated registry.
// see https://issues.apache.org/jira/browse/KAFKA-4605
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 6fcdd4c..2f16aba 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -40,6 +40,8 @@ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsRes
import org.junit.Assert._
import org.scalatest.Assertions.intercept
+import scala.annotation.nowarn
+
class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
val brokerId1 = 0
val brokerId2 = 1
@@ -347,6 +349,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
assertEquals(List("first", "third"), consumeAllMessages(topic, 2))
}
+ @nowarn("cat=deprecation")
private def alterTopicConfigs(adminClient: Admin, topic: String, topicConfigs: Properties): AlterConfigsResult = {
val configEntries = topicConfigs.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index b8bc006..7a4b328 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -31,6 +31,8 @@ import scala.util.Random
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.InvalidOffsetException
+import scala.annotation.nowarn
+
class OffsetIndexTest {
var idx: OffsetIndex = null
@@ -47,7 +49,8 @@ class OffsetIndexTest {
if(this.idx != null)
this.idx.file.delete()
}
-
+
+ @nowarn("cat=deprecation")
@Test
def randomLookupTest(): Unit = {
assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L))
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index dfd425e..bf54cdb 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
+import scala.annotation.nowarn
import scala.collection.Seq
import scala.reflect.ClassTag
@@ -86,7 +87,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
}
def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: Short)
- (implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
+ (implicit classTag: ClassTag[T], @nowarn("cat=unused") nn: NotNothing[T]): T = {
val incoming = new DataInputStream(socket.getInputStream)
val len = incoming.readInt()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 4dde2d3..835c113 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -35,7 +35,7 @@ import kafka.utils.timer.MockTimer
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
-import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
+import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index a3b59f4..6dd1518 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -16,7 +16,7 @@ package kafka.server
import java.util
import java.util.concurrent.{Executors, Future, TimeUnit}
-import java.util.{Collections, LinkedHashMap, Optional, Properties}
+import java.util.{Collections, Optional, Properties}
import kafka.api.LeaderAndIsr
import kafka.log.LogConfig
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index af98b47..0ddc996 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -40,7 +40,7 @@ class LeaderEpochFileCacheTest {
override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs
override def read(): Seq[EpochEntry] = this.epochs
}
- private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
+ private val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint)
@Test
def shouldAddEpochAndMessageOffsetToCache() = {
@@ -231,12 +231,12 @@ class LeaderEpochFileCacheTest {
val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath))
//Given
- val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint)
+ val cache = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint)
cache.assign(epoch = 2, startOffset = 6)
//When
val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
- val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2)
+ val cache2 = new LeaderEpochFileCache(tp, () => logEndOffset, checkpoint2)
//Then
assertEquals(1, cache2.epochEntries.size)
diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index 0afdf09..413b5ba 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -19,10 +19,14 @@ package kafka.tools
import kafka.consumer.BaseConsumerRecord
import org.apache.kafka.common.record.{RecordBatch, TimestampType}
+
import scala.jdk.CollectionConverters._
import org.junit.Assert._
import org.junit.Test
+import scala.annotation.nowarn
+
+@nowarn("cat=deprecation")
class MirrorMakerTest {
@Test
diff --git a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
index 31f7efb..672ac87 100644
--- a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
@@ -38,7 +38,7 @@ class ShutdownableThreadTest {
}
val latch = new CountDownLatch(1)
val thread = new ShutdownableThread("shutdownable-thread-test") {
- override def doWork: Unit = {
+ override def doWork(): Unit = {
latch.countDown()
throw new FatalExitError
}
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 67d85d6..e38b144 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -28,7 +28,7 @@ ext {
// Add Scala version
def defaultScala212Version = '2.12.11'
-def defaultScala213Version = '2.13.1'
+def defaultScala213Version = '2.13.2'
if (hasProperty('scalaVersion')) {
if (scalaVersion == '2.12') {
versions["scala"] = defaultScala212Version
@@ -102,9 +102,9 @@ versions += [
powermock: "2.0.7",
reflections: "0.9.12",
rocksDB: "5.18.4",
- scalaCollectionCompat: "2.1.4",
+ scalaCollectionCompat: "2.1.6",
scalafmt: "1.5.1",
- scalaJava8Compat : "0.9.0",
+ scalaJava8Compat : "0.9.1",
scalatest: "3.0.8",
scoverage: "1.4.1",
scoveragePlugin: "4.0.1",