You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 01:48:56 UTC

kafka git commit: HOTFIX: open window segments on init

Repository: kafka
Updated Branches:
  refs/heads/trunk f7ad3d1b1 -> feda3f68e


HOTFIX: open window segments on init

guozhangwang

A window store should open all existing segments when it is initialized. This is important for segment cleanup, and it also ensures that the first fetch() call returns the hits, i.e. the values in the search range. (Previously, a fetch() issued immediately after initialization missed those hits.)

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #886 from ymatsuda/hotfix3


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/feda3f68
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/feda3f68
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/feda3f68

Branch: refs/heads/trunk
Commit: feda3f68e98b5269431db9f2a5f131c03a16f651
Parents: f7ad3d1
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Feb 8 16:48:06 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Feb 8 16:48:06 2016 -0800

----------------------------------------------------------------------
 .../streams/state/internals/RocksDBStore.java   |  8 +-
 .../state/internals/RocksDBWindowStore.java     | 79 +++++++++++++++-----
 .../state/internals/RocksDBWindowStoreTest.java | 21 ++----
 3 files changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 6176767..11bf96e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -60,6 +60,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private static final String DB_FILE_DIR = "rocksdb";
 
     private final String name;
+    private final String parentDir;
 
     private final Options options;
     private final WriteOptions wOptions;
@@ -91,7 +92,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     public RocksDBStore(String name, Serdes<K, V> serdes) {
+        this(name, DB_FILE_DIR, serdes);
+    }
+
+    public RocksDBStore(String name, String parentDir, Serdes<K, V> serdes) {
         this.name = name;
+        this.parentDir = parentDir;
         this.serdes = serdes;
 
         // initialize the rocksdb options
@@ -131,7 +137,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     public void openDB(ProcessorContext context) {
         this.context = context;
-        this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
+        this.dbDir = new File(new File(this.context.stateDir(), parentDir), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 581b742..825c70d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.WindowStoreUtils;
 
 
+import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -47,8 +48,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private static class Segment extends RocksDBStore<byte[], byte[]> {
         public final long id;
 
-        Segment(String name, long id) {
-            super(name, WindowStoreUtils.INNER_SERDES);
+        Segment(String segmentName, String windowName, long id) {
+            super(segmentName, windowName, WindowStoreUtils.INNER_SERDES);
             this.id = id;
         }
 
@@ -159,6 +160,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     public void init(ProcessorContext context) {
         this.context = context;
 
+        openExistingSegments();
+
         this.changeLogger = this.loggingEnabled ?
                 new RawStoreChangeLogger(name, context) : null;
 
@@ -169,6 +172,26 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 putInternal(key, value);
             }
         });
+
+        flush();
+    }
+
+    private void openExistingSegments() {
+        try {
+            File dir = new File(context.stateDir(), name);
+
+            if (dir.exists()) {
+                for (String segmentName : dir.list()) {
+                    long segmentId = segmentIdFromSegmentName(segmentName);
+                    if (segmentId >= 0)
+                        getSegment(segmentId);
+                }
+            } else {
+                dir.mkdir();
+            }
+        } catch (Exception ex) {
+
+        }
     }
 
     @Override
@@ -228,11 +251,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         // If the record is within the retention period, put it in the store.
-        if (segmentId > currentSegmentId - segments.length) {
+        Segment segment = getSegment(segmentId);
+        if (segment != null) {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
-            getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
+            segment.put(binaryKey, serdes.rawValue(value));
             return binaryKey;
         } else {
             return null;
@@ -249,16 +273,16 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         // If the record is within the retention period, put it in the store.
-        if (segmentId > currentSegmentId - segments.length)
-            getSegment(segmentId).put(binaryKey, binaryValue);
+        Segment segment = getSegment(segmentId);
+        if (segment != null)
+            segment.put(binaryKey, binaryValue);
     }
 
     private byte[] getInternal(byte[] binaryKey) {
         long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
 
-        Segment segment = segments[(int) (segmentId % segments.length)];
-
-        if (segment != null && segment.id == segmentId) {
+        Segment segment = getSegment(segmentId);
+        if (segment != null) {
             return segment.get(binaryKey);
         } else {
             return null;
@@ -277,9 +301,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
 
         for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
-            Segment segment = segments[(int) (segmentId % segments.length)];
-
-            if (segment != null && segment.id == segmentId)
+            Segment segment = getSegment(segmentId);
+            if (segment != null)
                 iterators.add(segment.range(binaryFrom, binaryTo));
         }
 
@@ -291,14 +314,23 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     private Segment getSegment(long segmentId) {
-        int index = (int) (segmentId % segments.length);
+        if (segmentId > currentSegmentId - segments.length) {
+            int index = (int) (segmentId % segments.length);
 
-        if (segments[index] == null) {
-            segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
-            segments[index].openDB(context);
-        }
+            if (segments[index] != null && segments[index].id != segmentId) {
+                cleanup();
+            }
+
+            if (segments[index] == null) {
+                segments[index] = new Segment(segmentName(segmentId), name, segmentId);
+                segments[index].openDB(context);
+            }
+
+            return segments[index];
 
-        return segments[index];
+        } else {
+            return null;
+        }
     }
 
     private void cleanup() {
@@ -316,10 +348,19 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     // this method is defined public since it is used for unit tests
-    public String directorySuffix(long segmentId) {
+    public String segmentName(long segmentId) {
         return formatter.format(new Date(segmentId * segmentInterval));
     }
 
+    public long segmentIdFromSegmentName(String segmentName) {
+        try {
+            Date date = formatter.parse(segmentName);
+            return date.getTime() / segmentInterval;
+        } catch (Exception ex) {
+            return -1L;
+        }
+    }
+
     // this method is defined public since it is used for unit tests
     public Set<Long> segmentIds() {
         HashSet<Long> segmentIds = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/feda3f68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 94385c8..3eda1be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -39,6 +39,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,6 +53,7 @@ public class RocksDBWindowStoreTest {
 
     private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
     private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
+    private final String windowName = "window";
     private final int numSegments = 3;
     private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
@@ -60,7 +62,7 @@ public class RocksDBWindowStoreTest {
 
     @SuppressWarnings("unchecked")
     protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
-        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null);
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context);
@@ -516,7 +518,7 @@ public class RocksDBWindowStoreTest {
                 // check segment directories
                 store.flush();
                 assertEquals(
-                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
                         segmentDirs(baseDir)
                 );
             } finally {
@@ -606,7 +608,7 @@ public class RocksDBWindowStoreTest {
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
             try {
-                context.restore("window", changeLog);
+                context.restore(windowName, changeLog);
 
                 assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
 
@@ -623,7 +625,7 @@ public class RocksDBWindowStoreTest {
                 // check segment directories
                 store.flush();
                 assertEquals(
-                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)),
                         segmentDirs(baseDir)
                 );
             } finally {
@@ -645,16 +647,9 @@ public class RocksDBWindowStoreTest {
     }
 
     private Set<String> segmentDirs(File baseDir) {
-        File rocksDbDir = new File(baseDir, "rocksdb");
-        String[] subdirs = rocksDbDir.list();
+        File windowDir = new File(baseDir, windowName);
 
-        HashSet<String> set = new HashSet<>();
-
-        for (String subdir : subdirs) {
-            if (subdir.startsWith("window-"))
-            set.add(subdir.substring(7));
-        }
-        return set;
+        return new HashSet<>(Arrays.asList(windowDir.list()));
     }
 
     private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {