You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/25 20:39:30 UTC
[kafka] branch trunk updated: KAFKA-4897; Add pause method to ShutdownableThread (#4393)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a246ee KAFKA-4897; Add pause method to ShutdownableThread (#4393)
7a246ee is described below
commit 7a246eea7b68851c3de5345d43f70853b4a15f21
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Fri Jan 26 02:09:20 2018 +0530
KAFKA-4897; Add pause method to ShutdownableThread (#4393)
- Use newly added pause method in LogCleaner and ControllerChannelManager classes
- Remove LogCleaner, Cleaner exclusions from findbugs-exclude.xml
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/consumer/ConsumerFetcherManager.scala | 4 +-
.../controller/ControllerChannelManager.scala | 6 +--
core/src/main/scala/kafka/log/LogCleaner.scala | 13 ++---
.../scala/kafka/server/AbstractFetcherThread.scala | 4 +-
.../kafka/tools/ReplicaVerificationTool.scala | 2 +-
.../scala/kafka/utils/ShutdownableThread.scala | 57 ++++++++++++++--------
.../integration/kafka/api/ConsumerBounceTest.scala | 4 +-
.../server/DynamicBrokerReconfigurationTest.scala | 4 +-
gradle/findbugs-exclude.xml | 15 ------
9 files changed, 53 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 0a6b82e..23f5356 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -83,7 +83,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
}
} catch {
case t: Throwable => {
- if (!isRunning.get())
+ if (!isRunning)
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else
warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
@@ -98,7 +98,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
)
} catch {
case t: Throwable =>
- if (!isRunning.get())
+ if (!isRunning)
throw t /* If this thread is stopped, propagate this exception to kill the thread. */
else {
warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e389821..d5456be 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
package kafka.controller
import java.net.SocketTimeoutException
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import com.yammer.metrics.core.Gauge
import kafka.api._
@@ -213,13 +213,13 @@ class RequestSendThread(val controllerId: Int,
override def doWork(): Unit = {
- def backoff(): Unit = CoreUtils.swallow(Thread.sleep(100), this, Level.TRACE)
+ def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
val QueueItem(apiKey, requestBuilder, callback) = queue.take()
var clientResponse: ClientResponse = null
try {
var isSendSuccessful = false
- while (isRunning.get() && !isSendSuccessful) {
+ while (isRunning && !isSendSuccessful) {
// if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
// removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
try {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c5c0d49..637e24c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
import java.nio._
import java.nio.file.Files
import java.util.Date
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge
import kafka.common._
@@ -233,10 +233,9 @@ class LogCleaner(val config: CleanerConfig,
checkDone = checkDone)
@volatile var lastStats: CleanerStats = new CleanerStats()
- private val backOffWaitLatch = new CountDownLatch(1)
private def checkDone(topicPartition: TopicPartition) {
- if (!isRunning.get())
+ if (!isRunning)
throw new ThreadShutdownException
cleanerManager.checkCleaningAborted(topicPartition)
}
@@ -248,12 +247,6 @@ class LogCleaner(val config: CleanerConfig,
cleanOrSleep()
}
- override def shutdown() = {
- initiateShutdown()
- backOffWaitLatch.countDown()
- awaitShutdown()
- }
-
/**
* Clean a log if there is a dirty log available, otherwise sleep for a bit
*/
@@ -289,7 +282,7 @@ class LogCleaner(val config: CleanerConfig,
}
}
if (!cleaned)
- backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
+ pause(config.backOffMs, TimeUnit.MILLISECONDS)
}
/**
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b078073..925c330 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -148,7 +148,7 @@ abstract class AbstractFetcherThread(name: String,
responseData = fetch(fetchRequest)
} catch {
case t: Throwable =>
- if (isRunning.get) {
+ if (isRunning) {
warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
inLock(partitionMapLock) {
partitionsWithError ++= partitionStates.partitionSet.asScala
@@ -218,7 +218,7 @@ abstract class AbstractFetcherThread(name: String,
partitionsWithError += topicPartition
}
case _ =>
- if (isRunning.get) {
+ if (isRunning) {
error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get)
partitionsWithError += topicPartition
}
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 71f3368..0408e92 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -368,7 +368,7 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
response = simpleConsumer.fetch(fetchRequest)
} catch {
case t: Throwable =>
- if (!isRunning.get)
+ if (!isRunning)
throw t
}
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 0922d15..13bbc90 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -17,8 +17,7 @@
package kafka.utils
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.kafka.common.internals.FatalExitError
@@ -26,8 +25,8 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
extends Thread(name) with Logging {
this.setDaemon(false)
this.logIdent = "[" + name + "]: "
- val isRunning: AtomicBoolean = new AtomicBoolean(true)
- private val shutdownLatch = new CountDownLatch(1)
+ private val shutdownInitiated = new CountDownLatch(1)
+ private val shutdownComplete = new CountDownLatch(1)
def shutdown(): Unit = {
initiateShutdown()
@@ -35,28 +34,43 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
}
def isShutdownComplete: Boolean = {
- shutdownLatch.getCount == 0
+ shutdownComplete.getCount == 0
}
def initiateShutdown(): Boolean = {
- if (isRunning.compareAndSet(true, false)) {
- info("Shutting down")
- if (isInterruptible)
- interrupt()
- true
- } else
- false
+ this.synchronized {
+ if (isRunning) {
+ info("Shutting down")
+ shutdownInitiated.countDown()
+ if (isInterruptible)
+ interrupt()
+ true
+ } else
+ false
+ }
}
- /**
+ /**
* After calling initiateShutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = {
- shutdownLatch.await()
+ shutdownComplete.await()
info("Shutdown completed")
}
/**
+ * Causes the current thread to wait until the shutdown is initiated,
+ * or the specified waiting time elapses.
+ *
+ * @param timeout
+ * @param unit
+ */
+ def pause(timeout: Long, unit: TimeUnit): Unit = {
+ if (shutdownInitiated.await(timeout, unit))
+ trace("shutdownInitiated latch count reached zero. Shutdown called.")
+ }
+
+ /**
* This method is repeatedly invoked until the thread shuts down or this method throws an exception
*/
def doWork(): Unit
@@ -64,19 +78,24 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
override def run(): Unit = {
info("Starting")
try {
- while (isRunning.get)
+ while (isRunning)
doWork()
} catch {
case e: FatalExitError =>
- isRunning.set(false)
- shutdownLatch.countDown()
+ shutdownInitiated.countDown()
+ shutdownComplete.countDown()
info("Stopped")
Exit.exit(e.statusCode())
case e: Throwable =>
- if (isRunning.get())
+ if (isRunning)
error("Error due to", e)
+ } finally {
+ shutdownComplete.countDown()
}
- shutdownLatch.countDown()
info("Stopped")
}
+
+ def isRunning: Boolean = {
+ shutdownInitiated.getCount() != 0
+ }
}
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 8917921..58d1be9 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -104,7 +104,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
- while (scheduler.isRunning.get()) {
+ while (scheduler.isRunning) {
val records = consumer.poll(100).asScala
assertEquals(Set(tp), consumer.assignment.asScala)
@@ -146,7 +146,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
- while(scheduler.isRunning.get()) {
+ while(scheduler.isRunning) {
val coin = TestUtils.random.nextInt(3)
if (coin == 0) {
info("Seeking to end of log")
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index fc06b73..a760d7d 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -439,7 +439,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
@volatile var sent = 0
override def doWork(): Unit = {
try {
- while (isRunning.get) {
+ while (isRunning) {
sent += 1
val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
producer.send(record).get(10, TimeUnit.SECONDS)
@@ -456,7 +456,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
var received = 0
override def doWork(): Unit = {
try {
- while (isRunning.get || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+ while (isRunning || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
received += consumer.poll(50).count
}
} finally {
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 3bc3d39..66b4874 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -89,21 +89,6 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
</Match>
<Match>
- <!-- Add a suppression for KAFKA-4897: LogCleaner#cleanSegments should not ignore failures to delete files.
- TODO: remove this suppression when KAFKA-4897 is fixed. -->
- <Package name="kafka.log"/>
- <Source name="LogCleaner.scala"/>
- <Bug pattern="RV_RETURN_VALUE_IGNORED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
- </Match>
-
- <Match>
- <!-- Add a suppression for ignoring the return value of CountDownLatch#await. -->
- <Class name="kafka.log.Cleaner"/>
- <Method name="cleanOrSleep"/>
- <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
- </Match>
-
- <Match>
<!-- Add a suppression for having the thread start in the constructor of the old, deprecated consumer. -->
<Class name="kafka.producer.Producer"/>
<Bug pattern="SC_START_IN_CTOR"/>
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.