You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 22:22:54 UTC

kafka git commit: HOTFIX: open window segments in order, add segment id check in getSegment

Repository: kafka
Updated Branches:
  refs/heads/trunk 4fd60c612 -> b5e6b8671


HOTFIX: open window segments in order, add segment id check in getSegment

* During window store initialization, we have to open segments in the segment id order and update ```currentSegmentId```, otherwise cleanup won't work.
* ```getSegment()``` should not create a segment and clean up old segments if the segment id is greater than ```currentSegmentId```. Segment maintenance should be driven not by query but only by data insertion.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #891 from ymatsuda/hotfix2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5e6b867
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5e6b867
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5e6b867

Branch: refs/heads/trunk
Commit: b5e6b8671a5b6d97d5026261ae8d62b54f068e53
Parents: 4fd60c6
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 9 13:22:46 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 9 13:22:46 2016 -0800

----------------------------------------------------------------------
 .../state/internals/RocksDBWindowStore.java     |  21 ++-
 .../state/internals/RocksDBWindowStoreTest.java | 166 +++++++++++++++++++
 2 files changed, 182 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5e6b867/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 825c70d..2758e6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.WindowStoreUtils;
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.NoSuchElementException;
@@ -181,10 +182,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             File dir = new File(context.stateDir(), name);
 
             if (dir.exists()) {
-                for (String segmentName : dir.list()) {
-                    long segmentId = segmentIdFromSegmentName(segmentName);
-                    if (segmentId >= 0)
-                        getSegment(segmentId);
+                String[] list = dir.list();
+                if (list != null) {
+                    long[] segmentIds = new long[list.length];
+                    for (int i = 0; i < list.length; i++)
+                        segmentIds[i] = segmentIdFromSegmentName(list[i]);
+
+                    // open segments in the id order
+                    Arrays.sort(segmentIds);
+                    for (long segmentId : segmentIds) {
+                        if (segmentId >= 0) {
+                            currentSegmentId = segmentId;
+                            getSegment(segmentId);
+                        }
+                    }
                 }
             } else {
                 dir.mkdir();
@@ -314,7 +325,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     private Segment getSegment(long segmentId) {
-        if (segmentId > currentSegmentId - segments.length) {
+        if (segmentId <= currentSegmentId && segmentId > currentSegmentId - segments.length) {
             int index = (int) (segmentId % segments.length);
 
             if (segments[index] != null && segments[index].id != segmentId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b5e6b867/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 3eda1be..fd55944 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -638,6 +638,172 @@ public class RocksDBWindowStoreTest {
         }
     }
 
+    @Test
+    public void testSegmentMaintenance() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    // do nothing
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+            try {
+
+                context.setTime(0L);
+                store.put(0, "v");
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(0L)),
+                        segmentDirs(baseDir)
+                );
+
+                context.setTime(59999L);
+                store.put(0, "v");
+                context.setTime(59999L);
+                store.put(0, "v");
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(0L)),
+                        segmentDirs(baseDir)
+                );
+
+                context.setTime(60000L);
+                store.put(0, "v");
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
+                        segmentDirs(baseDir)
+                );
+
+                WindowStoreIterator iter;
+                int fetchedCount;
+
+                iter = store.fetch(0, 0L, 240000L);
+                fetchedCount = 0;
+                while (iter.hasNext()) {
+                    iter.next();
+                    fetchedCount++;
+                }
+                assertEquals(4, fetchedCount);
+
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)),
+                        segmentDirs(baseDir)
+                );
+
+                context.setTime(180000L);
+                store.put(0, "v");
+
+                iter = store.fetch(0, 0L, 240000L);
+                fetchedCount = 0;
+                while (iter.hasNext()) {
+                    iter.next();
+                    fetchedCount++;
+                }
+                assertEquals(2, fetchedCount);
+
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)),
+                        segmentDirs(baseDir)
+                );
+
+                context.setTime(300000L);
+                store.put(0, "v");
+
+                iter = store.fetch(0, 240000L, 1000000L);
+                fetchedCount = 0;
+                while (iter.hasNext()) {
+                    iter.next();
+                    fetchedCount++;
+                }
+                assertEquals(1, fetchedCount);
+
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)),
+                        segmentDirs(baseDir)
+                );
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testInitialLoading() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    // do nothing
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            File storeDir = new File(baseDir, windowName);
+
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+            try {
+                new File(storeDir, inner.segmentName(0L)).mkdir();
+                new File(storeDir, inner.segmentName(1L)).mkdir();
+                new File(storeDir, inner.segmentName(2L)).mkdir();
+                new File(storeDir, inner.segmentName(3L)).mkdir();
+                new File(storeDir, inner.segmentName(4L)).mkdir();
+                new File(storeDir, inner.segmentName(5L)).mkdir();
+                new File(storeDir, inner.segmentName(6L)).mkdir();
+            } finally {
+                store.close();
+            }
+
+            store = createWindowStore(context, serdes);
+            inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+            try {
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
+                        segmentDirs(baseDir)
+                );
+
+                WindowStoreIterator iter = store.fetch(0, 0L, 1000000L);
+                while (iter.hasNext()) {
+                    iter.next();
+                }
+
+                assertEquals(
+                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
+                        segmentDirs(baseDir)
+                );
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
     private <E> List<E> toList(WindowStoreIterator<E> iterator) {
         ArrayList<E> list = new ArrayList<>();
         while (iterator.hasNext()) {