You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/15 05:17:33 UTC
[kafka] branch 0.10.2 updated: KAFKA-6360: Clear RocksDB Segments when store is closed
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.10.2 by this push:
new e78e91f KAFKA-6360: Clear RocksDB Segments when store is closed
e78e91f is described below
commit e78e91f69bb5b1e1f41cd3569363b03fea3febee
Author: Damian Guy <da...@gmail.com>
AuthorDate: Thu Dec 14 09:51:56 2017 -0800
KAFKA-6360: Clear RocksDB Segments when store is closed
Now that we support re-initializing state stores, we need to clear the segments when the store is closed so that they can be re-opened.
Author: Damian Guy <da...@gmail.com>
Reviewers: Bill Bejeck <bb...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Ted Yu <yu...@gmail.com>
Closes #4324 from dguy/kafka-6360
---
.../kafka/streams/state/internals/Segments.java | 1 +
.../internals/RocksDBSegmentedBytesStoreTest.java | 22 +++++++++++++++++-----
.../streams/state/internals/SegmentsTest.java | 9 +++++++++
3 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 72eb802..f29ac30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -143,6 +143,7 @@ class Segments {
for (Segment segment : segments.values()) {
segment.close();
}
+ segments.clear();
}
private Segment getSegment(long segmentId) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index e9c8f89..6dc4ad6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -52,6 +52,7 @@ public class RocksDBSegmentedBytesStoreTest {
private final String storeName = "bytes-store";
private RocksDBSegmentedBytesStore bytesStore;
private File stateDir;
+ private MockProcessorContext context;
@Before
public void before() {
@@ -62,11 +63,12 @@ public class RocksDBSegmentedBytesStoreTest {
new SessionKeySchema());
stateDir = TestUtils.tempDirectory();
- final MockProcessorContext context = new MockProcessorContext(stateDir,
- Serdes.String(),
- Serdes.Long(),
- new NoOpRecordCollector(),
- new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
+ context = new MockProcessorContext(
+ stateDir,
+ Serdes.String(),
+ Serdes.Long(),
+ new NoOpRecordCollector(),
+ new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
bytesStore.init(context, bytesStore);
}
@@ -144,6 +146,16 @@ public class RocksDBSegmentedBytesStoreTest {
}
+ @Test
+ public void shouldBeAbleToWriteToReInitializedStore() {
+ final String key = "a";
+ // need to create a segment so we can attempt to write to it again.
+ bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+ bytesStore.close();
+ bytesStore.init(context, bytesStore);
+ bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+ }
+
private Set<String> segmentDirs() {
File windowDir = new File(stateDir, storeName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 9e34e63..4d32830 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -30,6 +30,9 @@ import org.junit.Test;
import java.io.File;
import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -184,6 +187,12 @@ public class SegmentsTest {
verifyCorrectSegments(2, 5);
}
+ @Test
+ public void shouldClearSegmentsOnClose() {
+ segments.getOrCreateSegment(0, context);
+ segments.close();
+ assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+ }
private void verifyCorrectSegments(final long first, final int numSegments) {
final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
assertEquals(numSegments, result.size());
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.