You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/02/21 01:33:23 UTC
kafka git commit: Leaner DelayedItem;
reviewed by Neha Narkhede and Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/trunk 8c1b9325b -> fbb115497
Leaner DelayedItem; reviewed by Neha Narkhede and Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fbb11549
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fbb11549
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fbb11549
Branch: refs/heads/trunk
Commit: fbb11549743aa00420e40696f11b48a705dce019
Parents: 8c1b932
Author: Yasuhiro Matsuda <ya...@gmail.com>
Authored: Fri Feb 20 16:33:01 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Feb 20 16:33:15 2015 -0800
----------------------------------------------------------------------
.../main/scala/kafka/utils/DelayedItem.scala | 29 +++++++-------------
1 file changed, 10 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fbb11549/core/src/main/scala/kafka/utils/DelayedItem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala
index a4e0dab..cbab2a0 100644
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ b/core/src/main/scala/kafka/utils/DelayedItem.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -20,34 +20,25 @@ package kafka.utils
import java.util.concurrent._
import scala.math._
-class DelayedItem(delay: Long, unit: TimeUnit) extends Delayed with Logging {
+class DelayedItem(delayMs: Long) extends Delayed with Logging {
- val createdMs = SystemTime.milliseconds
- val delayMs = {
- val given = unit.toMillis(delay)
- if (given < 0 || (createdMs + given) < 0) (Long.MaxValue - createdMs)
- else given
- }
+ private val dueMs = SystemTime.milliseconds + delayMs
- def this(delayMs: Long) =
- this(delayMs, TimeUnit.MILLISECONDS)
+ def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
/**
* The remaining delay time
*/
def getDelay(unit: TimeUnit): Long = {
- val elapsedMs = (SystemTime.milliseconds - createdMs)
- unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
+ unit.convert(max(dueMs - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
}
-
+
def compareTo(d: Delayed): Int = {
- val delayed = d.asInstanceOf[DelayedItem]
- val myEnd = createdMs + delayMs
- val yourEnd = delayed.createdMs + delayed.delayMs
+ val other = d.asInstanceOf[DelayedItem]
- if(myEnd < yourEnd) -1
- else if(myEnd > yourEnd) 1
+ if(dueMs < other.dueMs) -1
+ else if(dueMs > other.dueMs) 1
else 0
}
-
+
}