You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/07 19:08:00 UTC

[jira] [Commented] (KAFKA-6367) Fix StateRestoreListener To Use Correct Batch Ending Offset

    [ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355902#comment-16355902 ] 

ASF GitHub Bot commented on KAFKA-6367:
---------------------------------------

mjsax closed pull request #4507: KAFKA-6367: StateRestoreListener use actual last restored offset for restored batch
URL: https://github.com/apache/kafka/pull/4507
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index c80a736734d..ea1c2888409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -43,7 +43,7 @@
      * @param topicPartition the TopicPartition containing the values to restore
      * @param storeName      the name of the store undergoing restoration
      * @param startingOffset the starting offset of the entire restoration process for this TopicPartition
-     * @param endingOffset   the ending offset of the entire restoration process for this TopicPartition
+     * @param endingOffset   the exclusive ending offset of the entire restoration process for this TopicPartition
      */
     void onRestoreStart(final TopicPartition topicPartition,
                         final String storeName,
@@ -62,7 +62,7 @@ void onRestoreStart(final TopicPartition topicPartition,
      *
      * @param topicPartition the TopicPartition containing the values to restore
      * @param storeName the name of the store undergoing restoration
-     * @param batchEndOffset the ending offset for the current restored batch for this TopicPartition
+     * @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition
      * @param numRestored the total number of records restored in this batch for this TopicPartition
      */
     void onBatchRestored(final TopicPartition topicPartition,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index ba17ce95ede..b11c45ba313 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -273,12 +273,14 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
         long nextPosition = -1;
         int numberRecords = records.size();
         int numberRestored = 0;
+        long lastRestoredOffset = -1;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
                 nextPosition = record.offset();
                 break;
             }
+            lastRestoredOffset = offset;
             numberRestored++;
             if (record.key() != null) {
                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
@@ -295,8 +297,7 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
 
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
-            restorer.restoreBatchCompleted(nextPosition, records.size());
-
+            restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
         }
 
         return nextPosition;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ee964513415..e69cede23fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -234,15 +234,28 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception {
         assertThat(callbackTwo.restored.size(), equalTo(3));
 
         assertAllCallbackStatesExecuted(callback, "storeName1");
-        assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L);
+        assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L);
 
         assertAllCallbackStatesExecuted(callbackOne, "storeName2");
-        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L);
+        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L);
 
         assertAllCallbackStatesExecuted(callbackTwo, "storeName3");
-        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 3L, 3L);
+        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 2L, 3L);
     }
 
+    @Test
+    public void shouldOnlyReportTheLastRestoredOffset() {
+        setupConsumer(10, topicPartition);
+        changelogReader
+            .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        changelogReader.restore(active);
+
+        assertThat(callback.restored.size(), equalTo(5));
+        assertAllCallbackStatesExecuted(callback, "storeName1");
+        assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
+    }
+
+
     private void assertAllCallbackStatesExecuted(final MockStateRestoreListener restoreListener,
                                                  final String storeName) {
         assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(storeName));
@@ -253,11 +266,12 @@ private void assertAllCallbackStatesExecuted(final MockStateRestoreListener rest
 
     private void assertCorrectOffsetsReportedByListener(final MockStateRestoreListener restoreListener,
                                                         final long startOffset,
-                                                        final long batchOffset, final long endOffset) {
+                                                        final long batchOffset,
+                                                        final long totalRestored) {
 
         assertThat(restoreListener.restoreStartOffset, equalTo(startOffset));
         assertThat(restoreListener.restoredBatchOffset, equalTo(batchOffset));
-        assertThat(restoreListener.restoreEndOffset, equalTo(endOffset));
+        assertThat(restoreListener.totalNumRestored, equalTo(totalRestored));
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Fix StateRestoreListener To Use Correct Batch Ending Offset
> -----------------------------------------------------------
>
>                 Key: KAFKA-6367
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6367
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0, 1.0.0
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 1.0.2
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long for the batch ending offset, but {{nextPosition}} is not correct: it should be the offset of the last restored record, whereas {{nextPosition}} is the offset of the first record that was not restored.
> We can't simply use {{nextPosition}} - 1, because the offset immediately before {{nextPosition}} could be a commit marker rather than a restored record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)