You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/29 01:50:44 UTC
git commit: KAFKA-695 Broker shuts down due to attempt to read a
closed index file; reviewed by Neha Narkhede, Jay Kreps
Updated Branches:
refs/heads/0.8 40a80fa7b -> 1fb3e8c03
KAFKA-695 Broker shuts down due to attempt to read a closed index file;reviewed by Neha Narkhede, Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1fb3e8c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1fb3e8c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1fb3e8c0
Branch: refs/heads/0.8
Commit: 1fb3e8c037c01ecfe6bb1d4261b59cb19baae6cf
Parents: 40a80fa
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 28 16:50:28 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Jan 28 16:50:36 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/server/RequestPurgatory.scala | 29 ++++++---------
1 files changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb3e8c0/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 8fb9865..3aaf38e 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -88,18 +88,11 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
expirationThread.start()
- def purgeSatisfied() {
- expiredRequestReaper.forcePurge()
- }
-
/**
* Add a new delayed request watching the contained keys
*/
def watch(delayedRequest: T) {
- if (requestCounter.getAndIncrement() >= purgeInterval) {
- requestCounter.set(0)
- purgeSatisfied()
- }
+ requestCounter.getAndIncrement()
for(key <- delayedRequest.keys) {
var lst = watchersFor(key)
@@ -218,15 +211,19 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
while(running.get) {
try {
val curr = pollExpired()
- curr synchronized {
- expire(curr)
+ if (curr != null) {
+ curr synchronized {
+ expire(curr)
+ }
}
- } catch {
- case ie: InterruptedException =>
+ if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
+ requestCounter.set(0)
val purged = purgeSatisfied()
debug("Purged %d requests from delay queue.".format(purged))
val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
+ }
+ } catch {
case e: Exception =>
error("Error in long poll expiry thread: ", e)
}
@@ -240,10 +237,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
unsatisfied.incrementAndGet()
}
- def forcePurge() {
- expirationThread.interrupt()
- }
-
/** Shutdown the expiry thread*/
def shutdown() {
debug("Shutting down.")
@@ -261,7 +254,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
*/
private def pollExpired(): T = {
while(true) {
- val curr = delayed.take()
+ val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
+ if (curr == null)
+ return null.asInstanceOf[T]
val updated = curr.satisfied.compareAndSet(false, true)
if(updated) {
unsatisfied.getAndDecrement()