You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/25 20:39:30 UTC

[kafka] branch trunk updated: KAFKA-4897; Add pause method to ShutdownableThread (#4393)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a246ee  KAFKA-4897; Add pause method to ShutdownableThread (#4393)
7a246ee is described below

commit 7a246eea7b68851c3de5345d43f70853b4a15f21
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Fri Jan 26 02:09:20 2018 +0530

    KAFKA-4897; Add pause method to ShutdownableThread (#4393)
    
     - Use the newly added pause method in the LogCleaner and ControllerChannelManager classes
     - Remove the LogCleaner and Cleaner exclusions from findbugs-exclude.xml
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/consumer/ConsumerFetcherManager.scala    |  4 +-
 .../controller/ControllerChannelManager.scala      |  6 +--
 core/src/main/scala/kafka/log/LogCleaner.scala     | 13 ++---
 .../scala/kafka/server/AbstractFetcherThread.scala |  4 +-
 .../kafka/tools/ReplicaVerificationTool.scala      |  2 +-
 .../scala/kafka/utils/ShutdownableThread.scala     | 57 ++++++++++++++--------
 .../integration/kafka/api/ConsumerBounceTest.scala |  4 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |  4 +-
 gradle/findbugs-exclude.xml                        | 15 ------
 9 files changed, 53 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 0a6b82e..23f5356 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -83,7 +83,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         }
       } catch {
         case t: Throwable => {
-            if (!isRunning.get())
+            if (!isRunning)
               throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
               warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
@@ -98,7 +98,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         )
       } catch {
         case t: Throwable =>
-          if (!isRunning.get())
+          if (!isRunning)
             throw t /* If this thread is stopped, propagate this exception to kill the thread. */
           else {
             warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e389821..d5456be 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
 package kafka.controller
 
 import java.net.SocketTimeoutException
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -213,13 +213,13 @@ class RequestSendThread(val controllerId: Int,
 
   override def doWork(): Unit = {
 
-    def backoff(): Unit = CoreUtils.swallow(Thread.sleep(100), this, Level.TRACE)
+    def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
 
     val QueueItem(apiKey, requestBuilder, callback) = queue.take()
     var clientResponse: ClientResponse = null
     try {
       var isSendSuccessful = false
-      while (isRunning.get() && !isSendSuccessful) {
+      while (isRunning && !isSendSuccessful) {
         // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
         // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
         try {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c5c0d49..637e24c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 import java.nio._
 import java.nio.file.Files
 import java.util.Date
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 import kafka.common._
@@ -233,10 +233,9 @@ class LogCleaner(val config: CleanerConfig,
                               checkDone = checkDone)
 
     @volatile var lastStats: CleanerStats = new CleanerStats()
-    private val backOffWaitLatch = new CountDownLatch(1)
 
     private def checkDone(topicPartition: TopicPartition) {
-      if (!isRunning.get())
+      if (!isRunning)
         throw new ThreadShutdownException
       cleanerManager.checkCleaningAborted(topicPartition)
     }
@@ -248,12 +247,6 @@ class LogCleaner(val config: CleanerConfig,
       cleanOrSleep()
     }
 
-    override def shutdown() = {
-    	 initiateShutdown()
-    	 backOffWaitLatch.countDown()
-    	 awaitShutdown()
-     }
-
     /**
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
@@ -289,7 +282,7 @@ class LogCleaner(val config: CleanerConfig,
           }
       }
       if (!cleaned)
-        backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
+        pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b078073..925c330 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -148,7 +148,7 @@ abstract class AbstractFetcherThread(name: String,
       responseData = fetch(fetchRequest)
     } catch {
       case t: Throwable =>
-        if (isRunning.get) {
+        if (isRunning) {
           warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
           inLock(partitionMapLock) {
             partitionsWithError ++= partitionStates.partitionSet.asScala
@@ -218,7 +218,7 @@ abstract class AbstractFetcherThread(name: String,
                       partitionsWithError += topicPartition
                   }
                 case _ =>
-                  if (isRunning.get) {
+                  if (isRunning) {
                     error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get)
                     partitionsWithError += topicPartition
                   }
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 71f3368..0408e92 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -368,7 +368,7 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t: Throwable =>
-        if (!isRunning.get)
+        if (!isRunning)
           throw t
     }
 
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 0922d15..13bbc90 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -17,8 +17,7 @@
 
 package kafka.utils
 
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import org.apache.kafka.common.internals.FatalExitError
 
@@ -26,8 +25,8 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
         extends Thread(name) with Logging {
   this.setDaemon(false)
   this.logIdent = "[" + name + "]: "
-  val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  private val shutdownLatch = new CountDownLatch(1)
+  private val shutdownInitiated = new CountDownLatch(1)
+  private val shutdownComplete = new CountDownLatch(1)
 
   def shutdown(): Unit = {
     initiateShutdown()
@@ -35,28 +34,43 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   }
 
   def isShutdownComplete: Boolean = {
-    shutdownLatch.getCount == 0
+    shutdownComplete.getCount == 0
   }
 
   def initiateShutdown(): Boolean = {
-    if (isRunning.compareAndSet(true, false)) {
-      info("Shutting down")
-      if (isInterruptible)
-        interrupt()
-      true
-    } else
-      false
+    this.synchronized {
+      if (isRunning) {
+        info("Shutting down")
+        shutdownInitiated.countDown()
+        if (isInterruptible)
+          interrupt()
+        true
+      } else
+        false
+    }
   }
 
-    /**
+  /**
    * After calling initiateShutdown(), use this API to wait until the shutdown is complete
    */
   def awaitShutdown(): Unit = {
-    shutdownLatch.await()
+    shutdownComplete.await()
     info("Shutdown completed")
   }
 
   /**
+   * Causes the current thread to wait until shutdown is initiated
+   * or the specified waiting time elapses, whichever happens first.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   */
+  def pause(timeout: Long, unit: TimeUnit): Unit = {
+    if (shutdownInitiated.await(timeout, unit))
+      trace("shutdownInitiated latch count reached zero. Shutdown called.")
+  }
+
+  /**
    * This method is repeatedly invoked until the thread shuts down or this method throws an exception
    */
   def doWork(): Unit
@@ -64,19 +78,24 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   override def run(): Unit = {
     info("Starting")
     try {
-      while (isRunning.get)
+      while (isRunning)
         doWork()
     } catch {
       case e: FatalExitError =>
-        isRunning.set(false)
-        shutdownLatch.countDown()
+        shutdownInitiated.countDown()
+        shutdownComplete.countDown()
         info("Stopped")
         Exit.exit(e.statusCode())
       case e: Throwable =>
-        if (isRunning.get())
+        if (isRunning)
           error("Error due to", e)
+    } finally {
+       shutdownComplete.countDown()
     }
-    shutdownLatch.countDown()
     info("Stopped")
   }
+
+  def isRunning: Boolean = {
+    shutdownInitiated.getCount() != 0
+  }
 }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 8917921..58d1be9 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -104,7 +104,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val scheduler = new BounceBrokerScheduler(numIters)
     scheduler.start()
 
-    while (scheduler.isRunning.get()) {
+    while (scheduler.isRunning) {
       val records = consumer.poll(100).asScala
       assertEquals(Set(tp), consumer.assignment.asScala)
 
@@ -146,7 +146,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val scheduler = new BounceBrokerScheduler(numIters)
     scheduler.start()
 
-    while(scheduler.isRunning.get()) {
+    while(scheduler.isRunning) {
       val coin = TestUtils.random.nextInt(3)
       if (coin == 0) {
         info("Seeking to end of log")
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index fc06b73..a760d7d 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -439,7 +439,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     @volatile var sent = 0
     override def doWork(): Unit = {
         try {
-            while (isRunning.get) {
+            while (isRunning) {
                 sent += 1
                 val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
                 producer.send(record).get(10, TimeUnit.SECONDS)
@@ -456,7 +456,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     var received = 0
     override def doWork(): Unit = {
       try {
-        while (isRunning.get || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+        while (isRunning || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
           received += consumer.poll(50).count
         }
       } finally {
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 3bc3d39..66b4874 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -89,21 +89,6 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
     </Match>
 
     <Match>
-        <!-- Add a suppression for KAFKA-4897: LogCleaner#cleanSegments should not ignore failures to delete files.
-            TODO: remove this suppression when KAFKA-4897 is fixed. -->
-        <Package name="kafka.log"/>
-        <Source name="LogCleaner.scala"/>
-        <Bug pattern="RV_RETURN_VALUE_IGNORED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
-    </Match>
-
-    <Match>
-        <!-- Add a suppression for ignoring the return value of CountDownLatch#await. -->
-        <Class name="kafka.log.Cleaner"/>
-        <Method name="cleanOrSleep"/>
-        <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
-    </Match>
-
-    <Match>
         <!-- Add a suppression for having the thread start in the constructor of the old, deprecated consumer. -->
         <Class name="kafka.producer.Producer"/>
         <Bug pattern="SC_START_IN_CTOR"/>

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.