You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/08 21:18:58 UTC

[kafka] branch 2.6 updated: KAFKA-10005: Decouple RestoreListener from RestoreCallback (#8676)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 516ab2d  KAFKA-10005: Decouple RestoreListener from RestoreCallback (#8676)
516ab2d is described below

commit 516ab2d938b1f59d5b7a2e799ae7c7d553d8e446
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Jun 8 14:17:45 2020 -0700

    KAFKA-10005: Decouple RestoreListener from RestoreCallback (#8676)
    
    And remove bulk loading mechanism inside RocksDB.
    
    Reviewers: John Roesler <vv...@apache.org>, A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../AbstractNotifyingBatchingRestoreCallback.java  |  83 --------
 .../AbstractNotifyingRestoreCallback.java          |  72 -------
 .../processor/BatchingStateRestoreCallback.java    |   4 +
 .../streams/processor/StateRestoreListener.java    |   9 +-
 .../processor/internals/ChangelogRegister.java     |   3 +-
 .../internals/CompositeRestoreListener.java        | 116 -----------
 .../processor/internals/ProcessorStateManager.java |  18 +-
 .../RecordBatchingStateRestoreCallback.java        |   5 -
 .../processor/internals/StoreChangelogReader.java  |  48 +----
 .../AbstractRocksDBSegmentedBytesStore.java        |  43 +---
 ...ulkLoadingStore.java => BatchWritingStore.java} |   3 +-
 .../streams/state/internals/RocksDBStore.java      |  60 +-----
 .../state/internals/RocksDBTimestampedStore.java   |  15 --
 .../kafka/streams/state/internals/Segment.java     |   2 +-
 .../internals/CompositeRestoreListenerTest.java    | 222 ---------------------
 .../processor/internals/MockChangelogReader.java   |   2 +-
 .../internals/ProcessorStateManagerTest.java       |  18 +-
 .../internals/StoreChangelogReaderTest.java        |  85 +-------
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  31 ---
 .../internals/RocksDBSegmentedBytesStoreTest.java  |   7 -
 .../streams/state/internals/RocksDBStoreTest.java  |  50 -----
 .../RocksDBTimestampedSegmentedBytesStoreTest.java |   7 -
 .../kafka/test/InternalMockProcessorContext.java   |  20 --
 .../test/MockBatchingStateRestoreListener.java     |  44 ----
 .../org/apache/kafka/test/MockRestoreCallback.java |   1 -
 .../kafka/test/MockStateRestoreListener.java       |  14 +-
 26 files changed, 39 insertions(+), 943 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java
deleted file mode 100644
index 7b5b5d0..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Abstract implementation of the  {@link BatchingStateRestoreCallback} used for batch restoration operations.
- *
- * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)},
- * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}.
- */
-public abstract class AbstractNotifyingBatchingRestoreCallback implements BatchingStateRestoreCallback, StateRestoreListener {
-
-    /**
-     * Single put restore operations not supported, please use {@link AbstractNotifyingRestoreCallback}
-     * or {@link StateRestoreCallback} instead for single action restores.
-     */
-    @Override
-    public void restore(final byte[] key,
-                        final byte[] value) {
-        throw new UnsupportedOperationException("Single restore not supported");
-    }
-
-
-    /**
-     * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)
-     *
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     *
-     */
-    @Override
-    public void onRestoreStart(final TopicPartition topicPartition,
-                               final String storeName,
-                               final long startingOffset,
-                               final long endingOffset) {
-
-    }
-
-
-    /**
-     * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)
-     *
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     *
-     */
-    @Override
-    public void onBatchRestored(final TopicPartition topicPartition,
-                                final String storeName,
-                                final long batchEndOffset,
-                                final long numRestored) {
-
-    }
-
-    /**
-     * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long)
-     *
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     *
-     */
-    @Override
-    public void onRestoreEnd(final TopicPartition topicPartition,
-                             final String storeName,
-                             final long totalRestored) {
-
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java
deleted file mode 100644
index 2eb3f66..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Abstract implementation of the  {@link StateRestoreCallback} used for batch restoration operations.
- *
- * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)},
- * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}.
- */
-public abstract class AbstractNotifyingRestoreCallback implements StateRestoreCallback, StateRestoreListener {
-
-
-    /**
-     * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)
-     *
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     *
-     */
-    @Override
-    public void onRestoreStart(final TopicPartition topicPartition,
-                               final String storeName,
-                               final long startingOffset,
-                               final long endingOffset) {
-
-    }
-
-
-    /**
-     * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)
-     *
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     *
-     */
-    @Override
-    public void onBatchRestored(final TopicPartition topicPartition,
-                                final String storeName,
-                                final long batchEndOffset,
-                                final long numRestored) {
-
-    }
-
-    /**
-     * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long)
-     *
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     *
-     */
-    @Override
-    public void onRestoreEnd(final TopicPartition topicPartition,
-                             final String storeName,
-                             final long totalRestored) {
-
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
index 3447a57..d29c7ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
@@ -38,4 +38,8 @@ public interface BatchingStateRestoreCallback extends StateRestoreCallback {
      */
     void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
 
+    @Override
+    default void restore(byte[] key, byte[] value) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index ea1c288..210a5de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -29,9 +29,12 @@ import org.apache.kafka.common.TopicPartition;
  * Users desiring stateful operations will need to provide synchronization internally in
  * the {@code StateRestorerListener} implementation.
  *
- * When used for monitoring a single {@link StateStore} using either {@link AbstractNotifyingRestoreCallback} or
- * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is necessary
- * as each StreamThread has its own StateStore instance.
+ * Note that this listener is only registered at the per-client level and users can rely on the {@code storeName}
+ * parameter to define specific monitoring for different {@link StateStore}s. There is another
+ * {@link StateRestoreCallback} interface which is registered via the {@link ProcessorContext#register(StateStore, StateRestoreCallback)}
+ * function per-store, and it is used to apply the fetched changelog records into the local state store during restoration.
+ * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
+ * class during state store registration.
  *
  * Incremental updates are exposed so users can estimate how much progress has been made.
  */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
index cdddd20..7403aad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
@@ -34,7 +34,6 @@ interface ChangelogRegister {
     /**
      * Unregisters and removes the passed in partitions from the set of changelogs
      * @param removedPartitions the set of partitions to remove
-     * @param triggerOnRestoreEnd whether to trigger the onRestoreEnd callback
      */
-    void unregister(final Collection<TopicPartition> removedPartitions, final boolean triggerOnRestoreEnd);
+    void unregister(final Collection<TopicPartition> removedPartitions);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
deleted file mode 100644
index 7cccad6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreListener;
-
-import java.util.Collection;
-
-public class CompositeRestoreListener implements RecordBatchingStateRestoreCallback, StateRestoreListener {
-
-    public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener();
-    private final RecordBatchingStateRestoreCallback internalBatchingRestoreCallback;
-    private final StateRestoreListener storeRestoreListener;
-    private StateRestoreListener userRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
-
-    CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
-
-        if (stateRestoreCallback instanceof StateRestoreListener) {
-            storeRestoreListener = (StateRestoreListener) stateRestoreCallback;
-        } else {
-            storeRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
-        }
-
-        internalBatchingRestoreCallback = StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
-    }
-
-    /**
-     * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
-     * {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)}
-     */
-    @Override
-    public void onRestoreStart(final TopicPartition topicPartition,
-                               final String storeName,
-                               final long startingOffset,
-                               final long endingOffset) {
-        userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
-        storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
-    }
-
-    /**
-     * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
-     * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}
-     */
-    @Override
-    public void onBatchRestored(final TopicPartition topicPartition,
-                                final String storeName,
-                                final long batchEndOffset,
-                                final long numRestored) {
-        userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
-        storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
-    }
-
-    /**
-     * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
-     * {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}
-     */
-    @Override
-    public void onRestoreEnd(final TopicPartition topicPartition,
-                             final String storeName,
-                             final long totalRestored) {
-        userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
-        storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
-    }
-
-    @Override
-    public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        internalBatchingRestoreCallback.restoreBatch(records);
-    }
-
-    void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
-        if (userRestoreListener != null) {
-            this.userRestoreListener = userRestoreListener;
-        }
-    }
-
-    @Override
-    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void restore(final byte[] key,
-                        final byte[] value) {
-        throw new UnsupportedOperationException("Single restore functionality shouldn't be called directly but "
-                                                    + "through the delegated StateRestoreCallback instance");
-    }
-
-    private static final class NoOpStateRestoreListener extends AbstractNotifyingBatchingRestoreCallback implements RecordBatchingStateRestoreCallback {
-        @Override
-        public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f00284f..d78c9a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -131,10 +132,6 @@ public class ProcessorStateManager implements StateManager {
             return this.stateStore;
         }
 
-        StateRestoreCallback restoreCallback() {
-            return this.restoreCallback;
-        }
-
         @Override
         public String toString() {
             return "StateStoreMetadata (" + stateStore.name() + " : " + changelogPartition + " @ " + offset;
@@ -303,6 +300,11 @@ public class ProcessorStateManager implements StateManager {
             throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
         }
 
+        if (stateRestoreCallback instanceof StateRestoreListener) {
+            log.warn("The registered state restore callback is also implementing the state restore listener interface, " +
+                    "which is not expected and would be ignored");
+        }
+
         final StateStoreMetadata storeMetadata = isLoggingEnabled(storeName) ?
             new StateStoreMetadata(
                 store,
@@ -459,7 +461,7 @@ public class ProcessorStateManager implements StateManager {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state stores: {}", stores);
 
-        changelogReader.unregister(getAllChangelogTopicPartitions(), false);
+        changelogReader.unregister(getAllChangelogTopicPartitions());
 
         RuntimeException firstException = null;
         // attempting to close the stores, just in case they
@@ -499,11 +501,7 @@ public class ProcessorStateManager implements StateManager {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
         final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
-        if (taskType.equals(TaskType.ACTIVE)) {
-            changelogReader.unregister(allChangelogs, true);
-        } else {
-            changelogReader.unregister(allChangelogs, false);
-        }
+        changelogReader.unregister(allChangelogs);
     }
 
     void transitionTaskType(final TaskType newType, final LogContext logContext) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
index 78a885d..300b60d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
@@ -29,9 +29,4 @@ public interface RecordBatchingStateRestoreCallback extends BatchingStateRestore
     default void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
         throw new UnsupportedOperationException();
     }
-
-    @Override
-    default void restore(final byte[] key, final byte[] value) {
-        throw new UnsupportedOperationException();
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 6c6ff39..cecbbd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -29,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
@@ -524,13 +522,6 @@ public class StoreChangelogReader implements ChangelogReader {
             // do not trigger restore listener if we are processing standby tasks
             if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
                 try {
-                    // first trigger the store's specific listener if its registered callback is also an lister,
-                    // then trigger the user registered global listener
-                    final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
-                    if (restoreCallback instanceof StateRestoreListener) {
-                        ((StateRestoreListener) restoreCallback).onBatchRestored(partition, storeName, currentOffset, numRecords);
-                    }
-
                     stateRestoreListener.onBatchRestored(partition, storeName, currentOffset, numRecords);
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
@@ -547,13 +538,6 @@ public class StoreChangelogReader implements ChangelogReader {
             pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
 
             try {
-                // first trigger the store's specific listener if its registered callback is also an listener,
-                // then trigger the user registered global listener
-                final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
-                if (restoreCallback instanceof StateRestoreListener) {
-                    ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
-                }
-
                 stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
             } catch (final Exception e) {
                 throw new StreamsException("State restore listener failed on restore completed", e);
@@ -797,13 +781,6 @@ public class StoreChangelogReader implements ChangelogReader {
                 }
 
                 try {
-                    // first trigger the store's specific listener if its registered callback is also an lister,
-                    // then trigger the user registered global listener
-                    final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
-                    if (restoreCallback instanceof StateRestoreListener) {
-                        ((StateRestoreListener) restoreCallback).onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
-                    }
-
                     stateRestoreListener.onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
@@ -812,37 +789,14 @@ public class StoreChangelogReader implements ChangelogReader {
         }
     }
 
-    private RuntimeException invokeOnRestoreEnd(final TopicPartition partition,
-                                                final ChangelogMetadata changelogMetadata) {
-        // only trigger the store's specific listener to make sure we disable bulk loading before transition to standby
-        final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
-        final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
-        final String storeName = storeMetadata.store().name();
-        if (restoreCallback instanceof StateRestoreListener) {
-            try {
-                ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
-            } catch (final RuntimeException e) {
-                return e;
-            }
-        }
-        return null;
-    }
-
     @Override
-    public void unregister(final Collection<TopicPartition> revokedChangelogs,
-                           final boolean triggerOnRestoreEnd) {
-        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-
+    public void unregister(final Collection<TopicPartition> revokedChangelogs) {
         // Only changelogs that are initialized have been added to the restore consumer's assignment
         final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>();
 
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
             if (changelogMetadata != null) {
-                if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) {
-                    firstException.compareAndSet(null, invokeOnRestoreEnd(partition, changelogMetadata));
-                }
-
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index ce64bf2..f0979f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -17,16 +17,14 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -37,10 +35,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
@@ -50,7 +46,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     private final KeySchema keySchema;
     private InternalProcessorContext context;
     private volatile boolean open;
-    private Set<S> bulkLoadSegments;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
@@ -186,8 +181,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
 
         segments.openExisting(this.context, observedStreamTime);
 
-        bulkLoadSegments = new HashSet<>(segments.allSegments());
-
         // register and possibly restore the state from the logs
         context.register(root, new RocksDBSegmentsBatchingRestoreCallback());
 
@@ -249,17 +242,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
             final long segmentId = segments.segmentId(timestamp);
             final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
             if (segment != null) {
-                // This handles the case that state store is moved to a new client and does not
-                // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
-                // will only close the database and open it again with bulk loading enabled.
-                if (!bulkLoadSegments.contains(segment) && context.taskType() == TaskType.ACTIVE) {
-                    segment.toggleDbForBulkLoading(true);
-                    // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
-                    // makes the open flag for the newly created store.
-                    // if the store does exist already, then toggleDbForBulkLoading will make sure that
-                    // the store is already open here.
-                    bulkLoadSegments = new HashSet<>(segments.allSegments());
-                }
                 try {
                     final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
                     segment.addToBatch(record, batch);
@@ -271,32 +253,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         return writeBatchMap;
     }
 
-    private void toggleForBulkLoading(final boolean prepareForBulkload) {
-        for (final S segment : segments.allSegments()) {
-            segment.toggleDbForBulkLoading(prepareForBulkload);
-        }
-    }
-
-    private class RocksDBSegmentsBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
+    private class RocksDBSegmentsBatchingRestoreCallback implements BatchingStateRestoreCallback {
 
         @Override
         public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
             restoreAllInternal(records);
         }
-
-        @Override
-        public void onRestoreStart(final TopicPartition topicPartition,
-                                   final String storeName,
-                                   final long startingOffset,
-                                   final long endingOffset) {
-            toggleForBulkLoading(true);
-        }
-
-        @Override
-        public void onRestoreEnd(final TopicPartition topicPartition,
-                                 final String storeName,
-                                 final long totalRestored) {
-            toggleForBulkLoading(false);
-        }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java
similarity index 91%
rename from streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java
rename to streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java
index 1e27cc2..2ac1e3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java
@@ -20,8 +20,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 
-public interface BulkLoadingStore {
-    void toggleDbForBulkLoading(final boolean prepareForBulkload);
+public interface BatchWritingStore {
     void addToBatch(final KeyValue<byte[], byte[]> record,
                     final WriteBatch batch) throws RocksDBException;
     void write(final WriteBatch batch) throws RocksDBException;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a57f31e..fc2bb24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
@@ -24,7 +23,6 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -73,7 +71,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CON
 /**
  * A persistent key-value store based on RocksDB.
  */
-public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingStore {
+public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore {
     private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
 
     private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
@@ -106,7 +104,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
     private final RocksDBMetricsRecorder metricsRecorder;
     private boolean isStatisticsRegistered = false;
 
-    private volatile boolean prepareForBulkload = false;
     ProcessorContext internalProcessorContext;
     // visible for testing
     volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
@@ -175,10 +172,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
             configSetter.setConfig(name, userSpecifiedOptions, configs);
         }
 
-        if (prepareForBulkload) {
-            userSpecifiedOptions.prepareForBulkLoad();
-        }
-
         dbDir = new File(new File(context.stateDir(), parentDir), name);
 
         try {
@@ -236,11 +229,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
         context.register(root, batchingStateRestoreCallback);
     }
 
-    // visible for testing
-    boolean isPrepareForBulkload() {
-        return prepareForBulkload;
-    }
-
     @Override
     public String name() {
         return name;
@@ -262,7 +250,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public synchronized void put(final Bytes key,
                                  final byte[] value) {
@@ -391,22 +378,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
     }
 
     @Override
-    public void toggleDbForBulkLoading(final boolean prepareForBulkload) {
-        if (prepareForBulkload) {
-            // if the store is not empty, we need to compact to get around the num.levels check for bulk loading
-            final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
-
-            if (sstFileNames != null && sstFileNames.length > 0) {
-                dbAccessor.toggleDbForBulkLoading();
-            }
-        }
-
-        close();
-        this.prepareForBulkload = prepareForBulkload;
-        openDB(internalProcessorContext);
-    }
-
-    @Override
     public void addToBatch(final KeyValue<byte[], byte[]> record,
                            final WriteBatch batch) throws RocksDBException {
         dbAccessor.addToBatch(record.key, record.value, batch);
@@ -506,8 +477,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
                         final WriteBatch batch) throws RocksDBException;
 
         void close();
-
-        void toggleDbForBulkLoading();
     }
 
     class SingleColumnFamilyAccessor implements RocksDBAccessor {
@@ -607,20 +576,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
         public void close() {
             columnFamily.close();
         }
-
-        @Override
-        @SuppressWarnings("deprecation")
-        public void toggleDbForBulkLoading() {
-            try {
-                db.compactRange(columnFamily, true, 1, 0);
-            } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
-            }
-        }
     }
 
     // not private for testing
-    static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
+    static class RocksDBBatchingRestoreCallback implements BatchingStateRestoreCallback {
 
         private final RocksDBStore rocksDBStore;
 
@@ -637,21 +596,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
                 throw new ProcessorStateException("Error restoring batch to store " + rocksDBStore.name, e);
             }
         }
-
-        @Override
-        public void onRestoreStart(final TopicPartition topicPartition,
-                                   final String storeName,
-                                   final long startingOffset,
-                                   final long endingOffset) {
-            rocksDBStore.toggleDbForBulkLoading(true);
-        }
-
-        @Override
-        public void onRestoreEnd(final TopicPartition topicPartition,
-                                 final String storeName,
-                                 final long totalRestored) {
-            rocksDBStore.toggleDbForBulkLoading(false);
-        }
     }
 
     // for testing
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index bc6c17f..6c31e9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -249,21 +249,6 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
             oldColumnFamily.close();
             newColumnFamily.close();
         }
-
-        @Override
-        @SuppressWarnings("deprecation")
-        public void toggleDbForBulkLoading() {
-            try {
-                db.compactRange(oldColumnFamily, true, 1, 0);
-            } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
-            }
-            try {
-                db.compactRange(newColumnFamily, true, 1, 0);
-            } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
-            }
-        }
     }
 
     private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index fe1fc33..c86ee96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.io.IOException;
 
-public interface Segment extends KeyValueStore<Bytes, byte[]>, BulkLoadingStore {
+public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore {
 
     void destroy() throws IOException;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
deleted file mode 100644
index c981107..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.test.MockBatchingStateRestoreListener;
-import org.apache.kafka.test.MockStateRestoreListener;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-
-import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
-import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
-import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-
-public class CompositeRestoreListenerTest {
-
-    private final MockStateRestoreCallback stateRestoreCallback = new MockStateRestoreCallback();
-    private final MockBatchingStateRestoreListener batchingStateRestoreCallback = new MockBatchingStateRestoreListener();
-    private final MockNoListenBatchingStateRestoreCallback
-        noListenBatchingStateRestoreCallback =
-        new MockNoListenBatchingStateRestoreCallback();
-    private final MockStateRestoreListener reportingStoreListener = new MockStateRestoreListener();
-    private final byte[] key = "key".getBytes(StandardCharsets.UTF_8);
-    private final byte[] value = "value".getBytes(StandardCharsets.UTF_8);
-    private final Collection<KeyValue<byte[], byte[]>> records = Collections.singletonList(KeyValue.pair(key, value));
-    private final Collection<ConsumerRecord<byte[], byte[]>> consumerRecords = Collections.singletonList(
-        new ConsumerRecord<>("", 0, 0L, key, value)
-    );
-    private final String storeName = "test_store";
-    private final long startOffset = 0L;
-    private final long endOffset = 1L;
-    private final long batchOffset = 1L;
-    private final long numberRestored = 1L;
-    private final TopicPartition topicPartition = new TopicPartition("testTopic", 1);
-
-    private CompositeRestoreListener compositeRestoreListener;
-
-
-    @Test
-    public void shouldRestoreInNonBatchMode() {
-        setUpCompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.restoreBatch(consumerRecords);
-        assertThat(stateRestoreCallback.restoredKey, is(key));
-        assertThat(stateRestoreCallback.restoredValue, is(value));
-    }
-
-    @Test
-    public void shouldRestoreInBatchMode() {
-        setUpCompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.restoreBatch(consumerRecords);
-        assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records));
-    }
-
-    @Test
-    public void shouldNotifyRestoreStartNonBatchMode() {
-        setUpCompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
-        assertStateRestoreListenerOnStartNotification(stateRestoreCallback);
-        assertStateRestoreListenerOnStartNotification(reportingStoreListener);
-    }
-
-    @Test
-    public void shouldNotifyRestoreStartBatchMode() {
-        setUpCompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
-        assertStateRestoreListenerOnStartNotification(batchingStateRestoreCallback);
-        assertStateRestoreListenerOnStartNotification(reportingStoreListener);
-    }
-
-    @Test
-    public void shouldNotifyRestoreProgressNonBatchMode() {
-        setUpCompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored);
-        assertStateRestoreListenerOnBatchCompleteNotification(stateRestoreCallback);
-        assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener);
-    }
-
-    @Test
-    public void shouldNotifyRestoreProgressBatchMode() {
-        setUpCompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored);
-        assertStateRestoreListenerOnBatchCompleteNotification(batchingStateRestoreCallback);
-        assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener);
-    }
-
-    @Test
-    public void shouldNotifyRestoreEndInNonBatchMode() {
-        setUpCompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
-        assertStateRestoreOnEndNotification(stateRestoreCallback);
-        assertStateRestoreOnEndNotification(reportingStoreListener);
-    }
-
-    @Test
-    public void shouldNotifyRestoreEndInBatchMode() {
-        setUpCompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
-        assertStateRestoreOnEndNotification(batchingStateRestoreCallback);
-        assertStateRestoreOnEndNotification(reportingStoreListener);
-    }
-
-    @Test
-    public void shouldHandleNullReportStoreListener() {
-        compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.setUserRestoreListener(null);
-
-        compositeRestoreListener.restoreBatch(consumerRecords);
-        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
-        compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored);
-        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
-
-        assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records));
-        assertStateRestoreOnEndNotification(batchingStateRestoreCallback);
-    }
-
-    @Test
-    public void shouldHandleNoRestoreListener() {
-        compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.setUserRestoreListener(null);
-
-        compositeRestoreListener.restoreBatch(consumerRecords);
-        compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
-        compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored);
-        compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
-
-        assertThat(noListenBatchingStateRestoreCallback.restoredRecords, is(records));
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void shouldThrowExceptionWhenSinglePutDirectlyCalled() {
-        compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.restore(key, value);
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void shouldThrowExceptionWhenRestoreAllDirectlyCalled() {
-        compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.restoreAll(Collections.emptyList());
-    }
-
-    private void assertStateRestoreListenerOnStartNotification(final MockStateRestoreListener restoreListener) {
-        assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_START));
-        assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
-        assertThat(restoreListener.restoreStartOffset, is(startOffset));
-        assertThat(restoreListener.restoreEndOffset, is(endOffset));
-    }
-
-    private void assertStateRestoreListenerOnBatchCompleteNotification(final MockStateRestoreListener restoreListener) {
-        assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_BATCH));
-        assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
-        assertThat(restoreListener.restoredBatchOffset, is(batchOffset));
-        assertThat(restoreListener.numBatchRestored, is(numberRestored));
-    }
-
-    private void assertStateRestoreOnEndNotification(final MockStateRestoreListener restoreListener) {
-        assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_END));
-        assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
-        assertThat(restoreListener.totalNumRestored, is(numberRestored));
-    }
-
-
-    private void setUpCompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
-        compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.setUserRestoreListener(reportingStoreListener);
-    }
-
-
-    private static class MockStateRestoreCallback extends MockStateRestoreListener implements StateRestoreCallback {
-
-        byte[] restoredKey;
-        byte[] restoredValue;
-
-        @Override
-        public void restore(final byte[] key, final byte[] value) {
-            restoredKey = key;
-            restoredValue = value;
-        }
-    }
-
-    private static class MockNoListenBatchingStateRestoreCallback implements BatchingStateRestoreCallback {
-
-        Collection<KeyValue<byte[], byte[]>> restoredRecords;
-
-        @Override
-        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
-            restoredRecords = records;
-        }
-
-        @Override
-        public void restore(final byte[] key, final byte[] value) {
-            throw new IllegalStateException("Should not be called");
-
-        }
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
index ad7dad6..0bba6aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -64,7 +64,7 @@ public class MockChangelogReader implements ChangelogReader {
     }
 
     @Override
-    public void unregister(final Collection<TopicPartition> partitions, final boolean triggerOnRestoreEnd) {
+    public void unregister(final Collection<TopicPartition> partitions) {
         restoringPartitions.removeAll(partitions);
 
         for (final TopicPartition partition : partitions) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index fdfb3c2..77ed3a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -33,8 +33,8 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockKeyValueStore;
+import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -51,7 +51,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -212,21 +211,21 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldRestoreStoreWithRestoreCallback() {
-        final MockBatchingStateRestoreListener batchingRestoreCallback = new MockBatchingStateRestoreListener();
+        final MockRestoreCallback restoreCallback = new MockRestoreCallback();
 
         final KeyValue<byte[], byte[]> expectedKeyValue = KeyValue.pair(keyBytes, valueBytes);
 
         final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
 
         try {
-            stateMgr.registerStore(persistentStore, batchingRestoreCallback);
+            stateMgr.registerStore(persistentStore, restoreCallback);
             final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition);
             assertThat(storeMetadata, notNullValue());
 
             stateMgr.restore(storeMetadata, singletonList(consumerRecord));
 
-            assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
-            assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
+            assertThat(restoreCallback.restored.size(), is(1));
+            assertTrue(restoreCallback.restored.contains(expectedKeyValue));
 
             assertEquals(Collections.singletonMap(persistentStorePartition, 101L), stateMgr.changelogOffsets());
         } finally {
@@ -714,11 +713,8 @@ public class ProcessorStateManagerTest {
     public void shouldThrowIfRestoreCallbackThrows() {
         final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
 
-        stateMgr.registerStore(persistentStore, new MockBatchingStateRestoreListener() {
-            @Override
-            public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
-                throw new RuntimeException("KABOOM!");
-            }
+        stateMgr.registerStore(persistentStore, (key, value) -> {
+            throw new RuntimeException("KABOOM!");
         });
 
         final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 6956e79..6769f4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.easymock.EasyMock;
@@ -114,11 +113,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     private final KafkaException kaboom = new KafkaException("KABOOM!");
     private final MockStateRestoreListener exceptionCallback = new MockStateRestoreListener() {
         @Override
-        public void restore(final byte[] key, final byte[] value) {
-            throw kaboom;
-        }
-
-        @Override
         public void onRestoreStart(final TopicPartition tp, final String store, final long stOffset, final long edOffset) {
             throw kaboom;
         }
@@ -897,7 +891,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp2).state());
 
         // should support removing and clearing changelogs
-        changelogReader.unregister(Collections.singletonList(tp), false);
+        changelogReader.unregister(Collections.singletonList(tp));
         assertNull(changelogReader.changelogMetadata(tp));
         assertFalse(changelogReader.isEmpty());
         assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp1).state());
@@ -910,79 +904,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     }
 
     @Test
-    public void shouldTriggerRestoreCallbackAsListener() {
-        // do not need this test for standby task
-        if (type == STANDBY)
-            return;
-
-        final MockBatchingStateRestoreListener restoreListener = new MockBatchingStateRestoreListener();
-        EasyMock.expect(storeMetadata.restoreCallback()).andReturn(restoreListener).anyTimes();
-        EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
-        EasyMock.replay(stateManager, storeMetadata, store);
-
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
-                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
-            }
-        };
-        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
-
-        final StoreChangelogReader changelogReader =
-                new StoreChangelogReader(time, config, logContext, consumer, callback);
-
-        changelogReader.register(tp, stateManager);
-
-        changelogReader.restore();
-
-        assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
-        assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
-        assertEquals(5L, consumer.position(tp));
-        assertEquals(Collections.emptySet(), consumer.paused());
-
-        assertEquals(11L, (long) changelogReader.changelogMetadata(tp).endOffset());
-
-        assertEquals(tp, callback.restoreTopicPartition);
-        assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
-        assertNull(callback.storeNameCalledStates.get(RESTORE_END));
-        assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
-        assertEquals(5L, restoreListener.restoreStartOffset);
-        assertEquals(11L, restoreListener.restoreEndOffset);
-        assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_START));
-
-        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes()));
-        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes()));
-        // null key should be ignored
-        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, null, "value".getBytes()));
-        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes()));
-
-        changelogReader.restore();
-
-        assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
-        assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
-        assertEquals(0, changelogReader.changelogMetadata(tp).bufferedRecords().size());
-        assertEquals(0, changelogReader.changelogMetadata(tp).bufferedLimitIndex());
-        assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_BATCH));
-
-        // consumer position bypassing the gap in the next poll
-        consumer.seek(tp, 11L);
-
-        changelogReader.restore();
-
-        assertEquals(11L, consumer.position(tp));
-        assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
-
-        assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state());
-        assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
-        assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());
-        assertEquals(Collections.singleton(tp), consumer.paused());
-
-        assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_BATCH));
-        assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END));
-        assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_END));
-    }
-
-    @Test
     public void shouldTransitState() {
         EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
         EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
@@ -1038,7 +959,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         // transition to update standby is NOT idempotent
         assertThrows(IllegalStateException.class, changelogReader::transitToUpdateStandby);
 
-        changelogReader.unregister(Collections.singletonList(tp), false);
+        changelogReader.unregister(Collections.singletonList(tp));
         changelogReader.register(tp, activeStateManager);
 
         // if a new active is registered, we should immediately transit to standby updating
@@ -1092,7 +1013,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     public void shouldNotThrowOnUnknownRevokedPartition() {
         LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class);
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) {
-            changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)), false);
+            changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)));
 
             assertThat(
                 appender.getMessages(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 5b0081c..4b3f9d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -47,7 +46,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
-import org.rocksdb.Options;
 import org.rocksdb.WriteBatch;
 
 import java.io.File;
@@ -144,8 +142,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
     abstract AbstractSegments<S> newSegments();
 
-    abstract Options getOptions(S segment);
-
     @Test
     public void shouldPutAndFetch() {
         final String key = "a";
@@ -381,11 +377,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         // 2 segments are created during restoration.
         assertEquals(2, bytesStore.getSegments().size());
 
-        // Bulk loading is disabled during recovery for stand-by tasks.
-        for (final S segment : bytesStore.getSegments()) {
-            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(taskType == TaskType.ACTIVE ? 1 << 30 : 4));
-        }
-
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
         expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
         expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
@@ -395,28 +386,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
     }
 
     @Test
-    public void shouldRespectBulkLoadOptionsDuringInit() {
-        bytesStore.init(context, bytesStore);
-        final String key = "a";
-        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
-        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
-        assertEquals(2, bytesStore.getSegments().size());
-
-        final StateRestoreListener restoreListener = context.getRestoreListener(bytesStore.name());
-
-        restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
-
-        for (final S segment : bytesStore.getSegments()) {
-            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
-        }
-
-        restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
-        for (final S segment : bytesStore.getSegments()) {
-            assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4));
-        }
-    }
-
-    @Test
     public void shouldLogAndMeasureExpiredRecordsWithBuiltInMetricsVersionLatest() {
         shouldLogAndMeasureExpiredRecords(StreamsConfig.METRICS_LATEST);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index b9e49b2..3b6904f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.rocksdb.Options;
-
 public class RocksDBSegmentedBytesStoreTest extends AbstractRocksDBSegmentedBytesStoreTest<KeyValueSegment> {
 
     private final static String METRICS_SCOPE = "metrics-scope";
@@ -37,9 +35,4 @@ public class RocksDBSegmentedBytesStoreTest extends AbstractRocksDBSegmentedByte
     KeyValueSegments newSegments() {
         return new KeyValueSegments(storeName, METRICS_SCOPE, retention, segmentInterval);
     }
-
-    @Override
-    Options getOptions(final KeyValueSegment segment) {
-        return segment.getOptions();
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index f61b9c6..c86b914 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -238,25 +237,6 @@ public class RocksDBStoreTest {
     }
 
     @Test
-    public void shouldRespectBulkloadOptionsDuringInit() {
-        rocksDBStore.init(context, rocksDBStore);
-
-        final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
-
-        restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
-
-        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
-        assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(1 << 30));
-        assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(1 << 30));
-
-        restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L);
-
-        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10));
-        assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(20));
-        assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36));
-    }
-
-    @Test
     public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
         rocksDBStore.init(context, rocksDBStore);
         rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8));
@@ -331,36 +311,6 @@ public class RocksDBStoreTest {
     }
 
     @Test
-    public void shouldTogglePrepareForBulkloadSetting() {
-        rocksDBStore.init(context, rocksDBStore);
-        final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
-            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
-
-        restoreListener.onRestoreStart(null, null, 0, 0);
-        assertTrue("Should have set bulk loading to true", rocksDBStore.isPrepareForBulkload());
-
-        restoreListener.onRestoreEnd(null, null, 0);
-        assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
-    }
-
-    @Test
-    public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() {
-        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
-
-        rocksDBStore.init(context, rocksDBStore);
-        context.restore(rocksDBStore.name(), entries);
-
-        final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
-            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
-
-        restoreListener.onRestoreStart(null, null, 0, 0);
-        assertTrue("Should have not set bulk loading to true", rocksDBStore.isPrepareForBulkload());
-
-        restoreListener.onRestoreEnd(null, null, 0);
-        assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
-    }
-
-    @Test
     public void shouldRestoreAll() {
         final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java
index 01510ff..814a04c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.rocksdb.Options;
-
 public class RocksDBTimestampedSegmentedBytesStoreTest
     extends AbstractRocksDBSegmentedBytesStoreTest<TimestampedSegment> {
 
@@ -37,9 +35,4 @@ public class RocksDBTimestampedSegmentedBytesStoreTest
     TimestampedSegments newSegments() {
         return new TimestampedSegments(storeName, METRICS_SCOPE, retention, segmentInterval);
     }
-
-    @Override
-    Options getOptions(final TimestampedSegment segment) {
-        return segment.getOptions();
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ed5d943..f1fd916 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -29,12 +29,10 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
-import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -393,31 +391,13 @@ public class InternalMockProcessorContext
         cache().addDirtyEntryFlushListener(namespace, listener);
     }
 
-    public StateRestoreListener getRestoreListener(final String storeName) {
-        return getStateRestoreListener(restoreFuncs.get(storeName));
-    }
-
     public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
         final RecordBatchingStateRestoreCallback restoreCallback = adapt(restoreFuncs.get(storeName));
-        final StateRestoreListener restoreListener = getRestoreListener(storeName);
-
-        restoreListener.onRestoreStart(null, storeName, 0L, 0L);
 
         final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
         for (final KeyValue<byte[], byte[]> keyValue : changeLog) {
             records.add(new ConsumerRecord<>("", 0, 0L, keyValue.key, keyValue.value));
         }
-
         restoreCallback.restoreBatch(records);
-
-        restoreListener.onRestoreEnd(null, storeName, 0L);
-    }
-
-    private StateRestoreListener getStateRestoreListener(final StateRestoreCallback restoreCallback) {
-        if (restoreCallback instanceof StateRestoreListener) {
-            return (StateRestoreListener) restoreCallback;
-        }
-
-        return CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
deleted file mode 100644
index 1736a54..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-public class MockBatchingStateRestoreListener extends MockStateRestoreListener implements BatchingStateRestoreCallback {
-
-    private final Collection<KeyValue<byte[], byte[]>> restoredRecords = new ArrayList<>();
-
-    @Override
-    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
-        restoredRecords.addAll(records);
-    }
-
-    @Override
-    public void restore(final byte[] key, final byte[] value) {
-        throw new IllegalStateException("Should not be called");
-
-    }
-
-    public Collection<KeyValue<byte[], byte[]>> getRestoredRecords() {
-        return restoredRecords;
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
index 096fa11..fa5b465 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
@@ -25,7 +25,6 @@ import java.util.List;
 public class MockRestoreCallback implements StateRestoreCallback {
     public List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();
 
-
     @Override
     public void restore(final byte[] key, final byte[] value) {
         restored.add(KeyValue.pair(key, value));
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
index 3138167..1026969 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
@@ -18,19 +18,15 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
+public class MockStateRestoreListener implements StateRestoreListener {
 
     // verifies store name called for each state
     public final Map<String, String> storeNameCalledStates = new HashMap<>();
-    public final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();
     public long restoreStartOffset;
     public long restoreEndOffset;
     public long restoredBatchOffset;
@@ -43,11 +39,6 @@ public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
     public static final String RESTORE_END = "restore_end";
 
     @Override
-    public void restore(final byte[] key, final byte[] value) {
-        restored.add(KeyValue.pair(key, value));
-    }
-
-    @Override
     public void onRestoreStart(final TopicPartition topicPartition,
                                final String storeName,
                                final long startingOffset,
@@ -82,7 +73,6 @@ public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
     public String toString() {
         return "MockStateRestoreListener{" +
                "storeNameCalledStates=" + storeNameCalledStates +
-               ", restored=" + restored +
                ", restoreStartOffset=" + restoreStartOffset +
                ", restoreEndOffset=" + restoreEndOffset +
                ", restoredBatchOffset=" + restoredBatchOffset +