Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/01/21 00:49:41 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13143: KAFKA-14491: [3/N] Add logical key value segments

vcrfxia opened a new pull request, #13143:
URL: https://github.com/apache/kafka/pull/13143

   (This PR is stacked on https://github.com/apache/kafka/pull/13142; the first commit comes from that PR and does not need to be reviewed as part of this one.)
   
   Today's KeyValueSegments creates a new RocksDB instance for each KeyValueSegment. This PR introduces an analogous LogicalKeyValueSegments implementation, with a corresponding LogicalKeyValueSegment, which shares a single physical RocksDB instance across all "logical" segments. This will be used for the RocksDB versioned store implementation proposed in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores).
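
   For illustration only, here is a minimal, self-contained sketch (hypothetical names, not the actual `LogicalKeyValueSegments` API) of the core idea: every logical segment writes into one shared physical store, and isolation comes from prefixing each key with the segment id.

   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import java.util.TreeMap;

   // Sketch only: an in-memory map stands in for the single physical RocksDB
   // instance that all logical segments share.
   class LogicalSegmentSketch {
       private final Map<ByteBuffer, byte[]> physical = new TreeMap<>();

       void put(final long segmentId, final byte[] key, final byte[] value) {
           physical.put(prefixed(segmentId, key), value);
       }

       byte[] get(final long segmentId, final byte[] key) {
           return physical.get(prefixed(segmentId, key));
       }

       // A fixed-width segment-id prefix keeps keys from different logical
       // segments disjoint while grouping each segment's keys together.
       private static ByteBuffer prefixed(final long segmentId, final byte[] key) {
           final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + key.length);
           buf.putLong(segmentId);
           buf.put(key);
           buf.flip();
           return buf;
       }
   }
   ```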
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #13143:
URL: https://github.com/apache/kafka/pull/13143#issuecomment-1409929859

   Merged the other PR -- can you rebase this one?




[GitHub] [kafka] mjsax merged pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13143:
URL: https://github.com/apache/kafka/pull/13143




[GitHub] [kafka] mjsax commented on pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #13143:
URL: https://github.com/apache/kafka/pull/13143#issuecomment-1405948413

   Checkstyle error:
   ```
   > Task :streams:checkstyleTest
   
   [2023-01-26T23:03:19.137Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13143/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:23:15: Unused import - org.junit.Assert.assertFalse. [UnusedImports]
   ```




[GitHub] [kafka] mjsax commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1095367325


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {

Review Comment:
   >  If you think it's worth it, I can remove these tests from here and also from KeyValueSegmentsTest.java, and create a dummy AbstractSegments implementation to add an AbstractSegmentsTest.java. I'd like to do that as a follow-up PR instead of as part of this change, though.
   
   Sounds cleaner to me. And yes, a follow-up PR is preferable.





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1096168089


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));

Review Comment:
   Ah, good point. That's definitely a gap in `shouldPut()` and `shouldPutAll()`. All of the other tests are already set up so that they fail if segments are not properly isolated from each other. I just pushed a fix to the two tests that didn't ensure that, along with some minor cleanup to a few of the other tests.





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1093929163


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);

Review Comment:
   See [above](https://github.com/apache/kafka/pull/13143#discussion_r1093927849).





[GitHub] [kafka] mjsax commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1095365120


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));

Review Comment:
   I see your point, but the test does not really achieve this, as we put the same data into both segments. To test "segment isolation" we would need to put 4 different records (2 per segment) and test both the positive case (a put on s1 lets a get on s1 see the data) and the negative case (a put on s1 does not let a get on s2 see the data).

   Might apply to other tests, too?
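
   For example, an isolation check along these lines (an illustrative sketch only, reusing the test's existing `segment1`/`segment2` fixtures and `getAndDeserialize` helper) would cover both cases:

   ```java
   // put a different record into each segment
   segment1.put(new Bytes("k1".getBytes(UTF_8)), "v1".getBytes(UTF_8));
   segment2.put(new Bytes("k2".getBytes(UTF_8)), "v2".getBytes(UTF_8));

   // positive: each segment sees its own write
   assertEquals("v1", getAndDeserialize(segment1, "k1"));
   assertEquals("v2", getAndDeserialize(segment2, "k2"));

   // negative: neither segment sees the other's write
   assertNull(segment1.get(new Bytes("k2".getBytes(UTF_8))));
   assertNull(segment2.get(new Bytes("k1".getBytes(UTF_8))));
   ```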



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
+        final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
+        assertEquals(2, allSegments.size());
+        assertEquals(segment3, allSegments.get(0));
+        assertEquals(segment4, allSegments.get(1));
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final LogicalKeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);

Review Comment:
   It's not really a technical issue, just some "code cleanliness" (we might compute a negative time inside anyway when we subtract the retention time).





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1094000371


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));
+        assertEquals("b", getAndDeserialize(segment1, "2"));
+        assertEquals("a", getAndDeserialize(segment2, "1"));
+        assertEquals("b", getAndDeserialize(segment2, "2"));
+    }
+
+    @Test
+    public void shouldPutAll() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(STRING_SERIALIZER.serialize(null, "1")),
+            STRING_SERIALIZER.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(STRING_SERIALIZER.serialize(null, "2")),
+            STRING_SERIALIZER.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(STRING_SERIALIZER.serialize(null, "3")),
+            STRING_SERIALIZER.serialize(null, "c")));
+
+        segment1.putAll(entries);
+        segment2.putAll(entries);
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));
+        assertEquals("b", getAndDeserialize(segment1, "2"));
+        assertEquals("c", getAndDeserialize(segment1, "3"));
+        assertEquals("a", getAndDeserialize(segment2, "1"));
+        assertEquals("b", getAndDeserialize(segment2, "2"));
+        assertEquals("c", getAndDeserialize(segment2, "3"));
+    }
+
+    @Test
+    public void shouldPutIfAbsent() {
+        final Bytes keyBytes = new Bytes(STRING_SERIALIZER.serialize(null, "one"));
+        final byte[] valueBytes = STRING_SERIALIZER.serialize(null, "A");
+        final byte[] valueBytesUpdate = STRING_SERIALIZER.serialize(null, "B");
+
+        segment1.putIfAbsent(keyBytes, valueBytes);
+        segment1.putIfAbsent(keyBytes, valueBytesUpdate);
+        segment2.putIfAbsent(keyBytes, valueBytesUpdate);
+
+        assertEquals("A", STRING_DESERIALIZER.deserialize(null, segment1.get(keyBytes)));
+        assertEquals("B", STRING_DESERIALIZER.deserialize(null, segment2.get(keyBytes)));
+    }
+
+    @Test
+    public void shouldDelete() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment1.delete(new Bytes(kv0.key.getBytes(UTF_8)));
+
+        assertNull(segment1.get(new Bytes(kv0.key.getBytes(UTF_8))));
+        assertEquals("b", getAndDeserialize(segment1, "2"));
+        assertEquals("a", getAndDeserialize(segment2, "1"));
+        assertEquals("b", getAndDeserialize(segment2, "2"));
+    }
+
+    @Test
+    public void shouldReturnValuesOnRange() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("0", "zero");
+        final KeyValue<String, String> kv1 = new KeyValue<>("1", "one");
+        final KeyValue<String, String> kv2 = new KeyValue<>("2", "two");
+        final KeyValue<String, String> kvOther = new KeyValue<>("1", "other");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv2.key.getBytes(UTF_8)), kv2.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kvOther.key.getBytes(UTF_8)), kvOther.value.getBytes(UTF_8));
+
+        final LinkedList<KeyValue<String, String>> expectedContents = new LinkedList<>();
+        expectedContents.add(kv0);
+        expectedContents.add(kv1);
+
+        try (final KeyValueIterator<Bytes, byte[]> iterator = segment1.range(null, new Bytes(STRING_SERIALIZER.serialize(null, "1")))) {

Review Comment:
   Heh, this additional test coverage caught a bug. Pushed a fix in the latest commit.





[GitHub] [kafka] vcrfxia commented on pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on PR #13143:
URL: https://github.com/apache/kafka/pull/13143#issuecomment-1405780466

   Thanks. I needed to update the test for the latest changes, which now always set `isOpen = true`. Fixed now.




[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1093924959


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));

Review Comment:
   I was on the fence about this because it requires testing the internals of the class (i.e., specifically how the segment prefixes are serialized) rather than just the public-facing methods. In the end I opted to test indirectly instead, by inserting the same keys into different segments and checking that their values do not collide.
   
   If you prefer checking the contents of the physical store itself, I can make the update. 



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));

Review Comment:
   No particular reason. I copied from RocksDBStoreTest.java, which also uses both.

   `StringSerializer` handles nulls while `getBytes` doesn't, so if we unify on one of them it'll have to be `StringSerializer`. `StringSerializer` is a bit longer / harder to read, though. Let me pull it out into a helper method to preserve readability.
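
   A rough sketch of what such helpers might look like (the helper names below are illustrative, not final):

   ```java
   private static Bytes serializeKey(final String key) {
       return new Bytes(STRING_SERIALIZER.serialize(null, key));
   }

   private static byte[] serializeValue(final String value) {
       return STRING_SERIALIZER.serialize(null, value);
   }

   private static String getAndDeserialize(final LogicalKeyValueSegment segment, final String key) {
       return STRING_DESERIALIZER.deserialize(null, segment.get(serializeKey(key)));
   }
   ```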



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
+        final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
+        assertEquals(2, allSegments.size());
+        assertEquals(segment3, allSegments.get(0));
+        assertEquals(segment4, allSegments.get(1));
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final LogicalKeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);

Review Comment:
   Fixed. FWIW the existing KeyValueSegmentsTest.java does the same thing 🤷 



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
+        final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
+        assertEquals(2, allSegments.size());
+        assertEquals(segment3, allSegments.get(0));
+        assertEquals(segment4, allSegments.get(1));
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final LogicalKeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);

Review Comment:
   This line differs from the line above because it creates a different segment. The test checks that we get the expected segment, and not just the only one that exists. Let me rewrite this to clarify, and also add upper and lower bounds tests as you suggested.
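   
   For illustration, roughly the direction I have in mind -- the exact assertions in the updated PR may differ, but the bounds checks would be along these lines:
   
   ```java
   @Test
   public void shouldGetSegmentForTimestamp() {
       final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
       final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);

       // lower and upper bounds of the first segment's interval
       assertEquals(segment1, segments.getSegmentForTimestamp(0));
       assertEquals(segment1, segments.getSegmentForTimestamp(SEGMENT_INTERVAL - 1));

       // a timestamp in the second segment must not resolve to the first,
       // i.e., we return the expected segment and not just the only one that exists
       assertEquals(segment2, segments.getSegmentForTimestamp(SEGMENT_INTERVAL));
   }
   ```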



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {

Review Comment:
   You're right that these tests are testing logic from AbstractSegments and not anything specific about LogicalKeyValueSegments. The thing is, AbstractSegments doesn't have its own test file at the moment (I assume because it's abstract). If you think it's worth it, I can remove these tests from here and also from KeyValueSegmentsTest.java, and create a dummy AbstractSegments implementation to add an AbstractSegmentsTest.java. I'd like to do that as a follow-up PR instead of as part of this change, though.
   
   (Also, for this specific test, I would like to have it here because I plan to refactor the cleanup logic in AbstractSegments in a follow-up PR. The current approach (cleanup as part of `getOrCreateSegmentIfLive()`) is not very efficient for the versioned store use case because this method is called multiple times during a single put operation. It will be better to only perform cleanup once per put.)
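   
   To make the intent of that follow-up concrete, here is a very rough sketch; the method names (`cleanupExpiredSegments()`, `cleanupEarlierThan()`) are hypothetical and not part of this PR:
   
   ```java
   // cleanup is no longer triggered from inside getOrCreateSegmentIfLive()
   public S getOrCreateSegmentIfLive(final long segmentId,
                                     final ProcessorContext context,
                                     final long streamTime) {
       final long minLiveSegment = segmentId(streamTime - retentionPeriod);
       // lookups during a single put stay cheap; no cleanup here anymore
       return segmentId >= minLiveSegment ? getOrCreateSegment(segmentId, context) : null;
   }

   // hypothetical: called once per put, instead of once per segment lookup
   public void cleanupExpiredSegments(final long streamTime) {
       final long minLiveSegment = segmentId(streamTime - retentionPeriod);
       cleanupEarlierThan(minLiveSegment);
   }
   ```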



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1084834388


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {

Review Comment:
   It seems `openDB` does not do much -- do we actually need it (see my other comments about the `open` flag)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;

Review Comment:
   Could we just return `true` here?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);
+        }
+    }
+
+    /**
+     * Converts a {@link KeyValueIterator} which returns keys with prefixes to one which
+     * returns un-prefixed keys.
+     */
+    private static class StrippedPrefixKeyValueIteratorAdapter implements KeyValueIterator<Bytes, byte[]> {
+
+        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
+        private final Function<Bytes, Bytes> keyStripper;
+
+        StrippedPrefixKeyValueIteratorAdapter(final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes,
+                                              final Function<Bytes, Bytes> keyStripper) {
+            this.iteratorWithKeyPrefixes = iteratorWithKeyPrefixes;
+            this.keyStripper = keyStripper;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iteratorWithKeyPrefixes.hasNext();
+        }
+
+        @Override
+        public KeyValue<Bytes, byte[]> next() {
+            final KeyValue<Bytes, byte[]> next = iteratorWithKeyPrefixes.next();
+            return new KeyValue<>(keyStripper.apply(next.key), next.value);
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            return keyStripper.apply(iteratorWithKeyPrefixes.peekNextKey());
+        }
+
+        @Override
+        public void remove() {
+            iteratorWithKeyPrefixes.remove();
+        }
+
+        @Override
+        public void close() {
+            iteratorWithKeyPrefixes.close();
+        }
+    }
+
+    private static Bytes serializeLongToBytes(final long l) {
+        return Bytes.wrap(ByteBuffer.allocate(Long.BYTES).putLong(l).array());

Review Comment:
   Nit: If we pass this into `PrefixKeyFormatter` and unwrap it there, it seems there is no need to wrap it to begin with?
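   
   Something like this sketch, for example (same behavior, just without the intermediate `Bytes` object):
   
   ```java
   private static byte[] serializeLongToBytes(final long l) {
       return ByteBuffer.allocate(Long.BYTES).putLong(l).array();
   }

   private static class PrefixKeyFormatter {
       private final byte[] prefix;

       PrefixKeyFormatter(final byte[] prefix) {
           this.prefix = prefix;
       }
       // remaining methods unchanged
   }
   ```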



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();

Review Comment:
   If the store was never open, it seems it's still safe to call `closeOpenIterators()`, since `openIterators` should just be empty? -- Could we inline the code into `close()` directly?
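   
   For illustration, roughly:
   
   ```java
   @Override
   public synchronized void close() {
       open = false;

       final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
       synchronized (openIterators) {
           iterators = new HashSet<>(openIterators);
       }
       if (!iterators.isEmpty()) {
           log.warn("Closing {} open iterators for store {}", iterators.size(), name);
           for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
               iterator.close();
           }
       }
   }
   ```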



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);

Review Comment:
   How large is the overhead of calling `wrap()`? (Besides creating a new object, what does it do?)
   
   We pass in `Bytes prefix` in the constructor, and it seems that if we keep a reference, we could just return it here (without the need to unwrap in the constructor and re-wrap)?
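   
   For example (purely a sketch):
   
   ```java
   private static class PrefixKeyFormatter {
       private final Bytes prefix;
       private final byte[] prefixBytes;

       PrefixKeyFormatter(final Bytes prefix) {
           this.prefix = prefix;
           this.prefixBytes = prefix.get();
       }

       Bytes getPrefix() {
           return prefix; // no re-wrap needed
       }

       // the other methods use prefixBytes internally and are otherwise unchanged
   }
   ```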



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {

Review Comment:
   I find `forPhysicalStore` and `fromPhysicalStore` not very intuitive -- maybe we could go with `addPrefix` and `removePrefix`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);
+        }
+    }
+
+    /**
+     * Converts a {@link KeyValueIterator} which returns keys with prefixes to one which
+     * returns un-prefixed keys.
+     */
+    private static class StrippedPrefixKeyValueIteratorAdapter implements KeyValueIterator<Bytes, byte[]> {
+
+        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
+        private final Function<Bytes, Bytes> keyStripper;

Review Comment:
   nit: maybe `prefixRemover` is a better name (we don't strip the key; we strip the prefix, and the key is what remains after stripping)?
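
   A minimal sketch of the suggested rename (the adapter's constructor shape is assumed from how it is instantiated above, so the parameter names here are illustrative only):
   ```
   private final Function<Bytes, Bytes> prefixRemover;

   StrippedPrefixKeyValueIteratorAdapter(final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes,
                                         final Function<Bytes, Bytes> prefixRemover) {
       this.iteratorWithKeyPrefixes = iteratorWithKeyPrefixes;
       this.prefixRemover = prefixRemover;
   }
   ```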





[GitHub] [kafka] mjsax commented on pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #13143:
URL: https://github.com/apache/kafka/pull/13143#issuecomment-1404747240

   Seems some of your newly added tests fail. Can you have a look?




[GitHub] [kafka] mjsax commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1093764504


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));

Review Comment:
   Why use `getBytes` here and the `StringSerializer` below?
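
   A hedged sketch of using the serializer consistently (the helper name below is hypothetical):
   ```
   private static Bytes serializeKey(final String key) {
       return Bytes.wrap(STRING_SERIALIZER.serialize(null, key));
   }

   // then all puts go through the same serializer
   segment1.put(serializeKey(kv0.key), STRING_SERIALIZER.serialize(null, kv0.value));
   segment1.put(serializeKey(kv1.key), STRING_SERIALIZER.serialize(null, kv1.value));
   ```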



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));

Review Comment:
   Should we also `get` on the physical store to see if the logic works as expected? (Also for other tests)
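
   As a rough sketch of such an assertion (assuming the logical segment's fixed-length prefix is the segment id serialized as an 8-byte big-endian long, and that `java.nio.ByteBuffer` is imported):
   ```
   final byte[] rawKey = STRING_SERIALIZER.serialize(null, "1");
   // prepend segment1's id (1L) to the raw key to form the physical-store key
   final Bytes prefixedKey = Bytes.wrap(
       ByteBuffer.allocate(Long.BYTES + rawKey.length).putLong(1L).put(rawKey).array());
   assertEquals("a", STRING_DESERIALIZER.deserialize(null, physicalStore.get(prefixedKey)));
   ```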



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
+        final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
+        assertEquals(2, allSegments.size());
+        assertEquals(segment3, allSegments.get(0));
+        assertEquals(segment4, allSegments.get(1));
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final LogicalKeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);

Review Comment:
   Why are we "re-creating" the segment? If we want to test "forTimestamp" should we test upper and lower bound of the segment?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentTest {
+
+    private static final String STORE_NAME = "physical-rocks";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
+    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
+
+    private RocksDBStore physicalStore;
+
+    private LogicalKeyValueSegment segment1;
+    private LogicalKeyValueSegment segment2;
+
+    @Before
+    public void setUp() {
+        physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false);
+        physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig())
+        ), physicalStore);
+
+        segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore);
+        segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore);
+    }
+
+    @After
+    public void tearDown() {
+        segment1.close();
+        segment2.close();
+        physicalStore.close();
+    }
+
+    @Test
+    public void shouldPut() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));
+        assertEquals("b", getAndDeserialize(segment1, "2"));
+        assertEquals("a", getAndDeserialize(segment2, "1"));
+        assertEquals("b", getAndDeserialize(segment2, "2"));
+    }
+
+    @Test
+    public void shouldPutAll() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(
+            new Bytes(STRING_SERIALIZER.serialize(null, "1")),
+            STRING_SERIALIZER.serialize(null, "a")));
+        entries.add(new KeyValue<>(
+            new Bytes(STRING_SERIALIZER.serialize(null, "2")),
+            STRING_SERIALIZER.serialize(null, "b")));
+        entries.add(new KeyValue<>(
+            new Bytes(STRING_SERIALIZER.serialize(null, "3")),
+            STRING_SERIALIZER.serialize(null, "c")));
+
+        segment1.putAll(entries);
+        segment2.putAll(entries);
+
+        assertEquals("a", getAndDeserialize(segment1, "1"));
+        assertEquals("b", getAndDeserialize(segment1, "2"));
+        assertEquals("c", getAndDeserialize(segment1, "3"));
+        assertEquals("a", getAndDeserialize(segment2, "1"));
+        assertEquals("b", getAndDeserialize(segment2, "2"));
+        assertEquals("c", getAndDeserialize(segment2, "3"));
+    }
+
+    @Test
+    public void shouldPutIfAbsent() {
+        final Bytes keyBytes = new Bytes(STRING_SERIALIZER.serialize(null, "one"));
+        final byte[] valueBytes = STRING_SERIALIZER.serialize(null, "A");
+        final byte[] valueBytesUpdate = STRING_SERIALIZER.serialize(null, "B");
+
+        segment1.putIfAbsent(keyBytes, valueBytes);
+        segment1.putIfAbsent(keyBytes, valueBytesUpdate);
+        segment2.putIfAbsent(keyBytes, valueBytesUpdate);
+
+        assertEquals("A", STRING_DESERIALIZER.deserialize(null, segment1.get(keyBytes)));
+        assertEquals("B", STRING_DESERIALIZER.deserialize(null, segment2.get(keyBytes)));
+    }
+
+    @Test
+    public void shouldDelete() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("1", "a");
+        final KeyValue<String, String> kv1 = new KeyValue<>("2", "b");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment1.delete(new Bytes(kv0.key.getBytes(UTF_8)));
+
+        assertNull(segment1.get(new Bytes(kv0.key.getBytes(UTF_8))));
+        assertEquals("b", getAndDeserialize(segment1, "2"));
+        assertEquals("a", getAndDeserialize(segment2, "1"));
+        assertEquals("b", getAndDeserialize(segment2, "2"));
+    }
+
+    @Test
+    public void shouldReturnValuesOnRange() {
+        final KeyValue<String, String> kv0 = new KeyValue<>("0", "zero");
+        final KeyValue<String, String> kv1 = new KeyValue<>("1", "one");
+        final KeyValue<String, String> kv2 = new KeyValue<>("2", "two");
+        final KeyValue<String, String> kvOther = new KeyValue<>("1", "other");
+
+        segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8));
+        segment1.put(new Bytes(kv2.key.getBytes(UTF_8)), kv2.value.getBytes(UTF_8));
+        segment2.put(new Bytes(kvOther.key.getBytes(UTF_8)), kvOther.value.getBytes(UTF_8));
+
+        final LinkedList<KeyValue<String, String>> expectedContents = new LinkedList<>();
+        expectedContents.add(kv0);
+        expectedContents.add(kv1);
+
+        try (final KeyValueIterator<Bytes, byte[]> iterator = segment1.range(null, new Bytes(STRING_SERIALIZER.serialize(null, "1")))) {

Review Comment:
   Should we test different ranges? All lower/upper bound null/not-null combinations?
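
   A rough sketch of the remaining combinations; the assertions assume an open-ended bound is still confined to the logical segment, which is exactly what such a test would verify:
   ```
   final Bytes from = new Bytes(STRING_SERIALIZER.serialize(null, "1"));
   final Bytes to = new Bytes(STRING_SERIALIZER.serialize(null, "2"));

   try (final KeyValueIterator<Bytes, byte[]> iter = segment1.range(from, to)) {
       assertEquals(2, Utils.toList(iter).size());   // "1" and "2"
   }
   try (final KeyValueIterator<Bytes, byte[]> iter = segment1.range(from, null)) {
       assertEquals(2, Utils.toList(iter).size());   // "1" and "2", but nothing from segment2
   }
   try (final KeyValueIterator<Bytes, byte[]> iter = segment1.range(null, null)) {
       assertEquals(3, Utils.toList(iter).size());   // all of segment1, nothing from segment2
   }
   ```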



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
+        final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
+        assertEquals(2, allSegments.size());
+        assertEquals(segment3, allSegments.get(0));
+        assertEquals(segment4, allSegments.get(1));
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {

Review Comment:
   As above.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
+        final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
+        assertEquals(2, allSegments.size());
+        assertEquals(segment3, allSegments.get(0));
+        assertEquals(segment4, allSegments.get(1));
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final LogicalKeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+        assertEquals(segment, segments.getSegmentForTimestamp(0L));
+    }
+
+    @Test
+    public void shouldGetSegmentsWithinTimeRange() {

Review Comment:
   As above.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {

Review Comment:
   Sounds like we test `AbstractSegments` logic here -- do we need to do this?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File rocksdbDir = new File(new File(context.stateDir(), DB_FILE_DIR), STORE_NAME);
+        assertTrue(rocksdbDir.isDirectory());
+
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0);
+        final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(0, context, SEGMENT_INTERVAL * 2L);
+        final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
+        final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
+        assertEquals(2, allSegments.size());
+        assertEquals(segment3, allSegments.get(0));
+        assertEquals(segment4, allSegments.get(1));
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final LogicalKeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);

Review Comment:
   nit: `-1L` does not sound like a valid `streamTime` to be passed in?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogicalKeyValueSegmentsTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String STORE_NAME = "logical-segments";
+    private static final String METRICS_SCOPE = "metrics-scope";
+    private static final String DB_FILE_DIR = "rocksdb";
+
+    private InternalMockProcessorContext context;
+
+    private LogicalKeyValueSegments segments;
+
+    @Before
+    public void setUp() {
+        context = new InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new LogicalKeyValueSegments(
+            STORE_NAME,
+            DB_FILE_DIR,
+            RETENTION_PERIOD,
+            SEGMENT_INTERVAL,
+            new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
+        );
+        segments.openExisting(context, -1L);
+    }
+
+    @After
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);

Review Comment:
   Should we call `getOrCreateSegment` instead? Otherwise we mainly test the logic of `AbstractSegments`?
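
   A rough sketch of the alternative (assuming a `getOrCreateSegment(segmentId, context)` method is exposed analogously to other `Segments` implementations):
   ```
   final LogicalKeyValueSegment segment1 = segments.getOrCreateSegment(0, context);
   final LogicalKeyValueSegment segment2 = segments.getOrCreateSegment(1, context);
   final LogicalKeyValueSegment segment3 = segments.getOrCreateSegment(2, context);

   assertTrue(segment1.isOpen());
   assertTrue(segment2.isOpen());
   assertTrue(segment3.isOpen());
   ```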





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1085990051


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();

Review Comment:
   Sure, I don't feel strongly, so I made the change. Besides guarding against closing a segment which was never opened, the `open` flag also guarded against closing the same segment twice. I've inlined `closeOpenIterators()` and accounted for this by clearing `openIterators` after it's copied.
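
   For reference, a rough sketch of the inlined version described above (not necessarily the exact committed code):
   ```
   @Override
   public synchronized void close() {
       // copy under the set's own lock, then clear so a second close() becomes a no-op
       final Set<KeyValueIterator<Bytes, byte[]>> iterators;
       synchronized (openIterators) {
           iterators = new HashSet<>(openIterators);
           openIterators.clear();
       }
       if (!iterators.isEmpty()) {
           log.warn("Closing {} open iterators for store {}", iterators.size(), name);
           for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
               iterator.close();
           }
       }
   }
   ```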



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {

Review Comment:
   Ack, see above. (Addressed in the latest commit.)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;

Review Comment:
   Ack, addressed in the latest commit.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);
+        }
+    }
+
+    /**
+     * Converts a {@link KeyValueIterator} which returns keys with prefixes to one which
+     * returns un-prefixed keys.
+     */
+    private static class StrippedPrefixKeyValueIteratorAdapter implements KeyValueIterator<Bytes, byte[]> {
+
+        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
+        private final Function<Bytes, Bytes> keyStripper;
+
+        StrippedPrefixKeyValueIteratorAdapter(final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes,
+                                              final Function<Bytes, Bytes> keyStripper) {
+            this.iteratorWithKeyPrefixes = iteratorWithKeyPrefixes;
+            this.keyStripper = keyStripper;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iteratorWithKeyPrefixes.hasNext();
+        }
+
+        @Override
+        public KeyValue<Bytes, byte[]> next() {
+            final KeyValue<Bytes, byte[]> next = iteratorWithKeyPrefixes.next();
+            return new KeyValue<>(keyStripper.apply(next.key), next.value);
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            return keyStripper.apply(iteratorWithKeyPrefixes.peekNextKey());
+        }
+
+        @Override
+        public void remove() {
+            iteratorWithKeyPrefixes.remove();
+        }
+
+        @Override
+        public void close() {
+            iteratorWithKeyPrefixes.close();
+        }
+    }
+
+    private static Bytes serializeLongToBytes(final long l) {
+        return Bytes.wrap(ByteBuffer.allocate(Long.BYTES).putLong(l).array());

Review Comment:
   Nice eye. Fixed.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);

Review Comment:
   `wrap()` just creates a new object (after a null check), so it's very lightweight.
   
   It's more convenient to keep `prefix` as `byte[]` than `Bytes` because all the other operations require `byte[]` rather than `Bytes`. If we really wanted, we could keep both (one copy as `byte[]` and another as `Bytes`), but that feels like overkill.
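   
   For illustration, the "keep both" alternative being dismissed as overkill would look roughly like this (hypothetical sketch, not part of the PR):
   
   ```java
   private static class PrefixKeyFormatter {
       private final byte[] prefixBytes;   // used when building/stripping physical-store keys
       private final Bytes prefixWrapper;  // returned directly from getPrefix()
   
       PrefixKeyFormatter(final Bytes prefix) {
           this.prefixBytes = prefix.get();
           this.prefixWrapper = Bytes.wrap(prefixBytes);
       }
   
       Bytes getPrefix() {
           return prefixWrapper; // saves a Bytes.wrap() per call, at the cost of an extra field
       }
   }
   ```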



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {

Review Comment:
   Yup, updated. Originally I was going to make this class more general (i.e., not necessarily prefix-based), so I left the names generic, but that's overkill for now since we only have the one use case. Agreed that switching to the simpler names improves readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org