You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/15 18:27:37 UTC

[kafka] branch trunk updated: KAFKA-10001: Should trigger store specific callback if it is also a listener (#8670)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d534b5d  KAFKA-10001: Should trigger store specific callback if it is also a listener (#8670)
d534b5d is described below

commit d534b5d81771148b3b1db2db837ef72c84b2f61d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri May 15 11:27:02 2020 -0700

    KAFKA-10001: Should trigger store specific callback if it is also a listener (#8670)
    
    The store's registered callback could also be a restore listener, in which case it should be triggered along with the user specified global listener as well.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../processor/internals/ProcessorStateManager.java |  6 +-
 .../processor/internals/StoreChangelogReader.java  | 27 +++++++-
 .../internals/StoreChangelogReaderTest.java        | 74 ++++++++++++++++++++++
 3 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 6553df4..a3ab881 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -130,6 +130,10 @@ public class ProcessorStateManager implements StateManager {
             return this.stateStore;
         }
 
+        StateRestoreCallback restoreCallback() {
+            return this.restoreCallback;
+        }
+
         @Override
         public String toString() {
             return "StateStoreMetadata (" + stateStore.name() + " : " + changelogPartition + " @ " + offset;
@@ -358,7 +362,7 @@ public class ProcessorStateManager implements StateManager {
 
     // used by the changelog reader only
     void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<byte[], byte[]>> restoreRecords) {
-        if (!stores.values().contains(storeMetadata)) {
+        if (!stores.containsValue(storeMetadata)) {
             throw new IllegalStateException("Restoring " + storeMetadata + " which is not registered in this state manager, " +
                 "this should not happen.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5f95e44..aed6c54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
@@ -522,6 +523,13 @@ public class StoreChangelogReader implements ChangelogReader {
             // do not trigger restore listener if we are processing standby tasks
             if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
                 try {
+                    // first trigger the store's specific listener if its registered callback is also an lister,
+                    // then trigger the user registered global listener
+                    final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+                    if (restoreCallback instanceof StateRestoreListener) {
+                        ((StateRestoreListener) restoreCallback).onBatchRestored(partition, storeName, currentOffset, numRecords);
+                    }
+
                     stateRestoreListener.onBatchRestored(partition, storeName, currentOffset, numRecords);
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
@@ -538,6 +546,13 @@ public class StoreChangelogReader implements ChangelogReader {
             pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
 
             try {
+                // first trigger the store's specific listener if its registered callback is also an lister,
+                // then trigger the user registered global listener
+                final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+                if (restoreCallback instanceof StateRestoreListener) {
+                    ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
+                }
+
                 stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
             } catch (final Exception e) {
                 throw new StreamsException("State restore listener failed on restore completed", e);
@@ -764,8 +779,9 @@ public class StoreChangelogReader implements ChangelogReader {
         // do not trigger restore listener if we are processing standby tasks
         for (final ChangelogMetadata changelogMetadata : newPartitionsToRestore) {
             if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE) {
-                final TopicPartition partition = changelogMetadata.storeMetadata.changelogPartition();
-                final String storeName = changelogMetadata.storeMetadata.store().name();
+                final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
+                final TopicPartition partition = storeMetadata.changelogPartition();
+                final String storeName = storeMetadata.store().name();
 
                 long startOffset = 0L;
                 try {
@@ -780,6 +796,13 @@ public class StoreChangelogReader implements ChangelogReader {
                 }
 
                 try {
+                    // first trigger the store's specific listener if its registered callback is also an lister,
+                    // then trigger the user registered global listener
+                    final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+                    if (restoreCallback instanceof StateRestoreListener) {
+                        ((StateRestoreListener) restoreCallback).onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
+                    }
+
                     stateRestoreListener.onRestoreStart(partition, storeName, startOffset, changelogMetadata.restoreEndOffset);
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 17e9f74..54599e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.easymock.EasyMock;
@@ -877,6 +878,79 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     }
 
     @Test
+    public void shouldTriggerRestoreCallbackAsListener() {
+        // do not need this test for standby task
+        if (type == STANDBY)
+            return;
+
+        final MockBatchingStateRestoreListener restoreListener = new MockBatchingStateRestoreListener();
+        EasyMock.expect(storeMetadata.restoreCallback()).andReturn(restoreListener).anyTimes();
+        EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.replay(stateManager, storeMetadata, store);
+
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+                return partitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 11L));
+            }
+        };
+        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
+
+        final StoreChangelogReader changelogReader =
+                new StoreChangelogReader(time, config, logContext, consumer, callback);
+
+        changelogReader.register(tp, stateManager);
+
+        changelogReader.restore();
+
+        assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
+        assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
+        assertEquals(5L, consumer.position(tp));
+        assertEquals(Collections.emptySet(), consumer.paused());
+
+        assertEquals(11L, (long) changelogReader.changelogMetadata(tp).endOffset());
+
+        assertEquals(tp, callback.restoreTopicPartition);
+        assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
+        assertNull(callback.storeNameCalledStates.get(RESTORE_END));
+        assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+        assertEquals(5L, restoreListener.restoreStartOffset);
+        assertEquals(11L, restoreListener.restoreEndOffset);
+        assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_START));
+
+        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 6L, "key".getBytes(), "value".getBytes()));
+        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 7L, "key".getBytes(), "value".getBytes()));
+        // null key should be ignored
+        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 8L, null, "value".getBytes()));
+        consumer.addRecord(new ConsumerRecord<>(topicName, 0, 9L, "key".getBytes(), "value".getBytes()));
+
+        changelogReader.restore();
+
+        assertEquals(StoreChangelogReader.ChangelogState.RESTORING, changelogReader.changelogMetadata(tp).state());
+        assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
+        assertEquals(0, changelogReader.changelogMetadata(tp).bufferedRecords().size());
+        assertEquals(0, changelogReader.changelogMetadata(tp).bufferedLimitIndex());
+        assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_BATCH));
+
+        // consumer position bypassing the gap in the next poll
+        consumer.seek(tp, 11L);
+
+        changelogReader.restore();
+
+        assertEquals(11L, consumer.position(tp));
+        assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
+
+        assertEquals(StoreChangelogReader.ChangelogState.COMPLETED, changelogReader.changelogMetadata(tp).state());
+        assertEquals(3L, changelogReader.changelogMetadata(tp).totalRestored());
+        assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());
+        assertEquals(Collections.singleton(tp), consumer.paused());
+
+        assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_BATCH));
+        assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END));
+        assertEquals(storeName, restoreListener.storeNameCalledStates.get(RESTORE_END));
+    }
+
+    @Test
     public void shouldTransitState() {
         EasyMock.expect(storeMetadataOne.changelogPartition()).andReturn(tp1).anyTimes();
         EasyMock.expect(storeMetadataOne.store()).andReturn(store).anyTimes();