You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/03 09:17:51 UTC
kafka git commit: KAFKA-4378: Fix Scala 2.12 "eta-expansion of
zero-argument method" warnings
Repository: kafka
Updated Branches:
refs/heads/trunk f60009b14 -> 4a6bbd5f1
KAFKA-4378: Fix Scala 2.12 "eta-expansion of zero-argument method" warnings
Author: Bernard Leach <le...@bouncycastle.org>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2098 from leachbj/4378-eta-expansion
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a6bbd5f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a6bbd5f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a6bbd5f
Branch: refs/heads/trunk
Commit: 4a6bbd5f1386562262193b5054b44894a196997b
Parents: f60009b
Author: Bernard Leach <le...@bouncycastle.org>
Authored: Wed May 3 09:28:52 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 3 10:17:42 2017 +0100
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 2 +-
.../coordinator/group/DelayedHeartbeat.scala | 2 +-
.../kafka/coordinator/group/DelayedJoin.scala | 2 +-
.../group/GroupMetadataManager.scala | 6 ++--
.../TransactionMarkerChannelManager.scala | 5 +--
.../transaction/TransactionStateManager.scala | 4 +--
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 16 ++++-----
.../scala/kafka/server/ReplicaManager.scala | 8 ++---
.../kafka/server/BaseReplicaFetchTest.scala | 2 +-
.../scala/unit/kafka/utils/SchedulerTest.scala | 34 ++++++++++----------
11 files changed, 40 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b810f81..acc3cdf 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -145,7 +145,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
scheduler.startup
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.schedule("kafka-consumer-autocommit",
- autoCommit,
+ autoCommit _,
delay = config.autoCommitIntervalMs,
period = config.autoCommitIntervalMs,
unit = TimeUnit.MILLISECONDS)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index beba952..2cbdf30 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -34,7 +34,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
// call purgatory operations while holding the group lock.
override def safeTryComplete(): Boolean = tryComplete()
- override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
+ override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
override def onComplete() = coordinator.onCompleteHeartbeat()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 4ef63bf..06a47da 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -37,7 +37,7 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
// call purgatory operations while holding the group lock.
override def safeTryComplete(): Boolean = tryComplete()
- override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
+ override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
override def onExpiration() = coordinator.onExpireJoin()
override def onComplete() = coordinator.onCompleteJoin(group)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 063eee7..e711392 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -95,7 +95,7 @@ class GroupMetadataManager(brokerId: Int,
scheduler.startup()
scheduler.schedule(name = "delete-expired-group-metadata",
- fun = cleanupGroupMetadata,
+ fun = cleanupGroupMetadata _,
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
@@ -416,7 +416,7 @@ class GroupMetadataManager(brokerId: Int,
}
}
- scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
+ scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets _)
}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
@@ -544,7 +544,7 @@ class GroupMetadataManager(brokerId: Int,
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
- scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
+ scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets _)
def removeGroupsAndOffsets() {
var numOffsetsRemoved = 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 7121e31..2c17564 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -85,12 +85,9 @@ object TransactionMarkerChannelManager {
channel)
}
-
private[transaction] def requestGenerator(transactionMarkerChannel: TransactionMarkerChannel,
txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker]): () => Iterable[RequestAndCompletionHandler] = {
- def generateRequests(): Iterable[RequestAndCompletionHandler] = transactionMarkerChannel.drainQueuedTransactionMarkers(txnMarkerPurgatory)
-
- generateRequests
+ () => transactionMarkerChannel.drainQueuedTransactionMarkers(txnMarkerPurgatory)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index c8931c4..f07ca91 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -264,7 +264,7 @@ class TransactionStateManager(brokerId: Int,
}
}
- scheduler.schedule(topicPartition.toString, loadTransactions)
+ scheduler.schedule(topicPartition.toString, loadTransactions _)
}
/**
@@ -299,7 +299,7 @@ class TransactionStateManager(brokerId: Int,
}
}
- scheduler.schedule(topicPartition.toString, removeTransactions)
+ scheduler.schedule(topicPartition.toString, removeTransactions _)
}
private def validateTransactionTopicPartitionCountIsStable(): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 485525f..3c88dc8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1200,7 +1200,7 @@ class Log(@volatile var dir: File,
info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
segment.delete()
}
- scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
+ scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a9398f0..b89fc40 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -218,28 +218,28 @@ class LogManager(val logDirs: Array[File],
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
- cleanupLogs,
+ cleanupLogs _,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
- scheduler.schedule("kafka-log-flusher",
- flushDirtyLogs,
- delay = InitialTaskDelayMs,
- period = flushCheckMs,
+ scheduler.schedule("kafka-log-flusher",
+ flushDirtyLogs _,
+ delay = InitialTaskDelayMs,
+ period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
- checkpointRecoveryPointOffsets,
+ checkpointRecoveryPointOffsets _,
delay = InitialTaskDelayMs,
period = flushRecoveryOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-log-start-offset-checkpoint",
- checkpointLogStartOffsets,
+ checkpointLogStartOffsets _,
delay = InitialTaskDelayMs,
period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs",
- deleteLogs,
+ deleteLogs _,
delay = InitialTaskDelayMs,
period = defaultConfig.fileDeleteDelayMs,
TimeUnit.MILLISECONDS)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b6f4b7d..de670e8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -179,7 +179,7 @@ class ReplicaManager(val config: KafkaConfig,
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
- scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
+ scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
def recordIsrChange(topicPartition: TopicPartition) {
@@ -246,8 +246,8 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
- scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
- scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
+ scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
+ scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
}
def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors = {
@@ -1112,4 +1112,4 @@ object OffsetsForLeaderEpoch extends Logging {
(tp, offset)
}.toMap.asJava
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index e436b8d..7e9404e 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -83,6 +83,6 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness {
}
result
}
- waitUntilTrue(logsMatch, "Broker logs should be identical")
+ waitUntilTrue(logsMatch _, "Broker logs should be identical")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 7c131fc..dbb818c 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -37,11 +37,11 @@ class SchedulerTest {
def teardown() {
scheduler.shutdown()
}
-
+
@Test
def testMockSchedulerNonPeriodicTask() {
- mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
- mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100)
+ mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1)
+ mockTime.scheduler.schedule("test2", counter2.getAndIncrement _, delay=100)
assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
mockTime.sleep(1)
@@ -51,11 +51,11 @@ class SchedulerTest {
assertEquals("More sleeping should not result in more incrementing on counter1.", 1, counter1.get)
assertEquals("Counter2 should now be incremented.", 1, counter2.get)
}
-
+
@Test
def testMockSchedulerPeriodicTask() {
- mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1, period=1)
- mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100, period=100)
+ mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1, period=1)
+ mockTime.scheduler.schedule("test2", counter2.getAndIncrement _, delay=100, period=100)
assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
mockTime.sleep(1)
@@ -65,45 +65,45 @@ class SchedulerTest {
assertEquals("Counter1 should be incremented 101 times", 101, counter1.get)
assertEquals("Counter2 should not be incremented once", 1, counter2.get)
}
-
+
@Test
def testReentrantTaskInMockScheduler() {
- mockTime.scheduler.schedule("test1", () => mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=0), delay=1)
+ mockTime.scheduler.schedule("test1", () => mockTime.scheduler.schedule("test2", counter2.getAndIncrement _, delay=0), delay=1)
mockTime.sleep(1)
assertEquals(1, counter2.get)
}
-
+
@Test
def testNonPeriodicTask() {
- scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
+ scheduler.schedule("test", counter1.getAndIncrement _, delay = 0)
retry(30000) {
assertEquals(counter1.get, 1)
}
Thread.sleep(5)
assertEquals("Should only run once", 1, counter1.get)
}
-
+
@Test
def testPeriodicTask() {
- scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
+ scheduler.schedule("test", counter1.getAndIncrement _, delay = 0, period = 5)
retry(30000){
assertTrue("Should count to 20", counter1.get >= 20)
}
}
-
+
@Test
def testRestart() {
// schedule a task to increment a counter
- mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+ mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1)
mockTime.sleep(1)
assertEquals(1, counter1.get())
-
+
// restart the scheduler
mockTime.scheduler.shutdown()
mockTime.scheduler.startup()
-
+
// schedule another task to increment the counter
- mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+ mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1)
mockTime.sleep(1)
assertEquals(2, counter1.get())
}