You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/15 14:14:17 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #8157: KAFKA-9088: Consolidate InternalMockProcessorContext and MockInternalProcessorContext

cadonna commented on a change in pull request #8157:
URL: https://github.com/apache/kafka/pull/8157#discussion_r425728214



##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -16,40 +16,96 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
+@SuppressWarnings("rawtypes")
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
+
+    public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0);
+    public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders();
+    public static final String DEFAULT_TOPIC = "";
+    public static final int DEFAULT_PARTITION = 0;
+    public static final long DEFAULT_OFFSET = 0L;
+    public static final long DEFAULT_TIMESTAMP = 0L;
+    public static final String DEFAULT_CLIENT_ID = "client-id";
+    public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache ";

Review comment:
       ```suggestion
       public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache";
   ```

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }

Review comment:
       req: Please verify also the configs. You just need to check whether 
   - `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG` is `"localhost:9091"`
   - `StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG` is `Serdes.ByteArraySerde.class.getName()`
   - `StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG` is `Serdes.ByteArraySerde.class.getName()`
   
   Add also a comment saying that these checks verify that the `StreamsTestUtils` generated streams config are set.
   
   Please verify all values you set in the constructors also the record collector, thread cache and so on. This applies also to the other constructor tests.  

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }
+

Review comment:
       req: Please do also test the remaining two constructors.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -16,40 +16,96 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
+@SuppressWarnings("rawtypes")
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
+
+    public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0);
+    public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders();
+    public static final String DEFAULT_TOPIC = "";
+    public static final int DEFAULT_PARTITION = 0;
+    public static final long DEFAULT_OFFSET = 0L;
+    public static final long DEFAULT_TIMESTAMP = 0L;

Review comment:
       prop: Could you set these two default values to something not equal 0, e.g., 42? Just to avoid the ambiguity when something in a test is initialized to 0. Also the two values should be distinct. In that way, we would immediately discover if we compare offsets with timestamps by mistake.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
##########
@@ -96,16 +97,16 @@ public void setUp() {
         keySchema = new WindowKeySchema();
         bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema);
         underlyingStore = new RocksDBWindowStore(
-            bytesStore,
-            false,
-            WINDOW_SIZE);
+                bytesStore,
+                false,
+                WINDOW_SIZE);

Review comment:
       prop: Should be
   ```
           underlyingStore = new RocksDBWindowStore(bytesStore, false, WINDOW_SIZE);
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
##########
@@ -186,10 +188,10 @@ public void close() {}
         final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);
 
         final TestInputTopic<String, String> inputTopic = driver.createInputTopic(TOPIC,
-            Serdes.String().serializer(),
-            Serdes.String().serializer(),
-            initialWallClockTime,
-            Duration.ZERO);
+                Serdes.String().serializer(),
+                Serdes.String().serializer(),
+                initialWallClockTime,
+                Duration.ZERO);

Review comment:
       ```suggestion
               Serdes.String().serializer(),
               Serdes.String().serializer(),
               initialWallClockTime,
               Duration.ZERO
           );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -16,40 +16,96 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
+@SuppressWarnings("rawtypes")
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
+
+    public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0);
+    public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders();
+    public static final String DEFAULT_TOPIC = "";

Review comment:
       prop: Could you give the default topic a non-empty string like `"test-topic"`?

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -16,40 +16,96 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
+@SuppressWarnings("rawtypes")
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
+
+    public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0);
+    public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders();

Review comment:
       prop:
   ```suggestion
       public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders(
           Collections.singletonList(new RecordHeader("headerKey", "headerValue".getBytes()))
       );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
##########
@@ -120,57 +121,58 @@ public void shouldNotReturnDuplicatesInRanges() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false),
-            Serdes.String(),
-            Serdes.String())
-            .withCachingEnabled();
+                Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false),
+                Serdes.String(),
+                Serdes.String())
+                .withCachingEnabled();
 
         builder.addStateStore(storeBuilder);
 
         builder.stream(TOPIC,
-            Consumed.with(Serdes.String(), Serdes.String()))
-            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
-                private WindowStore<String, String> store;
-                private int numRecordsProcessed;
-                private ProcessorContext context;
-
-                @SuppressWarnings("unchecked")
-                @Override
-                public void init(final ProcessorContext processorContext) {
-                    this.context = processorContext;
-                    this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name");
-                    int count = 0;
-
-                    final KeyValueIterator<Windowed<String>, String> all = store.all();
-                    while (all.hasNext()) {
-                        count++;
-                        all.next();
+                Consumed.with(Serdes.String(), Serdes.String()))
+                .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
+                    private WindowStore<String, String> store;
+                    private int numRecordsProcessed;
+                    private ProcessorContext context;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext processorContext) {
+                        this.context = processorContext;
+                        this.store = (WindowStore<String, String>) processorContext.getStateStore("store-name");
+                        int count = 0;
+
+                        final KeyValueIterator<Windowed<String>, String> all = store.all();
+                        while (all.hasNext()) {
+                            count++;
+                            all.next();
+                        }
+
+                        assertThat(count, equalTo(0));
                     }
 
-                    assertThat(count, equalTo(0));
-                }
+                    @Override
+                    public KeyValue<String, String> transform(final String key, final String value) {
+                        int count = 0;
 
-                @Override
-                public KeyValue<String, String> transform(final String key, final String value) {
-                    int count = 0;
+                        final KeyValueIterator<Windowed<String>, String> all = store.all();
+                        while (all.hasNext()) {
+                            count++;
+                            all.next();
+                        }
+                        assertThat(count, equalTo(numRecordsProcessed));
 
-                    final KeyValueIterator<Windowed<String>, String> all = store.all();
-                    while (all.hasNext()) {
-                        count++;
-                        all.next();
-                    }
-                    assertThat(count, equalTo(numRecordsProcessed));
+                        store.put(value, value, context.timestamp());
 
-                    store.put(value, value, context.timestamp());
+                        numRecordsProcessed++;
 
-                    numRecordsProcessed++;
-
-                    return new KeyValue<>(key, value);
-                }
+                        return new KeyValue<>(key, value);
+                    }
 
-                @Override
-                public void close() {}
-            }, "store-name");
+                    @Override
+                    public void close() {
+                    }

Review comment:
       req: It seems something happened with the indentation. I think before it was OK. In any case indentation should be 4 spaces.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);

Review comment:
       req: Please do not add a method for each default value. IMO using `assertThat()` as I did in the following is readable enough.
     
   ```suggestion
           assertThat(context.metrics().version(), is(Version.LATEST));
           assertThat(context.recordCollector(), notNullValue());
           assertThat(context.taskId(), is(MockInternalProcessorContext.DEFAULT_TASK_ID));
           assertThat(context.recordContext().topic(), is(MockInternalProcessorContext.DEFAULT_TOPIC));
           assertThat(context.recordContext().partition(), is(MockInternalProcessorContext.DEFAULT_PARTITION));
           assertThat(context.recordContext().timestamp(), is(MockInternalProcessorContext.DEFAULT_TIMESTAMP));
           assertThat(context.recordContext().offset(), is(MockInternalProcessorContext.DEFAULT_OFFSET));
           assertThat(context.recordContext().headers(), is(MockInternalProcessorContext.DEFAULT_HEADERS));
           assertThat(context.currentNode().name(), is(MockInternalProcessorContext.DEFAULT_PROCESSOR_NODE_NAME));
   ```

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());

Review comment:
       prop: Please use `assertThat()`.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }
+
+    @Test
+    public void shouldRegisterStateStore() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateStore stateStore = new MockKeyValueStore(storeName, false);
+        context.register(stateStore, null);
+
+        Assert.assertSame(stateStore, context.getStateStore(storeName));
+    }
+
+    @Test
+    public void shouldRegisterStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateRestoreCallback callback = new MockStateRestoreListener();
+        context.register(new MockKeyValueStore(storeName, false), callback);
+
+        Assert.assertSame(callback, context.getRestoreListener(storeName));
+    }
+
+    @Test
+    public void shouldReturnNoOpStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        context.register(new MockKeyValueStore(storeName, false), new MockRestoreCallback());
+
+        Assert.assertEquals(CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER, context.getRestoreListener(storeName));
+    }
+
+    @Test
+    public void shouldCallOnRestoreStartAndOnRestoreEndWhenRestore() {
+        final String storeName = "store-name";
+
+        final AbstractNotifyingRestoreCallback stateRestoreListener = EasyMock.mock(AbstractNotifyingRestoreCallback.class);
+        final Capture<TopicPartition> topicPartitionCapture = Capture.newInstance();
+        final Capture<String> storeNameCapture = Capture.newInstance();
+        final Capture<Long> startingOffset = Capture.newInstance();
+        final Capture<Long> endingOffset = Capture.newInstance();
+        stateRestoreListener.onRestoreStart(
+                EasyMock.capture(topicPartitionCapture),
+                EasyMock.capture(storeNameCapture),
+                EasyMock.captureLong(startingOffset),
+                EasyMock.captureLong(endingOffset)
+        );
+        EasyMock.expectLastCall().andAnswer(() -> {
+            Assert.assertNull(topicPartitionCapture.getValue());
+            Assert.assertEquals(storeName, storeNameCapture.getValue());
+            Assert.assertEquals(0L, startingOffset.getValue().longValue());
+            Assert.assertEquals(0L, endingOffset.getValue().longValue());
+            return null;
+        });

Review comment:
       Q: Why not just use `stateRestoreListener.onRestoreStart(null, storeName, DEFAULT_OFFSET, DEFAULT_OFFSET)`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
##########
@@ -186,10 +188,10 @@ public void close() {}
         final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);
 
         final TestInputTopic<String, String> inputTopic = driver.createInputTopic(TOPIC,
-            Serdes.String().serializer(),
-            Serdes.String().serializer(),
-            initialWallClockTime,
-            Duration.ZERO);
+                Serdes.String().serializer(),
+                Serdes.String().serializer(),
+                initialWallClockTime,
+                Duration.ZERO);

Review comment:
       The rest of the class seem also to have the wrong indentation.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -16,40 +16,96 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
+@SuppressWarnings("rawtypes")
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
+
+    public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0);
+    public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders();
+    public static final String DEFAULT_TOPIC = "";
+    public static final int DEFAULT_PARTITION = 0;
+    public static final long DEFAULT_OFFSET = 0L;
+    public static final long DEFAULT_TIMESTAMP = 0L;
+    public static final String DEFAULT_CLIENT_ID = "client-id";
+    public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache ";
+    public static final String DEFAULT_PROCESSOR_NODE_NAME = "TESTING_NODE";
+    public static final int DEFAULT_MAX_CACHE_SIZE_BYTES = 0;
+    public static final String DEFAULT_METRICS_VERSION = StreamsConfig.METRICS_LATEST;
 
     private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
-    private ProcessorNode<?, ?> currentNode;
+    private ThreadCache threadCache;
+    private ProcessorNode currentNode;
+    private StreamsMetricsImpl metrics;
     private RecordCollector recordCollector;
 
     public MockInternalProcessorContext() {
+        super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory());
+        final StreamsMetricsImpl metrics = (StreamsMetricsImpl) super.metrics();
+        init(metrics, new ThreadCache(new LogContext(DEFAULT_THREAD_CACHE_PREFIX), DEFAULT_MAX_CACHE_SIZE_BYTES, metrics));
+    }
+
+    public MockInternalProcessorContext(final LogContext logContext, final long maxCacheSizeBytes) {
+        super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory());

Review comment:
       See my comment on line 72

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }

Review comment:
       req: Add also verifications for the other values set during construction.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }
+
+    @Test
+    public void shouldRegisterStateStore() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateStore stateStore = new MockKeyValueStore(storeName, false);
+        context.register(stateStore, null);
+
+        Assert.assertSame(stateStore, context.getStateStore(storeName));

Review comment:
       prop: Please use `assertThat()`.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }
+
+    @Test
+    public void shouldRegisterStateStore() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateStore stateStore = new MockKeyValueStore(storeName, false);
+        context.register(stateStore, null);
+
+        Assert.assertSame(stateStore, context.getStateStore(storeName));
+    }
+
+    @Test
+    public void shouldRegisterStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateRestoreCallback callback = new MockStateRestoreListener();
+        context.register(new MockKeyValueStore(storeName, false), callback);
+
+        Assert.assertSame(callback, context.getRestoreListener(storeName));

Review comment:
       prop: Please use `assertThat()`.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }
+
+    @Test
+    public void shouldRegisterStateStore() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateStore stateStore = new MockKeyValueStore(storeName, false);
+        context.register(stateStore, null);
+
+        Assert.assertSame(stateStore, context.getStateStore(storeName));
+    }
+
+    @Test
+    public void shouldRegisterStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateRestoreCallback callback = new MockStateRestoreListener();
+        context.register(new MockKeyValueStore(storeName, false), callback);
+
+        Assert.assertSame(callback, context.getRestoreListener(storeName));
+    }
+
+    @Test
+    public void shouldReturnNoOpStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        context.register(new MockKeyValueStore(storeName, false), new MockRestoreCallback());
+
+        Assert.assertEquals(CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER, context.getRestoreListener(storeName));

Review comment:
       prop: Please use `assertThat()` here and for the rest of the class.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
##########
@@ -120,57 +121,58 @@ public void shouldNotReturnDuplicatesInRanges() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false),
-            Serdes.String(),
-            Serdes.String())
-            .withCachingEnabled();
+                Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false),
+                Serdes.String(),
+                Serdes.String())
+                .withCachingEnabled();

Review comment:
       prop: Should be
   ```
           final StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
               Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false),
               Serdes.String(),
               Serdes.String()
           ).withCachingEnabled();
   ```

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }
+
+    @Test
+    public void shouldRegisterStateStore() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateStore stateStore = new MockKeyValueStore(storeName, false);
+        context.register(stateStore, null);
+
+        Assert.assertSame(stateStore, context.getStateStore(storeName));
+    }
+
+    @Test
+    public void shouldRegisterStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateRestoreCallback callback = new MockStateRestoreListener();
+        context.register(new MockKeyValueStore(storeName, false), callback);
+
+        Assert.assertSame(callback, context.getRestoreListener(storeName));
+    }
+
+    @Test
+    public void shouldReturnNoOpStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        context.register(new MockKeyValueStore(storeName, false), new MockRestoreCallback());
+
+        Assert.assertEquals(CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER, context.getRestoreListener(storeName));
+    }
+
+    @Test
+    public void shouldCallOnRestoreStartAndOnRestoreEndWhenRestore() {
+        final String storeName = "store-name";
+
+        final AbstractNotifyingRestoreCallback stateRestoreListener = EasyMock.mock(AbstractNotifyingRestoreCallback.class);
+        final Capture<TopicPartition> topicPartitionCapture = Capture.newInstance();
+        final Capture<String> storeNameCapture = Capture.newInstance();
+        final Capture<Long> startingOffset = Capture.newInstance();
+        final Capture<Long> endingOffset = Capture.newInstance();
+        stateRestoreListener.onRestoreStart(
+                EasyMock.capture(topicPartitionCapture),
+                EasyMock.capture(storeNameCapture),
+                EasyMock.captureLong(startingOffset),
+                EasyMock.captureLong(endingOffset)
+        );
+        EasyMock.expectLastCall().andAnswer(() -> {
+            Assert.assertNull(topicPartitionCapture.getValue());
+            Assert.assertEquals(storeName, storeNameCapture.getValue());
+            Assert.assertEquals(0L, startingOffset.getValue().longValue());
+            Assert.assertEquals(0L, endingOffset.getValue().longValue());
+            return null;
+        });
+        final Capture<Long> totalRestoredCapture = Capture.newInstance();
+        stateRestoreListener.onRestoreEnd(
+                EasyMock.capture(topicPartitionCapture),
+                EasyMock.capture(storeNameCapture),
+                EasyMock.captureLong(totalRestoredCapture)
+        );
+        EasyMock.expectLastCall().andAnswer(() -> {
+            Assert.assertNull(topicPartitionCapture.getValue());
+            Assert.assertEquals(storeName, storeNameCapture.getValue());
+            Assert.assertEquals(0L, totalRestoredCapture.getValue().longValue());
+            return null;
+        });
+        EasyMock.replay(stateRestoreListener);
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        context.register(new MockKeyValueStore(storeName, false), stateRestoreListener);
+        context.restore(storeName, Collections.emptyList());
+
+        EasyMock.verify(stateRestoreListener);
+    }
+
+    @Test
+    public void shouldRestoreBatch() {
+        final String storeName = "store-name";
+        final RecordBatchingStateRestoreCallback callback = EasyMock.mock(RecordBatchingStateRestoreCallback.class);
+        final List<KeyValue<byte[], byte[]>> changelog = Arrays.asList(
+                new KeyValue<>("key1".getBytes(), "value1".getBytes()),
+                new KeyValue<>("key2".getBytes(), "value2".getBytes())
+        );
+        Capture<List<ConsumerRecord<byte[], byte[]>>> recordsCapture = Capture.newInstance();
+        callback.restoreBatch(EasyMock.capture(recordsCapture));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            final List<ConsumerRecord<byte[], byte[]>> records = recordsCapture.getValue();
+            Assert.assertEquals(records.size(), changelog.size());
+            for (int i = 0; i < records.size(); i++) {
+                final ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
+                final KeyValue<byte[], byte[]> keyValue = changelog.get(i);
+                Assert.assertEquals(MockInternalProcessorContext.DEFAULT_TOPIC, consumerRecord.topic());
+                Assert.assertEquals(MockInternalProcessorContext.DEFAULT_PARTITION, consumerRecord.partition());
+                Assert.assertEquals(MockInternalProcessorContext.DEFAULT_OFFSET, consumerRecord.offset());
+                Assert.assertEquals(keyValue.key, consumerRecord.key());
+                Assert.assertEquals(keyValue.value, consumerRecord.value());
+            }
+            return null;
+        });
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        context.register(new MockKeyValueStore(storeName, false), callback);
+
+        EasyMock.replay(callback);
+
+        context.restore(storeName, changelog);
+
+        EasyMock.verify(callback);
+    }
+
+    @Test
+    public void shouldSetKeySerdeFromConfig() {
+        final Properties config = StreamsTestUtils.getStreamsConfig();
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(config, new Metrics());
+
+        Assert.assertEquals(Serdes.StringSerde.class, context.keySerde().getClass());
+    }
+
+    @Test
+    public void shouldSetValueSerdeFromConfig() {
+        final Properties config = StreamsTestUtils.getStreamsConfig();
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(config, new Metrics());
+
+        Assert.assertEquals(Serdes.StringSerde.class, context.valueSerde().getClass());
+    }

Review comment:
       req: I do not really understand what you want to verify here and in the test before. If you want to verify that the configs are set correctly in the constructor, I would do that in the tests for the two constructors that take configs as a parameter. There you could call `appConfigs()` and verify that the application ID is set to the value you set before in the configs that you passed to the constructor.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -16,40 +16,96 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
+@SuppressWarnings("rawtypes")
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
+
+    public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0);
+    public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders();
+    public static final String DEFAULT_TOPIC = "";
+    public static final int DEFAULT_PARTITION = 0;
+    public static final long DEFAULT_OFFSET = 0L;
+    public static final long DEFAULT_TIMESTAMP = 0L;
+    public static final String DEFAULT_CLIENT_ID = "client-id";
+    public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache ";
+    public static final String DEFAULT_PROCESSOR_NODE_NAME = "TESTING_NODE";
+    public static final int DEFAULT_MAX_CACHE_SIZE_BYTES = 0;
+    public static final String DEFAULT_METRICS_VERSION = StreamsConfig.METRICS_LATEST;
 
     private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
-    private ProcessorNode<?, ?> currentNode;
+    private ThreadCache threadCache;
+    private ProcessorNode currentNode;
+    private StreamsMetricsImpl metrics;
     private RecordCollector recordCollector;
 
     public MockInternalProcessorContext() {
+        super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory());

Review comment:
       req: Please use `createStateDir()` instead of `TestUtils.tempDirectory()`. Otherwise the passed in state dir will be different from the state dir on the configs. This is a mistake in `MockProcessorContext`.
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
##########
@@ -114,18 +110,9 @@
 
     @Before
     public void setUp() {
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion);
-
-        context = new InternalMockProcessorContext(
-            TestUtils.tempDirectory(),
-            Serdes.String(),
-            Serdes.Long(),
-            streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
-            MockRecordCollector::new,
-            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
-        );
+        final Properties props = StreamsTestUtils.getStreamsConfig();
+        props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        context = new MockInternalProcessorContext(props, metrics);

Review comment:
       Thank you for pointing this out. I will look into it.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -16,40 +16,96 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-public class MockInternalProcessorContext
-    extends MockProcessorContext
-    implements InternalProcessorContext<Object, Object> {
+import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
+@SuppressWarnings("rawtypes")
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext<Object, Object> {
+
+    public static final TaskId DEFAULT_TASK_ID = new TaskId(0, 0);
+    public static final RecordHeaders DEFAULT_HEADERS = new RecordHeaders();
+    public static final String DEFAULT_TOPIC = "";
+    public static final int DEFAULT_PARTITION = 0;
+    public static final long DEFAULT_OFFSET = 0L;
+    public static final long DEFAULT_TIMESTAMP = 0L;
+    public static final String DEFAULT_CLIENT_ID = "client-id";
+    public static final String DEFAULT_THREAD_CACHE_PREFIX = "testCache ";
+    public static final String DEFAULT_PROCESSOR_NODE_NAME = "TESTING_NODE";
+    public static final int DEFAULT_MAX_CACHE_SIZE_BYTES = 0;
+    public static final String DEFAULT_METRICS_VERSION = StreamsConfig.METRICS_LATEST;
 
     private final Map<String, StateRestoreCallback> restoreCallbacks = new LinkedHashMap<>();
-    private ProcessorNode<?, ?> currentNode;
+    private ThreadCache threadCache;
+    private ProcessorNode currentNode;
+    private StreamsMetricsImpl metrics;
     private RecordCollector recordCollector;
 
     public MockInternalProcessorContext() {
+        super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory());
+        final StreamsMetricsImpl metrics = (StreamsMetricsImpl) super.metrics();
+        init(metrics, new ThreadCache(new LogContext(DEFAULT_THREAD_CACHE_PREFIX), DEFAULT_MAX_CACHE_SIZE_BYTES, metrics));
+    }
+
+    public MockInternalProcessorContext(final LogContext logContext, final long maxCacheSizeBytes) {
+        super(StreamsTestUtils.getStreamsConfig(), DEFAULT_TASK_ID, TestUtils.tempDirectory());
+        final StreamsMetricsImpl streamsMetrics = (StreamsMetricsImpl) super.metrics();
+        final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeBytes, streamsMetrics);
+        init(streamsMetrics, threadCache);
+    }
+
+    public MockInternalProcessorContext(final Properties config, final Metrics metrics) {
+        super(config, DEFAULT_TASK_ID, createStateDir(config));
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, DEFAULT_CLIENT_ID, getMetricsVersion(config));
+        final ThreadCache threadCache = new ThreadCache(new LogContext(DEFAULT_THREAD_CACHE_PREFIX), DEFAULT_MAX_CACHE_SIZE_BYTES, streamsMetrics);
+        init(streamsMetrics, threadCache);
     }
 
-    public MockInternalProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
-        super(config, taskId, stateDir);
+    public MockInternalProcessorContext(final Properties config, final File stateDir, final ThreadCache cache) {

Review comment:
       Q: Do you think we can eliminate the `stateDir` parameter in this constructor and just take the state dir in the `config` parameter?

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContextTest.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.test.MockInternalProcessorContext.DEFAULT_TASK_ID;
+
+public class MockInternalProcessorContextTest {
+
+    @Test
+    public void shouldReturnDefaults() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        verifyDefaultMetricsVersion(context);
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionLatest() {
+        shouldReturnMetricsVersion(Version.LATEST, StreamsConfig.METRICS_LATEST);
+    }
+
+    @Test
+    public void shouldReturnMetricsVersionFrom0100To24() {
+        shouldReturnMetricsVersion(Version.FROM_0100_TO_24, StreamsConfig.METRICS_0100_TO_24);
+    }
+
+    private static void shouldReturnMetricsVersion(final Version version, final String builtInMetricsVersion) {
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
+        final MockInternalProcessorContext context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(version, context.metrics().version());
+        verifyDefaultRecordCollector(context);
+        verifyDefaultTaskId(context);
+        verifyDefaultTopic(context);
+        verifyDefaultPartition(context);
+        verifyDefaultTimestamp(context);
+        verifyDefaultOffset(context);
+        verifyDefaultHeaders(context);
+        verifyDefaultProcessorNodeName(context);
+    }
+
+    @Test
+    public void shouldHaveStateDirAtTheSpecifiedPath() {
+        final String stateDir = "state-dir";
+        final Properties properties = StreamsTestUtils.getStreamsConfig();
+        properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+        final InternalProcessorContext<Object, Object> context = new MockInternalProcessorContext(properties, new Metrics());
+
+        Assert.assertEquals(new File(stateDir).getAbsolutePath(), context.stateDir().getAbsolutePath());
+    }
+
+    @Test
+    public void shouldRegisterStateStore() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateStore stateStore = new MockKeyValueStore(storeName, false);
+        context.register(stateStore, null);
+
+        Assert.assertSame(stateStore, context.getStateStore(storeName));
+    }
+
+    @Test
+    public void shouldRegisterStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        final StateRestoreCallback callback = new MockStateRestoreListener();
+        context.register(new MockKeyValueStore(storeName, false), callback);
+
+        Assert.assertSame(callback, context.getRestoreListener(storeName));
+    }
+
+    @Test
+    public void shouldReturnNoOpStateRestoreListener() {
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+
+        final String storeName = "store-name";
+        context.register(new MockKeyValueStore(storeName, false), new MockRestoreCallback());
+
+        Assert.assertEquals(CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER, context.getRestoreListener(storeName));
+    }
+
+    @Test
+    public void shouldCallOnRestoreStartAndOnRestoreEndWhenRestore() {
+        final String storeName = "store-name";
+
+        final AbstractNotifyingRestoreCallback stateRestoreListener = EasyMock.mock(AbstractNotifyingRestoreCallback.class);
+        final Capture<TopicPartition> topicPartitionCapture = Capture.newInstance();
+        final Capture<String> storeNameCapture = Capture.newInstance();
+        final Capture<Long> startingOffset = Capture.newInstance();
+        final Capture<Long> endingOffset = Capture.newInstance();
+        stateRestoreListener.onRestoreStart(
+                EasyMock.capture(topicPartitionCapture),
+                EasyMock.capture(storeNameCapture),
+                EasyMock.captureLong(startingOffset),
+                EasyMock.captureLong(endingOffset)
+        );
+        EasyMock.expectLastCall().andAnswer(() -> {
+            Assert.assertNull(topicPartitionCapture.getValue());
+            Assert.assertEquals(storeName, storeNameCapture.getValue());
+            Assert.assertEquals(0L, startingOffset.getValue().longValue());
+            Assert.assertEquals(0L, endingOffset.getValue().longValue());
+            return null;
+        });
+        final Capture<Long> totalRestoredCapture = Capture.newInstance();
+        stateRestoreListener.onRestoreEnd(
+                EasyMock.capture(topicPartitionCapture),
+                EasyMock.capture(storeNameCapture),
+                EasyMock.captureLong(totalRestoredCapture)
+        );
+        EasyMock.expectLastCall().andAnswer(() -> {
+            Assert.assertNull(topicPartitionCapture.getValue());
+            Assert.assertEquals(storeName, storeNameCapture.getValue());
+            Assert.assertEquals(0L, totalRestoredCapture.getValue().longValue());
+            return null;
+        });

Review comment:
       Same question here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org