You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/03 09:17:51 UTC

kafka git commit: KAFKA-4378: Fix Scala 2.12 "eta-expansion of zero-argument method" warnings

Repository: kafka
Updated Branches:
  refs/heads/trunk f60009b14 -> 4a6bbd5f1


KAFKA-4378: Fix Scala 2.12 "eta-expansion of zero-argument method" warnings

Author: Bernard Leach <le...@bouncycastle.org>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2098 from leachbj/4378-eta-expansion


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a6bbd5f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a6bbd5f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a6bbd5f

Branch: refs/heads/trunk
Commit: 4a6bbd5f1386562262193b5054b44894a196997b
Parents: f60009b
Author: Bernard Leach <le...@bouncycastle.org>
Authored: Wed May 3 09:28:52 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 3 10:17:42 2017 +0100

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   |  2 +-
 .../coordinator/group/DelayedHeartbeat.scala    |  2 +-
 .../kafka/coordinator/group/DelayedJoin.scala   |  2 +-
 .../group/GroupMetadataManager.scala            |  6 ++--
 .../TransactionMarkerChannelManager.scala       |  5 +--
 .../transaction/TransactionStateManager.scala   |  4 +--
 core/src/main/scala/kafka/log/Log.scala         |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala  | 16 ++++-----
 .../scala/kafka/server/ReplicaManager.scala     |  8 ++---
 .../kafka/server/BaseReplicaFetchTest.scala     |  2 +-
 .../scala/unit/kafka/utils/SchedulerTest.scala  | 34 ++++++++++----------
 11 files changed, 40 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b810f81..acc3cdf 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -145,7 +145,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     scheduler.startup
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
     scheduler.schedule("kafka-consumer-autocommit",
-                       autoCommit,
+                       autoCommit _,
                        delay = config.autoCommitIntervalMs,
                        period = config.autoCommitIntervalMs,
                        unit = TimeUnit.MILLISECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index beba952..2cbdf30 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -34,7 +34,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
   // call purgatory operations while holding the group lock.
   override def safeTryComplete(): Boolean = tryComplete()
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 4ef63bf..06a47da 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -37,7 +37,7 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
   // call purgatory operations while holding the group lock.
   override def safeTryComplete(): Boolean = tryComplete()
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
+  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
   override def onExpiration() = coordinator.onExpireJoin()
   override def onComplete() = coordinator.onCompleteJoin(group)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 063eee7..e711392 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -95,7 +95,7 @@ class GroupMetadataManager(brokerId: Int,
     scheduler.startup()
 
     scheduler.schedule(name = "delete-expired-group-metadata",
-      fun = cleanupGroupMetadata,
+      fun = cleanupGroupMetadata _,
       period = config.offsetsRetentionCheckIntervalMs,
       unit = TimeUnit.MILLISECONDS)
   }
@@ -416,7 +416,7 @@ class GroupMetadataManager(brokerId: Int,
       }
     }
 
-    scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
+    scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets _)
   }
 
   private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
@@ -544,7 +544,7 @@ class GroupMetadataManager(brokerId: Int,
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
     val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-    scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
+    scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets _)
 
     def removeGroupsAndOffsets() {
       var numOffsetsRemoved = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 7121e31..2c17564 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -85,12 +85,9 @@ object TransactionMarkerChannelManager {
       channel)
   }
 
-
   private[transaction] def requestGenerator(transactionMarkerChannel: TransactionMarkerChannel,
                                             txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker]): () => Iterable[RequestAndCompletionHandler] = {
-    def generateRequests(): Iterable[RequestAndCompletionHandler] = transactionMarkerChannel.drainQueuedTransactionMarkers(txnMarkerPurgatory)
-
-    generateRequests
+    () => transactionMarkerChannel.drainQueuedTransactionMarkers(txnMarkerPurgatory)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index c8931c4..f07ca91 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -264,7 +264,7 @@ class TransactionStateManager(brokerId: Int,
       }
     }
 
-    scheduler.schedule(topicPartition.toString, loadTransactions)
+    scheduler.schedule(topicPartition.toString, loadTransactions _)
   }
 
   /**
@@ -299,7 +299,7 @@ class TransactionStateManager(brokerId: Int,
       }
     }
 
-    scheduler.schedule(topicPartition.toString, removeTransactions)
+    scheduler.schedule(topicPartition.toString, removeTransactions _)
   }
 
   private def validateTransactionTopicPartitionCountIsStable(): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 485525f..3c88dc8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1200,7 +1200,7 @@ class Log(@volatile var dir: File,
       info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
       segment.delete()
     }
-    scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
+    scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a9398f0..b89fc40 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -218,28 +218,28 @@ class LogManager(val logDirs: Array[File],
     if(scheduler != null) {
       info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
       scheduler.schedule("kafka-log-retention",
-                         cleanupLogs,
+                         cleanupLogs _,
                          delay = InitialTaskDelayMs,
                          period = retentionCheckMs,
                          TimeUnit.MILLISECONDS)
       info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
-      scheduler.schedule("kafka-log-flusher", 
-                         flushDirtyLogs, 
-                         delay = InitialTaskDelayMs, 
-                         period = flushCheckMs, 
+      scheduler.schedule("kafka-log-flusher",
+                         flushDirtyLogs _,
+                         delay = InitialTaskDelayMs,
+                         period = flushCheckMs,
                          TimeUnit.MILLISECONDS)
       scheduler.schedule("kafka-recovery-point-checkpoint",
-                         checkpointRecoveryPointOffsets,
+                         checkpointRecoveryPointOffsets _,
                          delay = InitialTaskDelayMs,
                          period = flushRecoveryOffsetCheckpointMs,
                          TimeUnit.MILLISECONDS)
       scheduler.schedule("kafka-log-start-offset-checkpoint",
-                         checkpointLogStartOffsets,
+                         checkpointLogStartOffsets _,
                          delay = InitialTaskDelayMs,
                          period = flushStartOffsetCheckpointMs,
                          TimeUnit.MILLISECONDS)
       scheduler.schedule("kafka-delete-logs",
-                         deleteLogs,
+                         deleteLogs _,
                          delay = InitialTaskDelayMs,
                          period = defaultConfig.fileDeleteDelayMs,
                          TimeUnit.MILLISECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b6f4b7d..de670e8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -179,7 +179,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-      scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
+      scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
   }
 
   def recordIsrChange(topicPartition: TopicPartition) {
@@ -246,8 +246,8 @@ class ReplicaManager(val config: KafkaConfig,
   def startup() {
     // start ISR expiration thread
     // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
-    scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
-    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
+    scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
+    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
   }
 
   def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors  = {
@@ -1112,4 +1112,4 @@ object OffsetsForLeaderEpoch extends Logging {
       (tp, offset)
     }.toMap.asJava
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index e436b8d..7e9404e 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -83,6 +83,6 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
       }
       result
     }
-    waitUntilTrue(logsMatch, "Broker logs should be identical")
+    waitUntilTrue(logsMatch _, "Broker logs should be identical")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a6bbd5f/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 7c131fc..dbb818c 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -37,11 +37,11 @@ class SchedulerTest {
   def teardown() {
     scheduler.shutdown()
   }
-  
+
   @Test
   def testMockSchedulerNonPeriodicTask() {
-    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
-    mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100)
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1)
+    mockTime.scheduler.schedule("test2", counter2.getAndIncrement _, delay=100)
     assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
     assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
     mockTime.sleep(1)
@@ -51,11 +51,11 @@ class SchedulerTest {
     assertEquals("More sleeping should not result in more incrementing on counter1.", 1, counter1.get)
     assertEquals("Counter2 should now be incremented.", 1, counter2.get)
   }
-  
+
   @Test
   def testMockSchedulerPeriodicTask() {
-    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1, period=1)
-    mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100, period=100)
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1, period=1)
+    mockTime.scheduler.schedule("test2", counter2.getAndIncrement _, delay=100, period=100)
     assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
     assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
     mockTime.sleep(1)
@@ -65,45 +65,45 @@ class SchedulerTest {
     assertEquals("Counter1 should be incremented 101 times", 101, counter1.get)
     assertEquals("Counter2 should not be incremented once", 1, counter2.get)
   }
-  
+
   @Test
   def testReentrantTaskInMockScheduler() {
-    mockTime.scheduler.schedule("test1", () => mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=0), delay=1)
+    mockTime.scheduler.schedule("test1", () => mockTime.scheduler.schedule("test2", counter2.getAndIncrement _, delay=0), delay=1)
     mockTime.sleep(1)
     assertEquals(1, counter2.get)
   }
-  
+
   @Test
   def testNonPeriodicTask() {
-    scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
+    scheduler.schedule("test", counter1.getAndIncrement _, delay = 0)
     retry(30000) {
       assertEquals(counter1.get, 1)
     }
     Thread.sleep(5)
     assertEquals("Should only run once", 1, counter1.get)
   }
-  
+
   @Test
   def testPeriodicTask() {
-    scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
+    scheduler.schedule("test", counter1.getAndIncrement _, delay = 0, period = 5)
     retry(30000){
       assertTrue("Should count to 20", counter1.get >= 20)
     }
   }
-  
+
   @Test
   def testRestart() {
     // schedule a task to increment a counter
-    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1)
     mockTime.sleep(1)
     assertEquals(1, counter1.get())
-    
+
     // restart the scheduler
     mockTime.scheduler.shutdown()
     mockTime.scheduler.startup()
-    
+
     // schedule another task to increment the counter
-    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement _, delay=1)
     mockTime.sleep(1)
     assertEquals(2, counter1.get())
   }