You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/08 22:00:55 UTC

kafka git commit: MINOR: fix RocksDBStore range search

Repository: kafka
Updated Branches:
  refs/heads/trunk 4ee68b43c -> d2fc6f36c


MINOR: fix RocksDBStore range search

According to KeyValueStore's Javadoc, the range is inclusive, so the entry at the upper-bound key must also be returned.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #883 from ymatsuda/minor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2fc6f36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2fc6f36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2fc6f36

Branch: refs/heads/trunk
Commit: d2fc6f36cc0b98d4d0acaa62ce3f2283c4e60581
Parents: 4ee68b4
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 9 05:00:38 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 9 05:00:38 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/state/internals/RocksDBStore.java  | 2 +-
 .../kafka/streams/state/internals/RocksDBWindowStore.java       | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2fc6f36/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 5c57854..6176767 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -456,7 +456,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         @Override
         public boolean hasNext() {
-            return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+            return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2fc6f36/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index d6baf30..581b742 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -189,6 +189,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void close() {
+        flush();
         for (Segment segment : segments) {
             if (segment != null)
                 segment.close();
@@ -271,7 +272,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         long segTo = segmentId(Math.max(0L, timeTo));
 
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
-        byte[] binaryUntil = WindowStoreUtils.toBinaryKey(key, timeTo + 1L, 0, serdes);
+        byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
 
         ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
 
@@ -279,7 +280,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             Segment segment = segments[(int) (segmentId % segments.length)];
 
             if (segment != null && segment.id == segmentId)
-                iterators.add(segment.range(binaryFrom, binaryUntil));
+                iterators.add(segment.range(binaryFrom, binaryTo));
         }
 
         if (iterators.size() > 0) {