You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/08 22:00:55 UTC
kafka git commit: MINOR: fix RocksDBStore range search
Repository: kafka
Updated Branches:
refs/heads/trunk 4ee68b43c -> d2fc6f36c
MINOR: fix RocksDBStore range search
The range is inclusive according to KeyValueStore's Javadoc, so range searches must also return the entry at the upper bound.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #883 from ymatsuda/minor
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2fc6f36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2fc6f36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2fc6f36
Branch: refs/heads/trunk
Commit: d2fc6f36cc0b98d4d0acaa62ce3f2283c4e60581
Parents: 4ee68b4
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 9 05:00:38 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 9 05:00:38 2016 +0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/state/internals/RocksDBStore.java | 2 +-
.../kafka/streams/state/internals/RocksDBWindowStore.java | 5 +++--
2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2fc6f36/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 5c57854..6176767 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -456,7 +456,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public boolean hasNext() {
- return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+ return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2fc6f36/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index d6baf30..581b742 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -189,6 +189,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@Override
public void close() {
+ flush();
for (Segment segment : segments) {
if (segment != null)
segment.close();
@@ -271,7 +272,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
long segTo = segmentId(Math.max(0L, timeTo));
byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
- byte[] binaryUntil = WindowStoreUtils.toBinaryKey(key, timeTo + 1L, 0, serdes);
+ byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
@@ -279,7 +280,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
Segment segment = segments[(int) (segmentId % segments.length)];
if (segment != null && segment.id == segmentId)
- iterators.add(segment.range(binaryFrom, binaryUntil));
+ iterators.add(segment.range(binaryFrom, binaryTo));
}
if (iterators.size() > 0) {