You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/21 01:33:23 UTC

kafka git commit: Leaner DelayedItem; reviewed by Neha Narkhede and Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/trunk 8c1b9325b -> fbb115497


Leaner DelayedItem; reviewed by Neha Narkhede and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fbb11549
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fbb11549
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fbb11549

Branch: refs/heads/trunk
Commit: fbb11549743aa00420e40696f11b48a705dce019
Parents: 8c1b932
Author: Yasuhiro Matsuda <ya...@gmail.com>
Authored: Fri Feb 20 16:33:01 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 20 16:33:15 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/utils/DelayedItem.scala    | 29 +++++++-------------
 1 file changed, 10 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fbb11549/core/src/main/scala/kafka/utils/DelayedItem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala
index a4e0dab..cbab2a0 100644
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ b/core/src/main/scala/kafka/utils/DelayedItem.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,34 +20,25 @@ package kafka.utils
 import java.util.concurrent._
 import scala.math._
 
-class DelayedItem(delay: Long, unit: TimeUnit) extends Delayed with Logging {
+class DelayedItem(delayMs: Long) extends Delayed with Logging {
 
-  val createdMs = SystemTime.milliseconds
-  val delayMs = {
-    val given = unit.toMillis(delay)
-    if (given < 0 || (createdMs + given) < 0) (Long.MaxValue - createdMs)
-    else given
-  }
+  private val dueMs = SystemTime.milliseconds + delayMs
 
-  def this(delayMs: Long) =
-    this(delayMs, TimeUnit.MILLISECONDS)
+  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
 
   /**
    * The remaining delay time
    */
   def getDelay(unit: TimeUnit): Long = {
-    val elapsedMs = (SystemTime.milliseconds - createdMs)
-    unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
+    unit.convert(max(dueMs - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
   }
-    
+
   def compareTo(d: Delayed): Int = {
-    val delayed = d.asInstanceOf[DelayedItem]
-    val myEnd = createdMs + delayMs
-    val yourEnd = delayed.createdMs + delayed.delayMs
+    val other = d.asInstanceOf[DelayedItem]
 
-    if(myEnd < yourEnd) -1
-    else if(myEnd > yourEnd) 1
+    if(dueMs < other.dueMs) -1
+    else if(dueMs > other.dueMs) 1
     else 0
   }
-  
+
 }