You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/15 18:27:37 UTC
[kafka] branch trunk updated: KAFKA-10001: Should trigger store
specific callback if it is also a listener (#8670)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d534b5d KAFKA-10001: Should trigger store specific callback if it is also a listener (#8670)
d534b5d is described below
commit d534b5d81771148b3b1db2db837ef72c84b2f61d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri May 15 11:27:02 2020 -0700
KAFKA-10001: Should trigger store specific callback if it is also a listener (#8670)
The store's registered callback could also be a restore listener, in which case it should be triggered along with the user specified global listener as well.
Reviewers: Boyang Chen <bo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../processor/internals/ProcessorStateManager.java | 6 +-
.../processor/internals/StoreChangelogReader.java | 27 +++++++-
.../internals/StoreChangelogReaderTest.java | 74 ++++++++++++++++++++++
3 files changed, 104 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 6553df4..a3ab881 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -130,6 +130,10 @@ public class ProcessorStateManager implements StateManager {
return this.stateStore;
}
+ StateRestoreCallback restoreCallback() {
+ return this.restoreCallback;
+ }
+
@Override
public String toString() {
return "StateStoreMetadata (" + stateStore.name() + " : " + changelogPartition + " @ " + offset;
@@ -358,7 +362,7 @@ public class ProcessorStateManager implements StateManager {
// used by the changelog reader only
void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<byte[], byte[]>> restoreRecords) {
- if (!stores.values().contains(storeMetadata)) {
+ if (!stores.containsValue(storeMetadata)) {
throw new IllegalStateException("Restoring " + storeMetadata + " which is not registered in this state manager, " +
"this should not happen.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5f95e44..aed6c54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
@@ -522,6 +523,13 @@ public class StoreChangelogReader implements ChangelogReader {
// do not trigger restore listener if we are processing standby tasks
if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
try {
+ // first trigger the store's specific listener if its registered callback is also a listener,
+ // then trigger the user registered global listener
+ final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+ if (restoreCallback instanceof StateRestoreListener) {
+ ((StateRestoreListener) restoreCallback).onBatchRestored(partition, storeName, currentOffset, numRecords);
+ }
+
stateRestoreListener.onBatchRestored(partition, storeName, currentOffset, numRecords);
} catch (final Exception e) {
throw new StreamsException("State restore listener failed on batch restored", e);
@@ -538,6 +546,13 @@ public class StoreChangelogReader implements ChangelogReader {
pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
try {
+ // first trigger the store's specific listener if its registered callback is also a listener,
+ // then trigger the user registered global listener
+ final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+ if (restoreCallback instanceof StateRestoreListener) {
+ ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
+ }
+
stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
} catch (final Exception e) {
throw new StreamsException("State restore listener failed on restore completed", e);
@@ -764,8 +779,9 @@ public class StoreChangelogReader implements ChangelogReader {
// do not trigger restore listener if we are processing standby tasks
for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
- final TopicPartition partition = changelogMetadata.storeMetadata.changelogPartition();
- final String storeName = changelogMetadata.storeMetadata.store().name();
+ final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
+ final TopicPartition partition = storeMetadata.changelogPartition();
+ final String storeName = storeMetadata.store().name();
long startOffset = 0L;
try {
@@ -780,6 +796,13 @@ public class StoreChangelogReader implements ChangelogReader {
}
try {
+ // first trigger the store's specific listener if its registered callback is also a listener,
+ // then trigger the user registered global listener
+ final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+ if (restoreCallback instanceof StateRestoreListener) {
+ ((StateRestoreListener) restoreCallback).onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
+ }
+
stateRestoreListener.onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
} catch (final Exception e) {
throw new StreamsException("State restore listener failed on batch restored", e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 17e9f74..54599e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
@@ -877,6 +878,79 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
}
@Test
+ public void shouldTriggerRestoreCallbackAsListener() {
+ // do not need this test for standby task
+ if (type == STANDBY)
+ return;
+
+ final MockBatchingStateRestoreListener restoreListener = new MockBatchingStateRestoreListener();
+ EasyMock.expect(storeMetadata.restoreCallback()).andReturn(restoreListener).anyTimes();
+ EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+ EasyMock.replay(stateManager, storeMetadata, store);
+
+ final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+ @Override
+ public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+ return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
+ }
+ };
+ consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
+
+ final StoreChangelogReader changelogReader =
+ new StoreChangelogReader(time, config, logContext, consumer, callback);
+
+ changelogReader.register(tp, stateManager);
+
+ changelogReader.restore();
+
+ assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
+ assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
+ assertEquals(5L, consumer.position(tp));
+ assertEquals(Collections.emptySet(), consumer.paused());
+
+ assertEquals(11L, (long) changelogReader.changelogMetadata(tp).endOffset());
+
+ assertEquals(tp, callback.restoreTopicPartition);
+ assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
+ assertNull(callback.storeNameCalledStates.get(RESTORE_END));
+ assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+ assertEquals(5L, restoreListener.restoreStartOffset);
+ assertEquals(11L, restoreListener.restoreEndOffset);
+ assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_START));
+
+ consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes()));
+ consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes()));
+ // null key should be ignored
+ consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, null, "value".getBytes()));
+ consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes()));
+
+ changelogReader.restore();
+
+ assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
+ assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
+ assertEquals(0, changelogReader.changelogMetadata(tp).bufferedRecords().size());
+ assertEquals(0, changelogReader.changelogMetadata(tp).bufferedLimitIndex());
+ assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_BATCH));
+
+ // consumer position bypassing the gap in the next poll
+ consumer.seek(tp, 11L);
+
+ changelogReader.restore();
+
+ assertEquals(11L, consumer.position(tp));
+ assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
+
+ assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state());
+ assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
+ assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());
+ assertEquals(Collections.singleton(tp), consumer.paused());
+
+ assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_BATCH));
+ assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END));
+ assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_END));
+ }
+
+ @Test
public void shouldTransitState() {
EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();