You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/15 05:17:33 UTC

[kafka] branch 0.10.2 updated: KAFKA-6360: Clear RocksDB Segments when store is closed

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
     new e78e91f  KAFKA-6360: Clear RocksDB Segments when store is closed
e78e91f is described below

commit e78e91f69bb5b1e1f41cd3569363b03fea3febee
Author: Damian Guy <da...@gmail.com>
AuthorDate: Thu Dec 14 09:51:56 2017 -0800

    KAFKA-6360: Clear RocksDB Segments when store is closed
    
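    Now that we support re-initializing state stores, we need to clear the segments map when the store is closed; otherwise a re-initialized store would still hold the already-closed segments instead of creating fresh ones, and writes to them would fail.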
    
    Author: Damian Guy <da...@gmail.com>
    
    Reviewers: Bill Bejeck <bb...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Ted Yu <yu...@gmail.com>
    
    Closes #4324 from dguy/kafka-6360
---
 .../kafka/streams/state/internals/Segments.java    |  1 +
 .../internals/RocksDBSegmentedBytesStoreTest.java  | 22 +++++++++++++++++-----
 .../streams/state/internals/SegmentsTest.java      |  9 +++++++++
 3 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 72eb802..f29ac30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -143,6 +143,7 @@ class Segments {
         for (Segment segment : segments.values()) {
             segment.close();
         }
+        segments.clear();
     }
 
     private Segment getSegment(long segmentId) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index e9c8f89..6dc4ad6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -52,6 +52,7 @@ public class RocksDBSegmentedBytesStoreTest {
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
+    private MockProcessorContext context;
 
     @Before
     public void before() {
@@ -62,11 +63,12 @@ public class RocksDBSegmentedBytesStoreTest {
                                                     new SessionKeySchema());
 
         stateDir = TestUtils.tempDirectory();
-        final MockProcessorContext context = new MockProcessorContext(stateDir,
-                                                                      Serdes.String(),
-                                                                      Serdes.Long(),
-                                                                      new NoOpRecordCollector(),
-                                                                      new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+        context = new MockProcessorContext(
+            stateDir,
+            Serdes.String(),
+            Serdes.Long(),
+            new NoOpRecordCollector(),
+            new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
         bytesStore.init(context, bytesStore);
     }
 
@@ -144,6 +146,16 @@ public class RocksDBSegmentedBytesStoreTest {
 
     }
 
+    @Test
+    public void shouldBeAbleToWriteToReInitializedStore() {
+        final String key = "a";
+        // need to create a segment so we can attempt to write to it again.
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+    }
+
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 9e34e63..4d32830 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -30,6 +30,9 @@ import org.junit.Test;
 import java.io.File;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -184,6 +187,12 @@ public class SegmentsTest {
         verifyCorrectSegments(2, 5);
     }
 
+    @Test
+    public void shouldClearSegmentsOnClose() {
+        segments.getOrCreateSegment(0, context);
+        segments.close();
+        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+    }
     private void verifyCorrectSegments(final long first, final int numSegments) {
         final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.