You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 22:22:54 UTC
kafka git commit: HOTFIX: open window segments in order,
add segment id check in getSegment
Repository: kafka
Updated Branches:
refs/heads/trunk 4fd60c612 -> b5e6b8671
HOTFIX: open window segments in order, add segment id check in getSegment
* During window store initialization, we have to open segments in ascending segment id order and update ```currentSegmentId``` as we go; otherwise cleanup won't work.
* ```getSegment()``` should not create a new segment or clean up old segments when the requested segment id is greater than ```currentSegmentId```. Segment maintenance should be driven only by data insertion, not by queries.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #891 from ymatsuda/hotfix2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5e6b867
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5e6b867
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5e6b867
Branch: refs/heads/trunk
Commit: b5e6b8671a5b6d97d5026261ae8d62b54f068e53
Parents: 4fd60c6
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 9 13:22:46 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 9 13:22:46 2016 -0800
----------------------------------------------------------------------
.../state/internals/RocksDBWindowStore.java | 21 ++-
.../state/internals/RocksDBWindowStoreTest.java | 166 +++++++++++++++++++
2 files changed, 182 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b5e6b867/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 825c70d..2758e6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.WindowStoreUtils;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.NoSuchElementException;
@@ -181,10 +182,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
File dir = new File(context.stateDir(), name);
if (dir.exists()) {
- for (String segmentName : dir.list()) {
- long segmentId = segmentIdFromSegmentName(segmentName);
- if (segmentId >= 0)
- getSegment(segmentId);
+ String[] list = dir.list();
+ if (list != null) {
+ long[] segmentIds = new long[list.length];
+ for (int i = 0; i < list.length; i++)
+ segmentIds[i] = segmentIdFromSegmentName(list[i]);
+
+ // open segments in the id order
+ Arrays.sort(segmentIds);
+ for (long segmentId : segmentIds) {
+ if (segmentId >= 0) {
+ currentSegmentId = segmentId;
+ getSegment(segmentId);
+ }
+ }
}
} else {
dir.mkdir();
@@ -314,7 +325,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
}
private Segment getSegment(long segmentId) {
- if (segmentId > currentSegmentId - segments.length) {
+ if (segmentId <= currentSegmentId && segmentId > currentSegmentId - segments.length) {
int index = (int) (segmentId % segments.length);
if (segments[index] != null && segments[index].id != segmentId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b5e6b867/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 3eda1be..fd55944 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -638,6 +638,172 @@ public class RocksDBWindowStoreTest {
}
}
+ @Test
+ public void testSegmentMaintenance() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ // do nothing
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ RocksDBWindowStore<Integer, String> inner =
+ (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+ try {
+
+ context.setTime(0L);
+ store.put(0, "v");
+ assertEquals(
+ Utils.mkSet(inner.segmentName(0L)),
+ segmentDirs(baseDir)
+ );
+
+ context.setTime(59999L);
+ store.put(0, "v");
+ context.setTime(59999L);
+ store.put(0, "v");
+ assertEquals(
+ Utils.mkSet(inner.segmentName(0L)),
+ segmentDirs(baseDir)
+ );
+
+ context.setTime(60000L);
+ store.put(0, "v");
+ assertEquals(
+ Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
+ segmentDirs(baseDir)
+ );
+
+ WindowStoreIterator iter;
+ int fetchedCount;
+
+ iter = store.fetch(0, 0L, 240000L);
+ fetchedCount = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ fetchedCount++;
+ }
+ assertEquals(4, fetchedCount);
+
+ assertEquals(
+ Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
+ segmentDirs(baseDir)
+ );
+
+ context.setTime(180000L);
+ store.put(0, "v");
+
+ iter = store.fetch(0, 0L, 240000L);
+ fetchedCount = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ fetchedCount++;
+ }
+ assertEquals(2, fetchedCount);
+
+ assertEquals(
+ Utils.mkSet(inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)),
+ segmentDirs(baseDir)
+ );
+
+ context.setTime(300000L);
+ store.put(0, "v");
+
+ iter = store.fetch(0, 240000L, 1000000L);
+ fetchedCount = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ fetchedCount++;
+ }
+ assertEquals(1, fetchedCount);
+
+ assertEquals(
+ Utils.mkSet(inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)),
+ segmentDirs(baseDir)
+ );
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @Test
+ public void testInitialLoading() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ // do nothing
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ File storeDir = new File(baseDir, windowName);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ RocksDBWindowStore<Integer, String> inner =
+ (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+ try {
+ new File(storeDir, inner.segmentName(0L)).mkdir();
+ new File(storeDir, inner.segmentName(1L)).mkdir();
+ new File(storeDir, inner.segmentName(2L)).mkdir();
+ new File(storeDir, inner.segmentName(3L)).mkdir();
+ new File(storeDir, inner.segmentName(4L)).mkdir();
+ new File(storeDir, inner.segmentName(5L)).mkdir();
+ new File(storeDir, inner.segmentName(6L)).mkdir();
+ } finally {
+ store.close();
+ }
+
+ store = createWindowStore(context, serdes);
+ inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+ try {
+ assertEquals(
+ Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
+ segmentDirs(baseDir)
+ );
+
+ WindowStoreIterator iter = store.fetch(0, 0L, 1000000L);
+ while (iter.hasNext()) {
+ iter.next();
+ }
+
+ assertEquals(
+ Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
+ segmentDirs(baseDir)
+ );
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
private <E> List<E> toList(WindowStoreIterator<E> iterator) {
ArrayList<E> list = new ArrayList<>();
while (iterator.hasNext()) {