You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/29 01:50:44 UTC

git commit: KAFKA-695 Broker shuts down due to attempt to read a closed index file; reviewed by Neha Narkhede, Jay Kreps

Updated Branches:
  refs/heads/0.8 40a80fa7b -> 1fb3e8c03


KAFKA-695 Broker shuts down due to attempt to read a closed index file; reviewed by Neha Narkhede, Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1fb3e8c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1fb3e8c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1fb3e8c0

Branch: refs/heads/0.8
Commit: 1fb3e8c037c01ecfe6bb1d4261b59cb19baae6cf
Parents: 40a80fa
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 28 16:50:28 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Jan 28 16:50:36 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/server/RequestPurgatory.scala |   29 ++++++---------
 1 files changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb3e8c0/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 8fb9865..3aaf38e 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -88,18 +88,11 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
   expirationThread.start()
 
-  def purgeSatisfied() {
-    expiredRequestReaper.forcePurge()
-  }
-
   /**
    * Add a new delayed request watching the contained keys
    */
   def watch(delayedRequest: T) {
-    if (requestCounter.getAndIncrement() >= purgeInterval) {
-      requestCounter.set(0)
-      purgeSatisfied()
-    }
+    requestCounter.getAndIncrement()
 
     for(key <- delayedRequest.keys) {
       var lst = watchersFor(key)
@@ -218,15 +211,19 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
       while(running.get) {
         try {
           val curr = pollExpired()
-          curr synchronized {
-            expire(curr)
+          if (curr != null) {
+            curr synchronized {
+              expire(curr)
+            }
           }
-        } catch {
-          case ie: InterruptedException =>
+          if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
+            requestCounter.set(0)
             val purged = purgeSatisfied()
             debug("Purged %d requests from delay queue.".format(purged))
             val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
             debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
+          }
+        } catch {
           case e: Exception =>
             error("Error in long poll expiry thread: ", e)
         }
@@ -240,10 +237,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
       unsatisfied.incrementAndGet()
     }
 
-    def forcePurge() {
-      expirationThread.interrupt()
-    }
-
     /** Shutdown the expiry thread*/
     def shutdown() {
       debug("Shutting down.")
@@ -261,7 +254,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
      */
     private def pollExpired(): T = {
       while(true) {
-        val curr = delayed.take()
+        val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
+        if (curr == null)
+          return null.asInstanceOf[T]
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
           unsatisfied.getAndDecrement()