You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/01/24 06:12:11 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

mjsax commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1084834388


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {

Review Comment:
   It seems `openDB` does not do much -- do we actually need it (compare my other comments about the `open` flag)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;

Review Comment:
   Could we just return `true` here?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);
+        }
+    }
+
+    /**
+     * Converts a {@link KeyValueIterator} which returns keys with prefixes to one which
+     * returns un-prefixed keys.
+     */
+    private static class StrippedPrefixKeyValueIteratorAdapter implements KeyValueIterator<Bytes, byte[]> {
+
+        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
+        private final Function<Bytes, Bytes> keyStripper;
+
+        StrippedPrefixKeyValueIteratorAdapter(final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes,
+                                              final Function<Bytes, Bytes> keyStripper) {
+            this.iteratorWithKeyPrefixes = iteratorWithKeyPrefixes;
+            this.keyStripper = keyStripper;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iteratorWithKeyPrefixes.hasNext();
+        }
+
+        @Override
+        public KeyValue<Bytes, byte[]> next() {
+            final KeyValue<Bytes, byte[]> next = iteratorWithKeyPrefixes.next();
+            return new KeyValue<>(keyStripper.apply(next.key), next.value);
+        }
+
+        @Override
+        public Bytes peekNextKey() {
+            return keyStripper.apply(iteratorWithKeyPrefixes.peekNextKey());
+        }
+
+        @Override
+        public void remove() {
+            iteratorWithKeyPrefixes.remove();
+        }
+
+        @Override
+        public void close() {
+            iteratorWithKeyPrefixes.close();
+        }
+    }
+
+    private static Bytes serializeLongToBytes(final long l) {
+        return Bytes.wrap(ByteBuffer.allocate(Long.BYTES).putLong(l).array());

Review Comment:
   Nit: If we pass this into `PrefixKeyFormatter` and unwrap it there, it seems there is no need to wrap it to begin with?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();

Review Comment:
   If the store was never opened, it seems it's still safe to call `closeOpenIterators`, since the set of open iterators should just be empty? -- Could we inline the code into `close()` directly?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);

Review Comment:
   How large is the overhead of calling `wrap()`? (Besides creating a new object, what does it do?)
   
   We pass `Bytes prefix` into the constructor, and it seems that if we kept a reference to it, we could just return it (without the need to unwrap it in the constructor and re-wrap it here)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {

Review Comment:
   I find `forPhysicalStore` and `fromPhysicalStore` not very intuitive --- maybe we could go with `addPrefix` and `removePrefix`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+    public final long id;
+    private final String name;
+    private final RocksDBStore physicalStore;
+    private final PrefixKeyFormatter prefixKeyFormatter;
+
+    private volatile boolean open = false;
+    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    LogicalKeyValueSegment(final long id,
+                           final String name,
+                           final RocksDBStore physicalStore) {
+        this.id = id;
+        this.name = name;
+        this.physicalStore = Objects.requireNonNull(physicalStore);
+
+        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id));
+    }
+
+    void openDB() {
+        open = true;
+    }
+
+    @Override
+    public int compareTo(final LogicalKeyValueSegment segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public synchronized void destroy() {
+        final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+        // this is a prefix deletion, because the deleteRange() implementation
+        // calls Bytes.increment() in order to make keyTo inclusive
+        physicalStore.deleteRange(keyPrefix, keyPrefix);
+    }
+
+    @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        physicalStore.deleteRange(
+            prefixKeyFormatter.forPhysicalStore(keyFrom),
+            prefixKeyFormatter.forPhysicalStore(keyTo));
+    }
+
+    @Override
+    public synchronized void put(final Bytes key, final byte[] value) {
+        physicalStore.put(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        return physicalStore.putIfAbsent(
+            prefixKeyFormatter.forPhysicalStore(key),
+            value);
+    }
+
+    @Override
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        physicalStore.putAll(entries.stream()
+            .map(kv -> new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(kv.key),
+                kv.value))
+            .collect(Collectors.toList()));
+    }
+
+    @Override
+    public synchronized byte[] delete(final Bytes key) {
+        return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        throw new UnsupportedOperationException("cannot initialize a logical segment");
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException("nothing to flush for logical segment");
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!open) {
+            return;
+        }
+
+        open = false;
+        closeOpenIterators();
+    }
+
+    private void closeOpenIterators() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public synchronized byte[] get(final Bytes key) {
+        return physicalStore.get(prefixKeyFormatter.forPhysicalStore(key));
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
+            prefixKeyFormatter.forPhysicalStore(from),
+            prefixKeyFormatter.forPhysicalStore(to),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
+        final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
+            prefixKeyFormatter.getPrefix(),
+            new BytesSerializer(),
+            openIterators);
+        return new StrippedPrefixKeyValueIteratorAdapter(
+            iteratorWithKeyPrefixes,
+            prefixKeyFormatter::fromPhysicalStore);
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("cannot estimate num entries for logical segment");
+    }
+
+    @Override
+    public void addToBatch(final KeyValue<byte[], byte[]> record, final WriteBatch batch) throws RocksDBException {
+        physicalStore.addToBatch(
+            new KeyValue<>(
+                prefixKeyFormatter.forPhysicalStore(record.key),
+                record.value),
+            batch);
+    }
+
+    @Override
+    public void write(final WriteBatch batch) throws RocksDBException {
+        // no key transformations here since they should've already been done as part
+        // of adding to the write batch
+        physicalStore.write(batch);
+    }
+
+    /**
+     * Manages translation between raw key and the key to be stored into the physical store.
+     * The key for the physical store is the raw key prepended with a fixed-length prefix.
+     */
+    private static class PrefixKeyFormatter {
+        private final byte[] prefix;
+
+        PrefixKeyFormatter(final Bytes prefix) {
+            this.prefix = prefix.get();
+        }
+
+        Bytes forPhysicalStore(final Bytes key) {
+            return key == null ? null : Bytes.wrap(forPhysicalStore(key.get()));
+        }
+
+        byte[] forPhysicalStore(final byte[] key) {
+            final byte[] keyWithPrefix = new byte[prefix.length + key.length];
+            System.arraycopy(prefix, 0, keyWithPrefix, 0, prefix.length);
+            System.arraycopy(key, 0, keyWithPrefix, prefix.length, key.length);
+            return keyWithPrefix;
+        }
+
+        Bytes fromPhysicalStore(final Bytes keyWithPrefix) {
+            return Bytes.wrap(fromPhysicalStore(keyWithPrefix.get()));
+        }
+
+        private byte[] fromPhysicalStore(final byte[] keyWithPrefix) {
+            final int rawKeyLength = keyWithPrefix.length - prefix.length;
+            final byte[] rawKey = new byte[rawKeyLength];
+            System.arraycopy(keyWithPrefix, prefix.length, rawKey, 0, rawKeyLength);
+            return rawKey;
+        }
+
+        Bytes getPrefix() {
+            return Bytes.wrap(prefix);
+        }
+    }
+
+    /**
+     * Converts a {@link KeyValueIterator} which returns keys with prefixes to one which
+     * returns un-prefixed keys.
+     */
+    private static class StrippedPrefixKeyValueIteratorAdapter implements KeyValueIterator<Bytes, byte[]> {
+
+        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
+        private final Function<Bytes, Bytes> keyStripper;

Review Comment:
   nit: maybe `prefixRemover` would be a better name? (We don't strip the key; we strip the prefix, and the key is what remains after stripping.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org