You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/15 02:40:56 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

vcrfxia opened a new pull request, #13252:
URL: https://github.com/apache/kafka/pull/13252

   (This PR is stacked on https://github.com/apache/kafka/pull/13249 and https://github.com/apache/kafka/pull/13250. Only the last commit needs to be reviewed separately.)
   
   This PR introduces the metered store layer for the new versioned key-value store introduced in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores). This outermost, metered store layer handles all serialization/deserialization from `VersionedKeyValueStore` to a bytes-representation (`VersionedBytesStore`) so that all inner stores may operate only with bytes types (see changelogging layer in https://github.com/apache/kafka/pull/13251 as an example). 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1116530367


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);
+
+        verify(inner).get(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnFlush() {
+        store.flush();
+
+        verify(inner).flush();
+        assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRemoveMetricsOnClose() {
+        assertThat(storeMetrics(), not(empty()));
+
+        store.close();
+
+        verify(inner).close();
+        assertThat(storeMetrics(), empty());
+    }
+
+    @Test
+    public void shouldRemoveMetricsOnCloseEvenIfInnerThrows() {

Review Comment:
   Responded above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax merged pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13252:
URL: https://github.com/apache/kafka/pull/13252


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1110445053


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.

Review Comment:
   I think this comments need some more information. What about this:
   ```
   Conceptually, MeteredVersionedKeyValueStore should <code>extend</code> MeteredTimestampKeyValueStore, but due to type conflicts, we cannot do this. Thus, we use an <it>instance</it> of MeteredTimestampKeyValueStore to mimic inheritance instead. This instance of MeteredTimestampKeyValueStore wraps the inner VersionedKeyValueStore, and thus we need to do some type translation inside the internal wrapped below.
   
   It's not ideal in the sense that we need to translate between the APIs/types twice, but reusing code is still better than c&p it.
   
   Note that we overwrite `get()` but newly add `put(k, v, ts)` as `delete(k, ts)` to get the same API as defined on VersionedKeyValueStore.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(

Review Comment:
   Thinking about this once more, it seems not ideal to do it this way? We get `ValueAndTimestamp` and call `super.put()`, what will do the serialization for use, and than call the `VersionedKeyValueToBytesStoreAdapter` that will split out raw-value, and raw-ts to call `inner.put(k,v,ts)`.
   
   Seems overly complicated. I merged 10/N already, but now I am wondering if it the right approach? Right now, we need to do this such that we can swap in/out the changeloggging store that only has a `put(k,v)` method, but why do we not add a `put(k,v,ts)` to the changeloggging (and also caching?) store and also only have `put(k,v,ts)` here?
   
   In N/10 you said that the changelogger implements `VersionedBytesStore` instead of `VersionedKeyValueStore`, and I am now questioning if it's the right call?
   
   We also add `get()` and `delete()` below without overwriting. It seems we should also add `put(k,v,ts)` in the same manner?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null
+                    ? ValueAndTimestamp.makeAllowNullable(null, context.timestamp())
+                    : value
+            );
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+            final long start = time.nanoseconds();
+            final QueryResult<R> result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+            // do not convert query or return types to/from inner bytes store to user-friendly types

Review Comment:
   Not sure if I understand point (2). The implementation of a query, is bound to the _implementation_ of the store, not the store type. Thus, if there is a `UserVersionedKeyValueStore` that supports a `RangeQuery` (as example), the semantics and types are bound to `UverVersionedKeyValueStore` -- when we later add `RangeQuery` to `RocksDBVersionedStore`, it seem ok if it's different because it comes with it's own implementation?
   
   Thus I think it ok to have the code as-is going with (2) (as a matter of fact, I think we can remove this comment as it seems unnecessary to me).
   
   At least that's my understanding on IQv2. Maybe @vvcephei can chime in and verify my understanding.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {

Review Comment:
   Why do we not directly `extend MeteredTimestampKeyValueStore` but `extend MeteredKeyValueStore`  plus `implement TimestampedKeyValueStore`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null

Review Comment:
   Just for an internal safe-guard, we should put a `requiresNotNull` in place for this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1110240934


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null

Review Comment:
   This one is tricky -- not sure if we should allow it, but instead change the usage of the store in the DSL to avoid calling this method? Just substituting `context.timestamp()` does not sound right to me?
   
   I guess the current idea was to bridge from TKV to VKV -- could we instead bridge the other way around, ie, we change the DSL to "assume" the VKV interface but if the underlying store is a TKV we translate with an adaptor?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1107700533


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null
+                    ? ValueAndTimestamp.makeAllowNullable(null, context.timestamp())
+                    : value
+            );
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+            final long start = time.nanoseconds();
+            final QueryResult<R> result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+            // do not convert return type from inner bytes store to user-friendly interface at this

Review Comment:
   This design choice has implications -- I'm curious to hear reviewer thoughts.
   
   (The comment should more accurately say "do not convert query or return types to/from inner bytes store ...", which I will update in the commit itself.)
   
   The existing MeteredKeyValueStore and MeteredTimestampedKeyValueStore both contain logic to assist in translating `KeyQuery<K, V>` and `RangeQuery<K, V>` types to queries understandable by the underlying bytes store, and to deserialize the returned bytes result back for the original `K` and `V` result types. The return type of these queries for MeteredKeyValueStore and MeteredTimestampedKeyValueStore is `V` for the key query, and `KeyValueIterator<K, V>` for the range query. We'll want to introduce new return types for versioned stores, perhaps `VersionedRecord<V>` for the key query and `Iterator<VersionedRecord<V>>` for the range query. I'd like to postpone this decision-making for a follow-up KIP, but that means not assisting in translating to/from the inner bytes store for now.
   
   So we have three options in the meantime:
   1. Reject all IQv2 queries, since RocksDBVersionedKeyValueStore doesn't support IQv2 yet anyway. I don't think we should do this, because then users who create their own VersionedKeyValueStore implementations won't be able to issue IQv2 queries either.
   2. Allow IQv2 queries as a pass-through at the metered layer. This is what the PR currently does. It means that users who implement their own VersionedKeyValueStores will be able to issue IQv2 queries, but also has backwards compatibility implications for when we introduce type serialization/deserialization support for key and range queries in the future. The behavior at this metered store layer would suddenly change for those query types.
   3. Reject KeyQuery and RangeQuery types in order to reserve them for future behaviors, and pass-through all other query types. This avoids compatibility concerns for the future, while still allowing users to issue IQv2 queries to their own VersionedKeyValueStore implementations in the meantime.
   
   I'm curious to know whether there's existing precedent / discussion on this compatibility concern for introducing custom logic at the metered layer for handling specific types of IQv2 queries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114799670


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null
+                    ? ValueAndTimestamp.makeAllowNullable(null, context.timestamp())
+                    : value
+            );
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+            final long start = time.nanoseconds();
+            final QueryResult<R> result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+            // do not convert query or return types to/from inner bytes store to user-friendly types

Review Comment:
   Hm, I think we might be talking about slightly different things. Here's my current understanding of how this works for the existing key-value stores today:
   * The user passes a `KeyValueStore<Bytes, byte[]>` implementation (via supplier/materializer) which is used as the innermost store. This store only knows how to serve queries from bytes, and doesn't know anything about what the original key or value types are.
   * The user's store gets wrapped inside some extra layers, including the metered layer which knows how to serialize the actual key and value types to bytes, and deserialize back.
   * When the user issues a IQv2 query to the store, the query hits the outer store (i.e., metered layer), and is passed to the inner layers from there.
   * What this means is that when a user issues an IQv2 `KeyQuery`, the key that they pass is the actual value type, and not bytes, even though the innermost store implementation they provided only knows about bytes. `MeteredKeyValueStore` has [logic](https://github.com/apache/kafka/blob/069ce59e1e33f47c000d8cdc247851f2e0a82154/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java#L297) for serializing the key in the `KeyQuery` to bytes, and passing that to the inner store, and also deserializing the bytes from the result into the actual value type before returning the result to the user. The same thing happens with `RangeQuery`, but these are the only two query types which `MeteredKeyValueStore` provides serialization/deserialization assistance for. All other query types are direct pass-throughs to the inner store.
   
   In order to provide the same convenience for users issuing IQv2 requests to versioned stores, `MeteredVersionedKeyValueStore` should also assist in serializing/deserializing `KeyQuery` and `RangeQuery` for users, so that they can issue `KeyQuery<K, V>` instead of `KeyQuery<Bytes, byte[]>` to their inner store, but I don't want to add this support now as part of KIP-889, since it should have its own KIP. So if a user wants to issue key queries to their custom versioned store implementation today, they will have to use `KeyQuery<Bytes, byte[]>`. But then later when we do add support for `KeyQuery<K, V>` at the metered store layer, users will have to stop using `KeyQuery<Bytes, byte[]>` and instead start using `KeyQuery<K, V>` -- this is the compatibility concern I have called out above. Does this extra context help?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114772380


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(

Review Comment:
   Let me try to repeat back your suggestion to make sure I understand correctly: as you've pointed out, the current `MeteredVersionedKeyValueStore` implementation first serializes value and timestamp to bytes and concatenates them into a special format (given by `NullableValueAndTimestampSerde`) to pass to inner stores. Then the inner stores have to separate the value and timestamp bytes again and deserialize the timestamp, for use in writing to the changelog or inserting into the inner store. You're suggesting that we make this more efficient by not having the metered layer serialize the timestamp and concatenate with the value bytes, and instead pass an unserialized timestamp and separate value bytes to the inner stores. Is that correct?
   
   If so, this suggestion is not possible for the innermost store (`RocksDBVersionedStore`, above which the `VersionedKeyValueToBytesStoreAdapter` is used) because of our decision to maintain compatibility with existing DSL methods for passing key-value stores, e.g., [StreamsBuilder#table()](https://github.com/apache/kafka/blob/3012332e3d82947e434933efd4ab4e9366ab429d/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java#L260) and [KTable methods](https://github.com/apache/kafka/blob/3012332e3d82947e434933efd4ab4e9366ab429d/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java#L153), which are explicitly typed to accept `Materialized<K, V, KeyValueStore<Bytes, byte[]>`, rather than adding new versions of these methods which accept `Materialized` instances to materialize stores other than `KeyValueStore<Bytes, byte[]>`. During the design phase we had discussed this tradeoff already and decided that the benefit of not needing to introduce new methods here is s
 ubstantial enough to warrant the performance hit from the extra round of serialization and deserialization.
   
   The changelogging layer is a different story. If we wanted to, we could indeed have `ChangeLoggingVersionedKeyValueBytesStore` implement `VersionedKeyValueStore<Bytes, byte[]>` instead of `VersionedBytesStore` to avoid needing to extract value and timestamp bytes from the serialized `rawValueAndTimestamp`. The cost of doing so, however, is extra duplicated code. If we do this, then we cannot have `ChangeLoggingVersionedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore` anymore because `ChangeLoggingKeyValueBytesStore implements KeyValueStore<Bytes, byte[]>` which clashes with `VersionedKeyValueStore<Bytes, byte[]>`. We also wouldn't be able to have the metered versioned store layer extend `MeteredVersionedKeyValueStore` anymore either because `MeteredVersionedKeyValueStore` is a `WrappedStateStore` with `KeyValueStore<Bytes, byte[]>` inside, and `ChangeLoggingVersionedKeyValueBytesStore` would no longer count as a `KeyValueStore<Bytes, byte[]>`. So, we'd end up more-or-le
 ss duplicating the existing code in both `MeteredKeyValueStore` and `ChangeLoggingKeyValueBytesStore` into their versioned counterparts. As a result, I don't think the performance benefit saved from avoiding the extra deserialization in the changelogging layer is worth this extra complexity. FWIW, `ChangeLoggingTimestampedKeyValueBytesStore` also has this same extra deserialization step (separating the concatenated value and timestamp bytes, and deserializing the timestamp for writing to the changelog), I assume for the same reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1117748690


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);
+
+        verify(inner).get(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnFlush() {
+        store.flush();
+
+        verify(inner).flush();
+        assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRemoveMetricsOnClose() {

Review Comment:
   Fair enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1107700533


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null
+                    ? ValueAndTimestamp.makeAllowNullable(null, context.timestamp())
+                    : value
+            );
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+            final long start = time.nanoseconds();
+            final QueryResult<R> result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+            // do not convert return type from inner bytes store to user-friendly interface at this

Review Comment:
   This design choice has implications -- I'm curious to hear reviewer thoughts.
   
   The existing MeteredKeyValueStore and MeteredTimestampedKeyValueStore both contain logic to assist in translating `KeyQuery<K, V>` and `RangeQuery<K, V>` types to queries understandable by the underlying bytes store, and to deserialize the returned bytes result back for the original `K` and `V` result types. The return type of these queries for MeteredKeyValueStore and MeteredTimestampedKeyValueStore is `V` for the key query, and `KeyValueIterator<K, V>` for the range query. We'll want to introduce new return types for versioned stores, perhaps `VersionedRecord<V>` for the key query and `Iterator<VersionedRecord<V>>` for the range query. I'd like to postpone this decision-making for a follow-up KIP, but that means not assisting in translating to/from the inner bytes store for now.
   
   So we have three options in the meantime:
   1. Reject all IQv2 queries, since RocksDBVersionedKeyValueStore doesn't support IQv2 yet anyway. I don't think we should do this, because then users who create their own VersionedKeyValueStore implementations won't be able to issue IQv2 queries either.
   2. Allow IQv2 queries as a pass-through at the metered layer. This is what the PR currently does. It means that users who implement their own VersionedKeyValueStores will be able to issue IQv2 queries, but also has backwards compatibility implications for when we introduce type serialization/deserialization support for key and range queries in the future. The behavior at this metered store layer would suddenly change for those query types.
   3. Reject KeyQuery and RangeQuery types in order to reserve them for future behaviors, and pass-through all other query types. This avoids compatibility concerns for the future, while still allowing users to issue IQv2 queries to their own VersionedKeyValueStore implementations in the meantime.
   
   I'm curious to know whether there's existing precedent / discussion on this compatibility concern for introducing custom logic at the metered layer for handling specific types of IQv2 queries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1116266727


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null
+                    ? ValueAndTimestamp.makeAllowNullable(null, context.timestamp())
+                    : value
+            );
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+            final long start = time.nanoseconds();
+            final QueryResult<R> result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+            // do not convert query or return types to/from inner bytes store to user-friendly types

Review Comment:
   I spoke to @vvcephei who confirmed that allowing IQv2 queries to pass through the metered layer in this PR and then later following up with an in-built `KeyQuery` and/or `RangeQuery` implementation (to assist with serialization/deserialization of results) would be a breaking change that should be avoided. I will update this PR to reserve the `KeyQuery` and `RangeQuery` query types instead, i.e., throw an exception if those queries are issued while allowing other types of queries (e.g., user custom queries) to pass through.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1110327711


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null

Review Comment:
   Hmm I think this code here is a remnant of my attempt at an earlier approach where we expose MeteredVersionedKeyValueStore itself as a TimestampedKeyValueStore -- which did not work out for other reasons, thus prompting https://github.com/apache/kafka/pull/13264. The line you've highlighted here is within MeteredVersionedKeyValueStoreInternal, which is only called from one place (MeteredVersionedKeyValueStore#put()) and that one place never passes null, so this can be removed. Good catch :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1116530916


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);
+
+        verify(inner).get(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnFlush() {
+        store.flush();
+
+        verify(inner).flush();
+        assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRemoveMetricsOnClose() {
+        assertThat(storeMetrics(), not(empty()));
+
+        store.close();
+
+        verify(inner).close();
+        assertThat(storeMetrics(), empty());
+    }
+
+    @Test
+    public void shouldRemoveMetricsOnCloseEvenIfInnerThrows() {
+        doThrow(new RuntimeException("uh oh")).when(inner).close();
+        assertThat(storeMetrics(), not(empty()));
+
+        assertThrows(RuntimeException.class, () -> store.close());
+
+        assertThat(storeMetrics(), empty());
+    }
+
+    @Test
+    public void shouldNotSetFlushListenerIfInnerIsNotCaching() {

Review Comment:
   Same reasoning as above -- the functionality that this test checks for is also inherited from `MeteredKeyValueStore`, but I do think it's worth verifying that `MeteredVersionedKeyValueStore` properly inherits it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1116530251


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);
+
+        verify(inner).get(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnFlush() {
+        store.flush();
+
+        verify(inner).flush();
+        assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRemoveMetricsOnClose() {

Review Comment:
   Not sure I follow. This test verifies that metrics are properly cleaned up when the store is closed, and this functionality is unique to this metered store layer. Maybe you mean that this test tests functionality which is inherited from `MeteredKeyValueStore`? That is true, but I think it is important to re-test here because we want to ensure that `MeteredVersionedKeyValueStore` properly cleans up its metrics as well, i.e., `MeteredVersionedKeyValueStore` is separate from `MeteredKeyValueStore`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114835088


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.

Review Comment:
   Thanks for the suggestions! Incorporated this into the latest commit. I used `KeyValueStore` in the comments instead of `TimestampedKeyValueStore` (pending our other discussion about whether `MeteredVersionedKeyValueStore` is conceptually `MeteredTimestampedKeyValueStore` or `MeteredKeyValueStore`) and also modified the last line since I wasn't sure which `get()` override you were referring to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114772380


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(

Review Comment:
   Let me try to repeat back your suggestion to make sure I understand correctly: as you've pointed out, the current `MeteredVersionedKeyValueStore` implementation first serializes value and timestamp to bytes and concatenates them into a special format (given by `NullableValueAndTimestampSerde`) to pass to inner stores. Then the inner stores have to separate the value and timestamp bytes again and deserialize the timestamp, for use in writing to the changelog or inserting into the inner store. You're suggesting that we make this more efficient by not having the metered layer serialize the timestamp and concatenate with the value bytes, and instead pass an unserialized timestamp and separate value bytes to the inner stores. Is that correct?
   
   If so, this suggestion is not possible for the innermost store (`RocksDBVersionedStore`, above which the `VersionedKeyValueToBytesStoreAdapter` is used) because of our decision to maintain compatibility with existing DSL methods for passing key-value stores, e.g., [StreamsBuilder#table()](https://github.com/apache/kafka/blob/3012332e3d82947e434933efd4ab4e9366ab429d/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java#L260) and [KTable methods](https://github.com/apache/kafka/blob/3012332e3d82947e434933efd4ab4e9366ab429d/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java#L153), which are explicitly typed to accept `Materialized<K, V, KeyValueStore<Bytes, byte[]>`, rather than adding new versions of these methods which accept `Materialized` instances to materialize stores other than `KeyValueStore<Bytes, byte[]>`. During the design phase we had discussed this tradeoff already and decided that the benefit of not needing to introduce new methods here is s
 ubstantial enough to warrant the performance hit from the extra round of serialization and deserialization.
   
   The changelogging layer is a different story. If we wanted to, we could indeed have `ChangeLoggingVersionedKeyValueBytesStore` implement `VersionedKeyValueStore<Bytes, byte[]>` instead of `VersionedBytesStore` to avoid needing to extract value and timestamp bytes from the serialized `rawValueAndTimestamp`. The cost of doing so, however, is extra duplicated code. If we do this, then we cannot have `ChangeLoggingVersionedKeyValueBytesStore extends ChangeLoggingKeyValueBytesStore` anymore because `ChangeLoggingKeyValueBytesStore implements KeyValueStore<Bytes, byte[]>` which clashes with `VersionedKeyValueStore<Bytes, byte[]>`. We also wouldn't be able to have the metered versioned store layer extend `MeteredKeyValueStore` anymore either because `MeteredKeyValueStore` is a `WrappedStateStore` with `KeyValueStore<Bytes, byte[]>` inside, and `ChangeLoggingVersionedKeyValueBytesStore` would no longer count as a `KeyValueStore<Bytes, byte[]>`. So, we'd end up more-or-less duplicating the
  existing code in both `MeteredKeyValueStore` and `ChangeLoggingKeyValueBytesStore` into their versioned counterparts. As a result, I don't think the performance benefit saved from avoiding the extra deserialization in the changelogging layer is worth this extra complexity. FWIW, `ChangeLoggingTimestampedKeyValueBytesStore` also has this same extra deserialization step (separating the concatenated value and timestamp bytes, and deserializing the timestamp for writing to the changelog), I assume for the same reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1116529217


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Conceptually, {@link MeteredVersionedKeyValueStore} should {@code extend}
+     * {@link MeteredKeyValueStore}, but due to type conflicts, we cannot do this. (Specifically,
+     * the first needs to be {@link VersionedKeyValueStore} while the second is {@link KeyValueStore}
+     * and the two interfaces conflict.) Thus, we use an internal <it>instance</it> of
+     * {@code MeteredKeyValueStore} to mimic inheritance instead.
+     * <p>
+     * It's not ideal because it requires an extra step to translate between the APIs of
+     * {@link VersionedKeyValueStore} in {@link MeteredVersionedKeyValueStore} and
+     * the APIs of {@link TimestampedKeyValueStore} in {@link MeteredVersionedKeyValueStoreInternal}.
+     * This extra step is all that the methods of {@code MeteredVersionedKeyValueStoreInternal} do.
+     * <p>
+     * Note that the addition of {@link #get(Object, long)} and {@link #delete(Object, long)} in
+     * this class are to match the interface of {@link VersionedKeyValueStore}.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            if (value == null) {
+                throw new IllegalStateException("Versioned store requires timestamp associated with all puts, including tombstones/deletes");
+            }
+            super.put(key, value);
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                                   final PositionBound positionBound,
+                                                   final QueryConfig config) {
+            // throw exception for now to reserve the ability to implement this in the future
+            // without clashing with users' custom implementations in the meantime
+            throw new UnsupportedOperationException("Versioned stores do not support RangeQuery queries at this time.");
+        }
+
+        @Override
+        protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                 final PositionBound positionBound,
+                                                 final QueryConfig config) {
+            // throw exception for now to reserve the ability to implement this in the future
+            // without clashing with users' custom implementations in the meantime
+            throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time.");
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(
+            final Serde<ValueAndTimestamp<V>> valueSerde,
+            final SerdeGetter getter
+        ) {
+            if (valueSerde == null) {
+                return new NullableValueAndTimestampSerde<>((Serde<V>) getter.valueSerde());
+            } else {
+                return super.prepareValueSerdeForStore(valueSerde, getter);
+            }
+        }
+    }
+
+    @Override
+    public void put(final K key, final V value, final long timestamp) {
+        internal.put(key, ValueAndTimestamp.makeAllowNullable(value, timestamp));
+    }
+
+    @Override
+    public VersionedRecord<V> delete(final K key, final long timestamp) {
+        final ValueAndTimestamp<V> valueAndTimestamp = internal.delete(key, timestamp);
+        return valueAndTimestamp == null
+            ? null
+            : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
+    }
+
+    @Override
+    public VersionedRecord<V> get(final K key) {
+        final ValueAndTimestamp<V> valueAndTimestamp = internal.get(key);
+        return valueAndTimestamp == null
+            ? null
+            : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
+    }
+
+    @Override
+    public VersionedRecord<V> get(final K key, final long asOfTimestamp) {
+        final ValueAndTimestamp<V> valueAndTimestamp = internal.get(key, asOfTimestamp);
+        return valueAndTimestamp == null
+            ? null
+            : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
+    }
+
+    @Override
+    public String name() {

Review Comment:
   All methods from `MeteredVersionedKeyValueStore` delegate to `internal`; the methods in `MeteredVersionedKeyValueStore` are purely a wrapper for the internal `MeteredVersionedKeyValueStoreInternal` instance. All the actual logic happens in `MeteredVersionedKeyValueStoreInternal`, which delegates its methods to `inner`, which is the actual inner store (either a changelogging layer or the inner versioned store implementation itself). The only reason that `MeteredVersionedKeyValueStore` implements `WrappedStateStore` at all is because there are places in the DSL code which expect the outermost store layer to be a `WrappedStateStore` in order to call methods such as `setFlushListener()` and `flushCache()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1108726927


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a timestamp associated with all puts, including tombstones/deletes
+                value == null
+                    ? ValueAndTimestamp.makeAllowNullable(null, context.timestamp())
+                    : value
+            );
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        public <R> QueryResult<R> query(final Query<R> query,
+                                        final PositionBound positionBound,
+                                        final QueryConfig config) {
+            final long start = time.nanoseconds();
+            final QueryResult<R> result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+            }
+            // do not convert query or return types to/from inner bytes store to user-friendly types

Review Comment:
   This design choice has implications -- I'm curious to hear reviewer thoughts.
   
   The existing MeteredKeyValueStore and MeteredTimestampedKeyValueStore both contain logic to assist in translating `KeyQuery<K, V>` and `RangeQuery<K, V>` types to queries understandable by the underlying bytes store, and to deserialize the returned bytes result back for the original `K` and `V` result types. The return type of these queries for MeteredKeyValueStore and MeteredTimestampedKeyValueStore is `V` for the key query, and `KeyValueIterator<K, V>` for the range query. We'll want to introduce new return types for versioned stores, perhaps `VersionedRecord<V>` for the key query and `Iterator<VersionedRecord<V>>` for the range query. I'd like to postpone this decision-making for a follow-up KIP, but that means not assisting in translating to/from the inner bytes store for now.
   
   So we have three options in the meantime:
   1. Reject all IQv2 queries, since RocksDBVersionedKeyValueStore doesn't support IQv2 yet anyway. I don't think we should do this, because then users who create their own VersionedKeyValueStore implementations won't be able to issue IQv2 queries either.
   2. Allow IQv2 queries as a pass-through at the metered layer. This is what the PR currently does. It means that users who implement their own VersionedKeyValueStores will be able to issue IQv2 queries, but also has backwards compatibility implications for when we introduce type serialization/deserialization support for key and range queries in the future. The behavior at this metered store layer would suddenly change for those query types.
   3. Reject KeyQuery and RangeQuery types in order to reserve them for future behaviors, and pass-through all other query types. This avoids compatibility concerns for the future, while still allowing users to issue IQv2 queries to their own VersionedKeyValueStore implementations in the meantime.
   
   I'm curious to know whether there's existing precedent / discussion on this compatibility concern for introducing custom logic at the metered layer for handling specific types of IQv2 queries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1116343074


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);

Review Comment:
   I think we should also mock inner.get() to let it return something useful and check that we get the expected result handed back here.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);

Review Comment:
   As above.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);

Review Comment:
   as below



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);
+
+        verify(inner).get(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnFlush() {
+        store.flush();
+
+        verify(inner).flush();
+        assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRemoveMetricsOnClose() {
+        assertThat(storeMetrics(), not(empty()));
+
+        store.close();
+
+        verify(inner).close();
+        assertThat(storeMetrics(), empty());
+    }
+
+    @Test
+    public void shouldRemoveMetricsOnCloseEvenIfInnerThrows() {

Review Comment:
   as above



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);
+
+        verify(inner).get(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnFlush() {
+        store.flush();
+
+        verify(inner).flush();
+        assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRemoveMetricsOnClose() {

Review Comment:
   Do we need this test? Seem it's rather testing the wrapped-store?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);

Review Comment:
   Should we also verify the delegation of `persistent()`, `isOpen()`, `getPosition()` and `name()`  (hope I did not forget any method).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Conceptually, {@link MeteredVersionedKeyValueStore} should {@code extend}
+     * {@link MeteredKeyValueStore}, but due to type conflicts, we cannot do this. (Specifically,
+     * the first needs to be {@link VersionedKeyValueStore} while the second is {@link KeyValueStore}
+     * and the two interfaces conflict.) Thus, we use an internal <it>instance</it> of
+     * {@code MeteredKeyValueStore} to mimic inheritance instead.
+     * <p>
+     * It's not ideal because it requires an extra step to translate between the APIs of
+     * {@link VersionedKeyValueStore} in {@link MeteredVersionedKeyValueStore} and
+     * the APIs of {@link TimestampedKeyValueStore} in {@link MeteredVersionedKeyValueStoreInternal}.
+     * This extra step is all that the methods of {@code MeteredVersionedKeyValueStoreInternal} do.
+     * <p>
+     * Note that the addition of {@link #get(Object, long)} and {@link #delete(Object, long)} in
+     * this class are to match the interface of {@link VersionedKeyValueStore}.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            if (value == null) {
+                throw new IllegalStateException("Versioned store requires timestamp associated with all puts, including tombstones/deletes");
+            }
+            super.put(key, value);
+        }
+
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.get(keyBytes(key), asOfTimestamp)), time, getSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        public ValueAndTimestamp<V> delete(final K key, final long timestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            try {
+                return maybeMeasureLatency(() -> outerValue(inner.delete(keyBytes(key), timestamp)), time, deleteSensor);
+            } catch (final ProcessorStateException e) {
+                final String message = String.format(e.getMessage(), key);
+                throw new ProcessorStateException(message, e);
+            }
+        }
+
+        @Override
+        protected <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                                   final PositionBound positionBound,
+                                                   final QueryConfig config) {
+            // throw exception for now to reserve the ability to implement this in the future
+            // without clashing with users' custom implementations in the meantime
+            throw new UnsupportedOperationException("Versioned stores do not support RangeQuery queries at this time.");
+        }
+
+        @Override
+        protected <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                                 final PositionBound positionBound,
+                                                 final QueryConfig config) {
+            // throw exception for now to reserve the ability to implement this in the future
+            // without clashing with users' custom implementations in the meantime
+            throw new UnsupportedOperationException("Versioned stores do not support KeyQuery queries at this time.");
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(
+            final Serde<ValueAndTimestamp<V>> valueSerde,
+            final SerdeGetter getter
+        ) {
+            if (valueSerde == null) {
+                return new NullableValueAndTimestampSerde<>((Serde<V>) getter.valueSerde());
+            } else {
+                return super.prepareValueSerdeForStore(valueSerde, getter);
+            }
+        }
+    }
+
+    @Override
+    public void put(final K key, final V value, final long timestamp) {
+        internal.put(key, ValueAndTimestamp.makeAllowNullable(value, timestamp));
+    }
+
+    @Override
+    public VersionedRecord<V> delete(final K key, final long timestamp) {
+        final ValueAndTimestamp<V> valueAndTimestamp = internal.delete(key, timestamp);
+        return valueAndTimestamp == null
+            ? null
+            : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
+    }
+
+    @Override
+    public VersionedRecord<V> get(final K key) {
+        final ValueAndTimestamp<V> valueAndTimestamp = internal.get(key);
+        return valueAndTimestamp == null
+            ? null
+            : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
+    }
+
+    @Override
+    public VersionedRecord<V> get(final K key, final long asOfTimestamp) {
+        final ValueAndTimestamp<V> valueAndTimestamp = internal.get(key, asOfTimestamp);
+        return valueAndTimestamp == null
+            ? null
+            : new VersionedRecord<>(valueAndTimestamp.value(), valueAndTimestamp.timestamp());
+    }
+
+    @Override
+    public String name() {

Review Comment:
   It seems we overwrite some method to delegate to `internal.x()` instead of `inner.x()` but I am not sure what the pattern is. Can you elaborate?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);
+    }
+
+    @Test
+    public void shouldPassChangelogTopicNameToStateStoreSerde() {
+        final String changelogTopicName = "changelog-topic";
+        when(context.changelogFor(STORE_NAME)).thenReturn(changelogTopicName);
+        doShouldPassChangelogTopicNameToStateStoreSerde(changelogTopicName);
+    }
+
+    @Test
+    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
+        final String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, STORE_NAME, TASK_ID.topologyName());
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
+        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String changelogTopicName) {
+        // recreate store with mock serdes
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Serde<String> valueSerde = mock(Serde.class);
+        final Serializer<String> valueSerializer = mock(Serializer.class);
+        final Deserializer<String> valueDeserializer = mock(Deserializer.class);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        store.close();
+        store = new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            keySerde,
+            new NullableValueAndTimestampSerde<>(valueSerde)
+        );
+        store.init((StateStoreContext) context, store);
+
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(keySerializer).serialize(changelogTopicName, KEY);
+        verify(valueSerializer).serialize(changelogTopicName, VALUE);
+    }
+
+    @Test
+    public void shouldRecordMetricsOnInit() {
+        // init is called in setUp(). it suffices to verify one restore metric since all restore
+        // metrics are recorded by the same sensor, and the sensor is tested elsewhere.
+        assertThat((Double) getMetric("restore-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnPut() {
+        store.put(KEY, VALUE, TIMESTAMP);
+
+        verify(inner).put(RAW_KEY, RAW_VALUE_AND_TIMESTAMP);
+        assertThat((Double) getMetric("put-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnDelete() {
+        store.delete(KEY, TIMESTAMP);
+
+        verify(inner).delete(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("delete-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGet() {
+        store.get(KEY);
+
+        verify(inner).get(RAW_KEY);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() {
+        store.get(KEY, TIMESTAMP);
+
+        verify(inner).get(RAW_KEY, TIMESTAMP);
+        assertThat((Double) getMetric("get-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRecordMetricsOnFlush() {
+        store.flush();
+
+        verify(inner).flush();
+        assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0));
+    }
+
+    @Test
+    public void shouldDelegateAndRemoveMetricsOnClose() {
+        assertThat(storeMetrics(), not(empty()));
+
+        store.close();
+
+        verify(inner).close();
+        assertThat(storeMetrics(), empty());
+    }
+
+    @Test
+    public void shouldRemoveMetricsOnCloseEvenIfInnerThrows() {
+        doThrow(new RuntimeException("uh oh")).when(inner).close();
+        assertThat(storeMetrics(), not(empty()));
+
+        assertThrows(RuntimeException.class, () -> store.close());
+
+        assertThat(storeMetrics(), empty());
+    }
+
+    @Test
+    public void shouldNotSetFlushListenerIfInnerIsNotCaching() {

Review Comment:
   as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1116527739


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MeteredVersionedKeyValueStoreTest {
+
+    private static final String STORE_NAME = "versioned_store";
+    private static final Serde<String> STRING_SERDE = new StringSerde();
+    private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_SERDE = new NullableValueAndTimestampSerde<>(STRING_SERDE);
+    private static final String METRICS_SCOPE = "scope";
+    private static final String STORE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String APPLICATION_ID = "test-app";
+    private static final TaskId TASK_ID = new TaskId(0, 0, "My-Topology");
+
+    private static final String KEY = "k";
+    private static final String VALUE = "v";
+    private static final long TIMESTAMP = 10L;
+    private static final Bytes RAW_KEY = new Bytes(STRING_SERDE.serializer().serialize(null, KEY));
+    private static final byte[] RAW_VALUE_AND_TIMESTAMP = VALUE_AND_TIMESTAMP_SERDE.serializer()
+        .serialize(null, ValueAndTimestamp.make(VALUE, TIMESTAMP));
+
+    private final VersionedBytesStore inner = mock(VersionedBytesStore.class);
+    private final Metrics metrics = new Metrics();
+    private final Time mockTime = new MockTime();
+    private final String threadId = Thread.currentThread().getName();
+    private InternalProcessorContext context = mock(InternalProcessorContext.class);
+    private Map<String, String> tags;
+
+    private MeteredVersionedKeyValueStore<String, String> store;
+
+    @Before
+    public void setUp() {
+        when(inner.name()).thenReturn(STORE_NAME);
+        when(context.metrics()).thenReturn(new StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.taskId()).thenReturn(TASK_ID);
+
+        metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
+        tags = mkMap(
+            mkEntry("thread-id", threadId),
+            mkEntry("task-id", TASK_ID.toString()),
+            mkEntry(METRICS_SCOPE + "-state-id", STORE_NAME)
+        );
+
+        store = newMeteredStore(inner);
+        store.init((StateStoreContext) context, store);
+    }
+
+    private MeteredVersionedKeyValueStore<String, String> newMeteredStore(final VersionedBytesStore inner) {
+        return new MeteredVersionedKeyValueStore<>(
+            inner,
+            METRICS_SCOPE,
+            mockTime,
+            STRING_SERDE,
+            VALUE_AND_TIMESTAMP_SERDE
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {
+        // recreate store in order to re-init
+        store.close();
+        final VersionedBytesStore mockInner = mock(VersionedBytesStore.class);
+        store = newMeteredStore(mockInner);
+
+        store.init((ProcessorContext) context, store);
+
+        verify(mockInner).init((ProcessorContext) context, store);
+    }
+
+    @Test
+    public void shouldDelegateInit() {
+        // init is already called in setUp()
+        verify(inner).init((StateStoreContext) context, store);

Review Comment:
   Sure, added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114727896


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type &lt;Bytes,byte[]&gt;, so we use {@link Serde}s
+ * to convert from &lt;K,ValueAndTimestamp&lt;V&gt&gt; to &lt;Bytes,byte[]&gt;. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {

Review Comment:
   `MeteredVersionedKeyValueStoreInternal` and `MeteredTimestampKeyValueStore` use different serdes -- `MeteredTimestampKeyValueStore` uses `ValueAndTimestampSerde` while `MeteredVersionedKeyValueStoreInternal` uses `NullableValueAndTimestampSerde`. 
   
   If you look at the code for `MeteredTimestampKeyValueStore`, the only method it overrides from `MeteredKeyValueStore` is `prepareValueSerdeForStore()`. We could have `MeteredVersionedKeyValueStoreInternal` extend `MeteredTimestampKeyValueStore` instead of `MeteredKeyValueStore` if we want but `MeteredVersionedKeyValueStoreInternal` needs to override `prepareValueSerdeForStore()` with its own serde anyway, so `MeteredVersionedKeyValueStoreInternal` would get zero functionality from `MeteredTimestampKeyValueStore`. 
   
   Also, IMO `MeteredVersionedKeyValueStoreInternal` and `MeteredTimestampKeyValueStore` are different conceptually even though they both have the signature `MeteredKeyValueStore<K, ValueAndTimestamp<V>>`. The reason is because the metered layer (in addition to collecting metrics) is responsible for handling serialization to/from the inner bytes stores (pending your other comment, which we can discuss there), and the serialization behavior of the two stores is different, since they use different serdes. This is a more minor reason, though. If you think it's conceptually easier to think of versioned stores as a special case of timestamped stores, we can have `MeteredVersionedKeyValueStoreInternal extends MeteredTimestampKeyValueStore` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org