You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/08 21:18:58 UTC
[kafka] branch 2.6 updated: KAFKA-10005: Decouple RestoreListener
from RestoreCallback (#8676)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 516ab2d KAFKA-10005: Decouple RestoreListener from RestoreCallback (#8676)
516ab2d is described below
commit 516ab2d938b1f59d5b7a2e799ae7c7d553d8e446
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Jun 8 14:17:45 2020 -0700
KAFKA-10005: Decouple RestoreListener from RestoreCallback (#8676)
And remove bulk loading mechanism inside RocksDB.
Reviewers: John Roesler <vv...@apache.org>, A. Sophie Blee-Goldman <so...@confluent.io>
---
.../AbstractNotifyingBatchingRestoreCallback.java | 83 --------
.../AbstractNotifyingRestoreCallback.java | 72 -------
.../processor/BatchingStateRestoreCallback.java | 4 +
.../streams/processor/StateRestoreListener.java | 9 +-
.../processor/internals/ChangelogRegister.java | 3 +-
.../internals/CompositeRestoreListener.java | 116 -----------
.../processor/internals/ProcessorStateManager.java | 18 +-
.../RecordBatchingStateRestoreCallback.java | 5 -
.../processor/internals/StoreChangelogReader.java | 48 +----
.../AbstractRocksDBSegmentedBytesStore.java | 43 +---
...ulkLoadingStore.java => BatchWritingStore.java} | 3 +-
.../streams/state/internals/RocksDBStore.java | 60 +-----
.../state/internals/RocksDBTimestampedStore.java | 15 --
.../kafka/streams/state/internals/Segment.java | 2 +-
.../internals/CompositeRestoreListenerTest.java | 222 ---------------------
.../processor/internals/MockChangelogReader.java | 2 +-
.../internals/ProcessorStateManagerTest.java | 18 +-
.../internals/StoreChangelogReaderTest.java | 85 +-------
.../AbstractRocksDBSegmentedBytesStoreTest.java | 31 ---
.../internals/RocksDBSegmentedBytesStoreTest.java | 7 -
.../streams/state/internals/RocksDBStoreTest.java | 50 -----
.../RocksDBTimestampedSegmentedBytesStoreTest.java | 7 -
.../kafka/test/InternalMockProcessorContext.java | 20 --
.../test/MockBatchingStateRestoreListener.java | 44 ----
.../org/apache/kafka/test/MockRestoreCallback.java | 1 -
.../kafka/test/MockStateRestoreListener.java | 14 +-
26 files changed, 39 insertions(+), 943 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java
deleted file mode 100644
index 7b5b5d0..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingBatchingRestoreCallback.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Abstract implementation of the {@link BatchingStateRestoreCallback} used for batch restoration operations.
- *
- * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)},
- * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}.
- */
-public abstract class AbstractNotifyingBatchingRestoreCallback implements BatchingStateRestoreCallback, StateRestoreListener {
-
- /**
- * Single put restore operations not supported, please use {@link AbstractNotifyingRestoreCallback}
- * or {@link StateRestoreCallback} instead for single action restores.
- */
- @Override
- public void restore(final byte[] key,
- final byte[] value) {
- throw new UnsupportedOperationException("Single restore not supported");
- }
-
-
- /**
- * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)
- *
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- *
- */
- @Override
- public void onRestoreStart(final TopicPartition topicPartition,
- final String storeName,
- final long startingOffset,
- final long endingOffset) {
-
- }
-
-
- /**
- * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)
- *
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- *
- */
- @Override
- public void onBatchRestored(final TopicPartition topicPartition,
- final String storeName,
- final long batchEndOffset,
- final long numRestored) {
-
- }
-
- /**
- * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long)
- *
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- *
- */
- @Override
- public void onRestoreEnd(final TopicPartition topicPartition,
- final String storeName,
- final long totalRestored) {
-
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java
deleted file mode 100644
index 2eb3f66..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractNotifyingRestoreCallback.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Abstract implementation of the {@link StateRestoreCallback} used for batch restoration operations.
- *
- * Includes default no-op methods of the {@link StateRestoreListener} {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)},
- * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}, and {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}.
- */
-public abstract class AbstractNotifyingRestoreCallback implements StateRestoreCallback, StateRestoreListener {
-
-
- /**
- * @see StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)
- *
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- *
- */
- @Override
- public void onRestoreStart(final TopicPartition topicPartition,
- final String storeName,
- final long startingOffset,
- final long endingOffset) {
-
- }
-
-
- /**
- * @see StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)
- *
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- *
- */
- @Override
- public void onBatchRestored(final TopicPartition topicPartition,
- final String storeName,
- final long batchEndOffset,
- final long numRestored) {
-
- }
-
- /**
- * @see StateRestoreListener#onRestoreEnd(TopicPartition, String, long)
- *
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- *
- */
- @Override
- public void onRestoreEnd(final TopicPartition topicPartition,
- final String storeName,
- final long totalRestored) {
-
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
index 3447a57..d29c7ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
@@ -38,4 +38,8 @@ public interface BatchingStateRestoreCallback extends StateRestoreCallback {
*/
void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
+ @Override
+ default void restore(byte[] key, byte[] value) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index ea1c288..210a5de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -29,9 +29,12 @@ import org.apache.kafka.common.TopicPartition;
* Users desiring stateful operations will need to provide synchronization internally in
* the {@code StateRestoreListener} implementation.
*
- * When used for monitoring a single {@link StateStore} using either {@link AbstractNotifyingRestoreCallback} or
- * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is necessary
- * as each StreamThread has its own StateStore instance.
+ * Note that this listener is only registered at the per-client level and users can base on the {@code storeName}
+ * parameter to define specific monitoring for different {@link StateStore}s. There is another
+ * {@link StateRestoreCallback} interface which is registered via the {@link ProcessorContext#register(StateStore, StateRestoreCallback)}
+ * function per-store, and it is used to apply the fetched changelog records into the local state store during restoration.
+ * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
+ * class during state store registration.
*
* Incremental updates are exposed so users can estimate how much progress has been made.
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
index cdddd20..7403aad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
@@ -34,7 +34,6 @@ interface ChangelogRegister {
/**
* Unregisters and removes the passed in partitions from the set of changelogs
* @param removedPartitions the set of partitions to remove
- * @param triggerOnRestoreEnd whether to trigger the onRestoreEnd callback
*/
- void unregister(final Collection<TopicPartition> removedPartitions, final boolean triggerOnRestoreEnd);
+ void unregister(final Collection<TopicPartition> removedPartitions);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
deleted file mode 100644
index 7cccad6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreListener;
-
-import java.util.Collection;
-
-public class CompositeRestoreListener implements RecordBatchingStateRestoreCallback, StateRestoreListener {
-
- public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener();
- private final RecordBatchingStateRestoreCallback internalBatchingRestoreCallback;
- private final StateRestoreListener storeRestoreListener;
- private StateRestoreListener userRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
-
- CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
-
- if (stateRestoreCallback instanceof StateRestoreListener) {
- storeRestoreListener = (StateRestoreListener) stateRestoreCallback;
- } else {
- storeRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
- }
-
- internalBatchingRestoreCallback = StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
- }
-
- /**
- * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
- * {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)}
- */
- @Override
- public void onRestoreStart(final TopicPartition topicPartition,
- final String storeName,
- final long startingOffset,
- final long endingOffset) {
- userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
- storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
- }
-
- /**
- * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
- * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}
- */
- @Override
- public void onBatchRestored(final TopicPartition topicPartition,
- final String storeName,
- final long batchEndOffset,
- final long numRestored) {
- userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
- storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
- }
-
- /**
- * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
- * {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}
- */
- @Override
- public void onRestoreEnd(final TopicPartition topicPartition,
- final String storeName,
- final long totalRestored) {
- userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
- storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
- }
-
- @Override
- public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
- internalBatchingRestoreCallback.restoreBatch(records);
- }
-
- void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
- if (userRestoreListener != null) {
- this.userRestoreListener = userRestoreListener;
- }
- }
-
- @Override
- public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void restore(final byte[] key,
- final byte[] value) {
- throw new UnsupportedOperationException("Single restore functionality shouldn't be called directly but "
- + "through the delegated StateRestoreCallback instance");
- }
-
- private static final class NoOpStateRestoreListener extends AbstractNotifyingBatchingRestoreCallback implements RecordBatchingStateRestoreCallback {
- @Override
- public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-
- }
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f00284f..d78c9a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -131,10 +132,6 @@ public class ProcessorStateManager implements StateManager {
return this.stateStore;
}
- StateRestoreCallback restoreCallback() {
- return this.restoreCallback;
- }
-
@Override
public String toString() {
return "StateStoreMetadata (" + stateStore.name() + " : " + changelogPartition + " @ " + offset;
@@ -303,6 +300,11 @@ public class ProcessorStateManager implements StateManager {
throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
}
+ if (stateRestoreCallback instanceof StateRestoreListener) {
+ log.warn("The registered state restore callback is also implementing the state restore listener interface, " +
+ "which is not expected and would be ignored");
+ }
+
final StateStoreMetadata storeMetadata = isLoggingEnabled(storeName) ?
new StateStoreMetadata(
store,
@@ -459,7 +461,7 @@ public class ProcessorStateManager implements StateManager {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);
- changelogReader.unregister(getAllChangelogTopicPartitions(), false);
+ changelogReader.unregister(getAllChangelogTopicPartitions());
RuntimeException firstException = null;
// attempting to close the stores, just in case they
@@ -499,11 +501,7 @@ public class ProcessorStateManager implements StateManager {
log.debug("Recycling state for {} task {}.", taskType, taskId);
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
- if (taskType.equals(TaskType.ACTIVE)) {
- changelogReader.unregister(allChangelogs, true);
- } else {
- changelogReader.unregister(allChangelogs, false);
- }
+ changelogReader.unregister(allChangelogs);
}
void transitionTaskType(final TaskType newType, final LogContext logContext) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
index 78a885d..300b60d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
@@ -29,9 +29,4 @@ public interface RecordBatchingStateRestoreCallback extends BatchingStateRestore
default void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
throw new UnsupportedOperationException();
}
-
- @Override
- default void restore(final byte[] key, final byte[] value) {
- throw new UnsupportedOperationException();
- }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 6c6ff39..cecbbd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -29,7 +28,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
@@ -524,13 +522,6 @@ public class StoreChangelogReader implements ChangelogReader {
// do not trigger restore listener if we are processing standby tasks
if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
try {
- // first trigger the store's specific listener if its registered callback is also a listener,
- // then trigger the user registered global listener
- final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
- if (restoreCallback instanceof StateRestoreListener) {
- ((StateRestoreListener) restoreCallback).onBatchRestored(partition, storeName, currentOffset, numRecords);
- }
-
stateRestoreListener.onBatchRestored(partition, storeName, currentOffset, numRecords);
} catch (final Exception e) {
throw new StreamsException("State restore listener failed on batch restored", e);
@@ -547,13 +538,6 @@ public class StoreChangelogReader implements ChangelogReader {
pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
try {
- // first trigger the store's specific listener if its registered callback is also a listener,
- // then trigger the user registered global listener
- final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
- if (restoreCallback instanceof StateRestoreListener) {
- ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
- }
-
stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
} catch (final Exception e) {
throw new StreamsException("State restore listener failed on restore completed", e);
@@ -797,13 +781,6 @@ public class StoreChangelogReader implements ChangelogReader {
}
try {
- // first trigger the store's specific listener if its registered callback is also a listener,
- // then trigger the user registered global listener
- final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
- if (restoreCallback instanceof StateRestoreListener) {
- ((StateRestoreListener) restoreCallback).onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
- }
-
stateRestoreListener.onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
} catch (final Exception e) {
throw new StreamsException("State restore listener failed on batch restored", e);
@@ -812,37 +789,14 @@ public class StoreChangelogReader implements ChangelogReader {
}
}
- private RuntimeException invokeOnRestoreEnd(final TopicPartition partition,
- final ChangelogMetadata changelogMetadata) {
- // only trigger the store's specific listener to make sure we disable bulk loading before transition to standby
- final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
- final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
- final String storeName = storeMetadata.store().name();
- if (restoreCallback instanceof StateRestoreListener) {
- try {
- ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
- } catch (final RuntimeException e) {
- return e;
- }
- }
- return null;
- }
-
@Override
- public void unregister(final Collection<TopicPartition> revokedChangelogs,
- final boolean triggerOnRestoreEnd) {
- final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-
+ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
// Only changelogs that are initialized have been added to the restore consumer's assignment
final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>();
for (final TopicPartition partition : revokedChangelogs) {
final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
if (changelogMetadata != null) {
- if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) {
- firstException.compareAndSet(null, invokeOnRestoreEnd(partition, changelogMetadata));
- }
-
if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
revokedInitializedChangelogs.add(partition);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index ce64bf2..f0979f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -17,16 +17,14 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -37,10 +35,8 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
@@ -50,7 +46,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private final KeySchema keySchema;
private InternalProcessorContext context;
private volatile boolean open;
- private Set<S> bulkLoadSegments;
private Sensor expiredRecordSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
@@ -186,8 +181,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
segments.openExisting(this.context, observedStreamTime);
- bulkLoadSegments = new HashSet<>(segments.allSegments());
-
// register and possibly restore the state from the logs
context.register(root, new RocksDBSegmentsBatchingRestoreCallback());
@@ -249,17 +242,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
if (segment != null) {
- // This handles the case that state store is moved to a new client and does not
- // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
- // will only close the database and open it again with bulk loading enabled.
- if (!bulkLoadSegments.contains(segment) && context.taskType() == TaskType.ACTIVE) {
- segment.toggleDbForBulkLoading(true);
- // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
- // makes the open flag for the newly created store.
- // if the store does exist already, then toggleDbForBulkLoading will make sure that
- // the store is already open here.
- bulkLoadSegments = new HashSet<>(segments.allSegments());
- }
try {
final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
segment.addToBatch(record, batch);
@@ -271,32 +253,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
return writeBatchMap;
}
- private void toggleForBulkLoading(final boolean prepareForBulkload) {
- for (final S segment : segments.allSegments()) {
- segment.toggleDbForBulkLoading(prepareForBulkload);
- }
- }
-
- private class RocksDBSegmentsBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
+ private class RocksDBSegmentsBatchingRestoreCallback implements BatchingStateRestoreCallback {
@Override
public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
restoreAllInternal(records);
}
-
- @Override
- public void onRestoreStart(final TopicPartition topicPartition,
- final String storeName,
- final long startingOffset,
- final long endingOffset) {
- toggleForBulkLoading(true);
- }
-
- @Override
- public void onRestoreEnd(final TopicPartition topicPartition,
- final String storeName,
- final long totalRestored) {
- toggleForBulkLoading(false);
- }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java
similarity index 91%
rename from streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java
rename to streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java
index 1e27cc2..2ac1e3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BulkLoadingStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BatchWritingStore.java
@@ -20,8 +20,7 @@ import org.apache.kafka.streams.KeyValue;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
-public interface BulkLoadingStore {
- void toggleDbForBulkLoading(final boolean prepareForBulkload);
+public interface BatchWritingStore {
void addToBatch(final KeyValue<byte[], byte[]> record,
final WriteBatch batch) throws RocksDBException;
void write(final WriteBatch batch) throws RocksDBException;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a57f31e..fc2bb24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
@@ -24,7 +23,6 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -73,7 +71,7 @@ import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CON
/**
* A persistent key-value store based on RocksDB.
*/
-public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingStore {
+public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore {
private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
@@ -106,7 +104,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
private final RocksDBMetricsRecorder metricsRecorder;
private boolean isStatisticsRegistered = false;
- private volatile boolean prepareForBulkload = false;
ProcessorContext internalProcessorContext;
// visible for testing
volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
@@ -175,10 +172,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
configSetter.setConfig(name, userSpecifiedOptions, configs);
}
- if (prepareForBulkload) {
- userSpecifiedOptions.prepareForBulkLoad();
- }
-
dbDir = new File(new File(context.stateDir(), parentDir), name);
try {
@@ -236,11 +229,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
context.register(root, batchingStateRestoreCallback);
}
- // visible for testing
- boolean isPrepareForBulkload() {
- return prepareForBulkload;
- }
-
@Override
public String name() {
return name;
@@ -262,7 +250,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
}
}
- @SuppressWarnings("unchecked")
@Override
public synchronized void put(final Bytes key,
final byte[] value) {
@@ -391,22 +378,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
}
@Override
- public void toggleDbForBulkLoading(final boolean prepareForBulkload) {
- if (prepareForBulkload) {
- // if the store is not empty, we need to compact to get around the num.levels check for bulk loading
- final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
-
- if (sstFileNames != null && sstFileNames.length > 0) {
- dbAccessor.toggleDbForBulkLoading();
- }
- }
-
- close();
- this.prepareForBulkload = prepareForBulkload;
- openDB(internalProcessorContext);
- }
-
- @Override
public void addToBatch(final KeyValue<byte[], byte[]> record,
final WriteBatch batch) throws RocksDBException {
dbAccessor.addToBatch(record.key, record.value, batch);
@@ -506,8 +477,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
final WriteBatch batch) throws RocksDBException;
void close();
-
- void toggleDbForBulkLoading();
}
class SingleColumnFamilyAccessor implements RocksDBAccessor {
@@ -607,20 +576,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
public void close() {
columnFamily.close();
}
-
- @Override
- @SuppressWarnings("deprecation")
- public void toggleDbForBulkLoading() {
- try {
- db.compactRange(columnFamily, true, 1, 0);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
- }
- }
}
// not private for testing
- static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
+ static class RocksDBBatchingRestoreCallback implements BatchingStateRestoreCallback {
private final RocksDBStore rocksDBStore;
@@ -637,21 +596,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
throw new ProcessorStateException("Error restoring batch to store " + rocksDBStore.name, e);
}
}
-
- @Override
- public void onRestoreStart(final TopicPartition topicPartition,
- final String storeName,
- final long startingOffset,
- final long endingOffset) {
- rocksDBStore.toggleDbForBulkLoading(true);
- }
-
- @Override
- public void onRestoreEnd(final TopicPartition topicPartition,
- final String storeName,
- final long totalRestored) {
- rocksDBStore.toggleDbForBulkLoading(false);
- }
}
// for testing
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index bc6c17f..6c31e9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -249,21 +249,6 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
oldColumnFamily.close();
newColumnFamily.close();
}
-
- @Override
- @SuppressWarnings("deprecation")
- public void toggleDbForBulkLoading() {
- try {
- db.compactRange(oldColumnFamily, true, 1, 0);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
- }
- try {
- db.compactRange(newColumnFamily, true, 1, 0);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
- }
- }
}
private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index fe1fc33..c86ee96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import java.io.IOException;
-public interface Segment extends KeyValueStore<Bytes, byte[]>, BulkLoadingStore {
+public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore {
void destroy() throws IOException;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
deleted file mode 100644
index c981107..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.test.MockBatchingStateRestoreListener;
-import org.apache.kafka.test.MockStateRestoreListener;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-
-import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
-import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
-import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-
-public class CompositeRestoreListenerTest {
-
- private final MockStateRestoreCallback stateRestoreCallback = new MockStateRestoreCallback();
- private final MockBatchingStateRestoreListener batchingStateRestoreCallback = new MockBatchingStateRestoreListener();
- private final MockNoListenBatchingStateRestoreCallback
- noListenBatchingStateRestoreCallback =
- new MockNoListenBatchingStateRestoreCallback();
- private final MockStateRestoreListener reportingStoreListener = new MockStateRestoreListener();
- private final byte[] key = "key".getBytes(StandardCharsets.UTF_8);
- private final byte[] value = "value".getBytes(StandardCharsets.UTF_8);
- private final Collection<KeyValue<byte[], byte[]>> records = Collections.singletonList(KeyValue.pair(key, value));
- private final Collection<ConsumerRecord<byte[], byte[]>> consumerRecords = Collections.singletonList(
- new ConsumerRecord<>("", 0, 0L, key, value)
- );
- private final String storeName = "test_store";
- private final long startOffset = 0L;
- private final long endOffset = 1L;
- private final long batchOffset = 1L;
- private final long numberRestored = 1L;
- private final TopicPartition topicPartition = new TopicPartition("testTopic", 1);
-
- private CompositeRestoreListener compositeRestoreListener;
-
-
- @Test
- public void shouldRestoreInNonBatchMode() {
- setUpCompositeRestoreListener(stateRestoreCallback);
- compositeRestoreListener.restoreBatch(consumerRecords);
- assertThat(stateRestoreCallback.restoredKey, is(key));
- assertThat(stateRestoreCallback.restoredValue, is(value));
- }
-
- @Test
- public void shouldRestoreInBatchMode() {
- setUpCompositeRestoreListener(batchingStateRestoreCallback);
- compositeRestoreListener.restoreBatch(consumerRecords);
- assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records));
- }
-
- @Test
- public void shouldNotifyRestoreStartNonBatchMode() {
- setUpCompositeRestoreListener(stateRestoreCallback);
- compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
- assertStateRestoreListenerOnStartNotification(stateRestoreCallback);
- assertStateRestoreListenerOnStartNotification(reportingStoreListener);
- }
-
- @Test
- public void shouldNotifyRestoreStartBatchMode() {
- setUpCompositeRestoreListener(batchingStateRestoreCallback);
- compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
- assertStateRestoreListenerOnStartNotification(batchingStateRestoreCallback);
- assertStateRestoreListenerOnStartNotification(reportingStoreListener);
- }
-
- @Test
- public void shouldNotifyRestoreProgressNonBatchMode() {
- setUpCompositeRestoreListener(stateRestoreCallback);
- compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored);
- assertStateRestoreListenerOnBatchCompleteNotification(stateRestoreCallback);
- assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener);
- }
-
- @Test
- public void shouldNotifyRestoreProgressBatchMode() {
- setUpCompositeRestoreListener(batchingStateRestoreCallback);
- compositeRestoreListener.onBatchRestored(topicPartition, storeName, endOffset, numberRestored);
- assertStateRestoreListenerOnBatchCompleteNotification(batchingStateRestoreCallback);
- assertStateRestoreListenerOnBatchCompleteNotification(reportingStoreListener);
- }
-
- @Test
- public void shouldNotifyRestoreEndInNonBatchMode() {
- setUpCompositeRestoreListener(stateRestoreCallback);
- compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
- assertStateRestoreOnEndNotification(stateRestoreCallback);
- assertStateRestoreOnEndNotification(reportingStoreListener);
- }
-
- @Test
- public void shouldNotifyRestoreEndInBatchMode() {
- setUpCompositeRestoreListener(batchingStateRestoreCallback);
- compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
- assertStateRestoreOnEndNotification(batchingStateRestoreCallback);
- assertStateRestoreOnEndNotification(reportingStoreListener);
- }
-
- @Test
- public void shouldHandleNullReportStoreListener() {
- compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback);
- compositeRestoreListener.setUserRestoreListener(null);
-
- compositeRestoreListener.restoreBatch(consumerRecords);
- compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
- compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored);
- compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
-
- assertThat(batchingStateRestoreCallback.getRestoredRecords(), is(records));
- assertStateRestoreOnEndNotification(batchingStateRestoreCallback);
- }
-
- @Test
- public void shouldHandleNoRestoreListener() {
- compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
- compositeRestoreListener.setUserRestoreListener(null);
-
- compositeRestoreListener.restoreBatch(consumerRecords);
- compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
- compositeRestoreListener.onBatchRestored(topicPartition, storeName, batchOffset, numberRestored);
- compositeRestoreListener.onRestoreEnd(topicPartition, storeName, numberRestored);
-
- assertThat(noListenBatchingStateRestoreCallback.restoredRecords, is(records));
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void shouldThrowExceptionWhenSinglePutDirectlyCalled() {
- compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
- compositeRestoreListener.restore(key, value);
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void shouldThrowExceptionWhenRestoreAllDirectlyCalled() {
- compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
- compositeRestoreListener.restoreAll(Collections.emptyList());
- }
-
- private void assertStateRestoreListenerOnStartNotification(final MockStateRestoreListener restoreListener) {
- assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_START));
- assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
- assertThat(restoreListener.restoreStartOffset, is(startOffset));
- assertThat(restoreListener.restoreEndOffset, is(endOffset));
- }
-
- private void assertStateRestoreListenerOnBatchCompleteNotification(final MockStateRestoreListener restoreListener) {
- assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_BATCH));
- assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
- assertThat(restoreListener.restoredBatchOffset, is(batchOffset));
- assertThat(restoreListener.numBatchRestored, is(numberRestored));
- }
-
- private void assertStateRestoreOnEndNotification(final MockStateRestoreListener restoreListener) {
- assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_END));
- assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
- assertThat(restoreListener.totalNumRestored, is(numberRestored));
- }
-
-
- private void setUpCompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
- compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback);
- compositeRestoreListener.setUserRestoreListener(reportingStoreListener);
- }
-
-
- private static class MockStateRestoreCallback extends MockStateRestoreListener implements StateRestoreCallback {
-
- byte[] restoredKey;
- byte[] restoredValue;
-
- @Override
- public void restore(final byte[] key, final byte[] value) {
- restoredKey = key;
- restoredValue = value;
- }
- }
-
- private static class MockNoListenBatchingStateRestoreCallback implements BatchingStateRestoreCallback {
-
- Collection<KeyValue<byte[], byte[]>> restoredRecords;
-
- @Override
- public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
- restoredRecords = records;
- }
-
- @Override
- public void restore(final byte[] key, final byte[] value) {
- throw new IllegalStateException("Should not be called");
-
- }
- }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
index ad7dad6..0bba6aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockChangelogReader.java
@@ -64,7 +64,7 @@ public class MockChangelogReader implements ChangelogReader {
}
@Override
- public void unregister(final Collection<TopicPartition> partitions, final boolean triggerOnRestoreEnd) {
+ public void unregister(final Collection<TopicPartition> partitions) {
restoringPartitions.removeAll(partitions);
for (final TopicPartition partition : partitions) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index fdfb3c2..77ed3a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -33,8 +33,8 @@ import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockKeyValueStore;
+import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -51,7 +51,6 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -212,21 +211,21 @@ public class ProcessorStateManagerTest {
@Test
public void shouldRestoreStoreWithRestoreCallback() {
- final MockBatchingStateRestoreListener batchingRestoreCallback = new MockBatchingStateRestoreListener();
+ final MockRestoreCallback restoreCallback = new MockRestoreCallback();
final KeyValue<byte[], byte[]> expectedKeyValue = KeyValue.pair(keyBytes, valueBytes);
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
try {
- stateMgr.registerStore(persistentStore, batchingRestoreCallback);
+ stateMgr.registerStore(persistentStore, restoreCallback);
final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition);
assertThat(storeMetadata, notNullValue());
stateMgr.restore(storeMetadata, singletonList(consumerRecord));
- assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
- assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
+ assertThat(restoreCallback.restored.size(), is(1));
+ assertTrue(restoreCallback.restored.contains(expectedKeyValue));
assertEquals(Collections.singletonMap(persistentStorePartition, 101L), stateMgr.changelogOffsets());
} finally {
@@ -714,11 +713,8 @@ public class ProcessorStateManagerTest {
public void shouldThrowIfRestoreCallbackThrows() {
final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
- stateMgr.registerStore(persistentStore, new MockBatchingStateRestoreListener() {
- @Override
- public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
- throw new RuntimeException("KABOOM!");
- }
+ stateMgr.registerStore(persistentStore, (key, value) -> {
+ throw new RuntimeException("KABOOM!");
});
final StateStoreMetadata storeMetadata = stateMgr.storeMetadata(persistentStorePartition);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 6956e79..6769f4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
@@ -114,11 +113,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
private final KafkaException kaboom = new KafkaException("KABOOM!");
private final MockStateRestoreListener exceptionCallback = new MockStateRestoreListener() {
@Override
- public void restore(final byte[] key, final byte[] value) {
- throw kaboom;
- }
-
- @Override
public void onRestoreStart(final TopicPartition tp, final String store, final long stOffset, final long edOffset) {
throw kaboom;
}
@@ -897,7 +891,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp2).state());
// should support removing and clearing changelogs
- changelogReader.unregister(Collections.singletonList(tp), false);
+ changelogReader.unregister(Collections.singletonList(tp));
assertNull(changelogReader.changelogMetadata(tp));
assertFalse(changelogReader.isEmpty());
assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp1).state());
@@ -910,79 +904,6 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
}
@Test
- public void shouldTriggerRestoreCallbackAsListener() {
- // do not need this test for standby task
- if (type == STANDBY)
- return;
-
- final MockBatchingStateRestoreListener restoreListener = new MockBatchingStateRestoreListener();
- EasyMock.expect(storeMetadata.restoreCallback()).andReturn(restoreListener).anyTimes();
- EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
- EasyMock.replay(stateManager, storeMetadata, store);
-
- final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
- @Override
- public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
- return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
- }
- };
- consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
-
- final StoreChangelogReader changelogReader =
- new StoreChangelogReader(time, config, logContext, consumer, callback);
-
- changelogReader.register(tp, stateManager);
-
- changelogReader.restore();
-
- assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
- assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
- assertEquals(5L, consumer.position(tp));
- assertEquals(Collections.emptySet(), consumer.paused());
-
- assertEquals(11L, (long) changelogReader.changelogMetadata(tp).endOffset());
-
- assertEquals(tp, callback.restoreTopicPartition);
- assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
- assertNull(callback.storeNameCalledStates.get(RESTORE_END));
- assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
- assertEquals(5L, restoreListener.restoreStartOffset);
- assertEquals(11L, restoreListener.restoreEndOffset);
- assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_START));
-
- consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes()));
- consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes()));
- // null key should be ignored
- consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, null, "value".getBytes()));
- consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes()));
-
- changelogReader.restore();
-
- assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
- assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
- assertEquals(0, changelogReader.changelogMetadata(tp).bufferedRecords().size());
- assertEquals(0, changelogReader.changelogMetadata(tp).bufferedLimitIndex());
- assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_BATCH));
-
- // consumer position bypassing the gap in the next poll
- consumer.seek(tp, 11L);
-
- changelogReader.restore();
-
- assertEquals(11L, consumer.position(tp));
- assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
-
- assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state());
- assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
- assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());
- assertEquals(Collections.singleton(tp), consumer.paused());
-
- assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_BATCH));
- assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END));
- assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_END));
- }
-
- @Test
public void shouldTransitState() {
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();
@@ -1038,7 +959,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
// transition to update standby is NOT idempotent
assertThrows(IllegalStateException.class, changelogReader::transitToUpdateStandby);
- changelogReader.unregister(Collections.singletonList(tp), false);
+ changelogReader.unregister(Collections.singletonList(tp));
changelogReader.register(tp, activeStateManager);
// if a new active is registered, we should immediately transit to standby updating
@@ -1092,7 +1013,7 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
public void shouldNotThrowOnUnknownRevokedPartition() {
LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) {
- changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)), false);
+ changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)));
assertThat(
appender.getMessages(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 5b0081c..4b3f9d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -47,7 +46,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
-import org.rocksdb.Options;
import org.rocksdb.WriteBatch;
import java.io.File;
@@ -144,8 +142,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
abstract AbstractSegments<S> newSegments();
- abstract Options getOptions(S segment);
-
@Test
public void shouldPutAndFetch() {
final String key = "a";
@@ -381,11 +377,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
// 2 segments are created during restoration.
assertEquals(2, bytesStore.getSegments().size());
- // Bulk loading is disabled during recovery for stand-by tasks.
- for (final S segment : bytesStore.getSegments()) {
- assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(taskType == TaskType.ACTIVE ? 1 << 30 : 4));
- }
-
final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
@@ -395,28 +386,6 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
}
@Test
- public void shouldRespectBulkLoadOptionsDuringInit() {
- bytesStore.init(context, bytesStore);
- final String key = "a";
- bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
- bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
- assertEquals(2, bytesStore.getSegments().size());
-
- final StateRestoreListener restoreListener = context.getRestoreListener(bytesStore.name());
-
- restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
-
- for (final S segment : bytesStore.getSegments()) {
- assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(1 << 30));
- }
-
- restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
- for (final S segment : bytesStore.getSegments()) {
- assertThat(getOptions(segment).level0FileNumCompactionTrigger(), equalTo(4));
- }
- }
-
- @Test
public void shouldLogAndMeasureExpiredRecordsWithBuiltInMetricsVersionLatest() {
shouldLogAndMeasureExpiredRecords(StreamsConfig.METRICS_LATEST);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index b9e49b2..3b6904f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.rocksdb.Options;
-
public class RocksDBSegmentedBytesStoreTest extends AbstractRocksDBSegmentedBytesStoreTest<KeyValueSegment> {
private final static String METRICS_SCOPE = "metrics-scope";
@@ -37,9 +35,4 @@ public class RocksDBSegmentedBytesStoreTest extends AbstractRocksDBSegmentedByte
KeyValueSegments newSegments() {
return new KeyValueSegments(storeName, METRICS_SCOPE, retention, segmentInterval);
}
-
- @Override
- Options getOptions(final KeyValueSegment segment) {
- return segment.getOptions();
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index f61b9c6..c86b914 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -238,25 +237,6 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldRespectBulkloadOptionsDuringInit() {
- rocksDBStore.init(context, rocksDBStore);
-
- final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
-
- restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
-
- assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
- assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(1 << 30));
- assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(1 << 30));
-
- restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L);
-
- assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10));
- assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(20));
- assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36));
- }
-
- @Test
public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
rocksDBStore.init(context, rocksDBStore);
rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8));
@@ -331,36 +311,6 @@ public class RocksDBStoreTest {
}
@Test
- public void shouldTogglePrepareForBulkloadSetting() {
- rocksDBStore.init(context, rocksDBStore);
- final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
- (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
-
- restoreListener.onRestoreStart(null, null, 0, 0);
- assertTrue("Should have set bulk loading to true", rocksDBStore.isPrepareForBulkload());
-
- restoreListener.onRestoreEnd(null, null, 0);
- assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
- }
-
- @Test
- public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() {
- final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
-
- rocksDBStore.init(context, rocksDBStore);
- context.restore(rocksDBStore.name(), entries);
-
- final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
- (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
-
- restoreListener.onRestoreStart(null, null, 0, 0);
- assertTrue("Should have not set bulk loading to true", rocksDBStore.isPrepareForBulkload());
-
- restoreListener.onRestoreEnd(null, null, 0);
- assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
- }
-
- @Test
public void shouldRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java
index 01510ff..814a04c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.rocksdb.Options;
-
public class RocksDBTimestampedSegmentedBytesStoreTest
extends AbstractRocksDBSegmentedBytesStoreTest<TimestampedSegment> {
@@ -37,9 +35,4 @@ public class RocksDBTimestampedSegmentedBytesStoreTest
TimestampedSegments newSegments() {
return new TimestampedSegments(storeName, METRICS_SCOPE, retention, segmentInterval);
}
-
- @Override
- Options getOptions(final TimestampedSegment segment) {
- return segment.getOptions();
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index ed5d943..f1fd916 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -29,12 +29,10 @@ import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
-import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -393,31 +391,13 @@ public class InternalMockProcessorContext
cache().addDirtyEntryFlushListener(namespace, listener);
}
- public StateRestoreListener getRestoreListener(final String storeName) {
- return getStateRestoreListener(restoreFuncs.get(storeName));
- }
-
public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
final RecordBatchingStateRestoreCallback restoreCallback = adapt(restoreFuncs.get(storeName));
- final StateRestoreListener restoreListener = getRestoreListener(storeName);
-
- restoreListener.onRestoreStart(null, storeName, 0L, 0L);
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
for (final KeyValue<byte[], byte[]> keyValue : changeLog) {
records.add(new ConsumerRecord<>("", 0, 0L, keyValue.key, keyValue.value));
}
-
restoreCallback.restoreBatch(records);
-
- restoreListener.onRestoreEnd(null, storeName, 0L);
- }
-
- private StateRestoreListener getStateRestoreListener(final StateRestoreCallback restoreCallback) {
- if (restoreCallback instanceof StateRestoreListener) {
- return (StateRestoreListener) restoreCallback;
- }
-
- return CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER;
}
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
deleted file mode 100644
index 1736a54..0000000
--- a/streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-public class MockBatchingStateRestoreListener extends MockStateRestoreListener implements BatchingStateRestoreCallback {
-
- private final Collection<KeyValue<byte[], byte[]>> restoredRecords = new ArrayList<>();
-
- @Override
- public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
- restoredRecords.addAll(records);
- }
-
- @Override
- public void restore(final byte[] key, final byte[] value) {
- throw new IllegalStateException("Should not be called");
-
- }
-
- public Collection<KeyValue<byte[], byte[]>> getRestoredRecords() {
- return restoredRecords;
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
index 096fa11..fa5b465 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreCallback.java
@@ -25,7 +25,6 @@ import java.util.List;
public class MockRestoreCallback implements StateRestoreCallback {
public List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();
-
@Override
public void restore(final byte[] key, final byte[] value) {
restored.add(KeyValue.pair(key, value));
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
index 3138167..1026969 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
@@ -18,19 +18,15 @@
package org.apache.kafka.test;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
+public class MockStateRestoreListener implements StateRestoreListener {
// verifies store name called for each state
public final Map<String, String> storeNameCalledStates = new HashMap<>();
- public final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();
public long restoreStartOffset;
public long restoreEndOffset;
public long restoredBatchOffset;
@@ -43,11 +39,6 @@ public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
public static final String RESTORE_END = "restore_end";
@Override
- public void restore(final byte[] key, final byte[] value) {
- restored.add(KeyValue.pair(key, value));
- }
-
- @Override
public void onRestoreStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset,
@@ -82,7 +73,6 @@ public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
public String toString() {
return "MockStateRestoreListener{" +
"storeNameCalledStates=" + storeNameCalledStates +
- ", restored=" + restored +
", restoreStartOffset=" + restoreStartOffset +
", restoreEndOffset=" + restoreEndOffset +
", restoredBatchOffset=" + restoredBatchOffset +