Posted to jira@kafka.apache.org by "cadonna (via GitHub)" <gi...@apache.org> on 2023/04/03 10:48:50 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1155775071


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null

Review Comment:
   Indentation of 4 is fine. Could you revert this change, please?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1546,8 +1551,9 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
         stateManager.checkpoint();
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
-            .andReturn(singletonMap(changelogPartition, 10L))
-            .andReturn(singletonMap(changelogPartition, 20L));
+                .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
+                .andReturn(singletonMap(changelogPartition, 10L))
+                .andReturn(singletonMap(changelogPartition, 20L));

Review Comment:
   ```suggestion
               .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint
               .andReturn(singletonMap(changelogPartition, 10L))
               .andReturn(singletonMap(changelogPartition, 20L));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())

Review Comment:
   Could you revert this change, please? An indentation of 4 is fine.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {

Review Comment:
   We usually use the form `should...` for test names. My proposal here is to use `shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration checkpoint
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void testRestoration_CheckpointNotWrittenWhenEOSEnabled() {
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not checkpoint on EOS"));
+        stateManager.changelogOffsets();
+        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not checkpoint on EOS"));
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });

Review Comment:
   Could you please use Mockito instead of EasyMock here? We are migrating to Mockito and we would like to have new tests using Mockito. See `TaskManagerTest` for an example where a test class uses Mockito and EasyMock. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
         );
 
         task = new StreamTask(
-            taskId,
-            mkSet(partition1, repartition),
-            topology,
-            consumer,
-            new TopologyConfig(null,  config, new Properties()).getTaskConfig(),
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateManager,
-            recordCollector,
-            context,
-            logContext);
+                taskId,
+                mkSet(partition1, repartition),
+                topology,
+                consumer,
+                new TopologyConfig(null, config, new Properties()).getTaskConfig(),
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext);
 
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> {
+        });

Review Comment:
   This change is not needed for the purpose of this PR.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration checkpoint
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);

Review Comment:
   Could you please use Mockito instead of EasyMock here? We are migrating to Mockito and we would like to have new tests using Mockito. See `TaskManagerTest` for an example where a test class uses Mockito and EasyMock. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration checkpoint
+        EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void testRestoration_CheckpointNotWrittenWhenEOSEnabled() {

Review Comment:
   See above. We usually use the form `should...` for test names. My proposal here is to use `shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); // restoration checkpoint
+        EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
         );
 
         task = new StreamTask(
-            taskId,
-            mkSet(partition1, repartition),
-            topology,
-            consumer,
-            new TopologyConfig(null,  config, new Properties()).getTaskConfig(),
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateManager,
-            recordCollector,
-            context,
-            logContext);
+                taskId,
+                mkSet(partition1, repartition),
+                topology,
+                consumer,
+                new TopologyConfig(null, config, new Properties()).getTaskConfig(),
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext);

Review Comment:
   Thank you for removing the superfluous space! However, the indentation of 4 is fine. Could you revert the indentation, please?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -255,6 +255,9 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicParti
                 resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                 initializeTopology();
                 processorContext.initialize();
+                if (!eosEnabled) {
+                    maybeCheckpoint(true); // enforce checkpoint upon completing restoration

Review Comment:
   Could you please remove the inline comment? I do not think it adds any information that is not already contained in the call to `maybeCheckpoint(true)`.
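
For readers without the Kafka sources at hand, the behaviour this hunk adds, and that the two new tests above exercise, can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the real `StreamTask`: `CountingStateManager`, `SketchTask`, and `RestorationCheckpointSketch` are hypothetical names, and `maybeCheckpoint` here only honours the enforce flag and leaves out any other conditions the real method may apply.

```java
// Hypothetical stand-in for the state manager; it only counts checkpoint calls.
final class CountingStateManager {
    int checkpointCalls = 0;

    void checkpoint() {
        checkpointCalls++;
    }
}

// Hypothetical, heavily simplified task. It is NOT the real StreamTask.
final class SketchTask {
    private final CountingStateManager stateManager;
    private final boolean eosEnabled;

    SketchTask(final CountingStateManager stateManager, final boolean eosEnabled) {
        this.stateManager = stateManager;
        this.eosEnabled = eosEnabled;
    }

    void completeRestoration() {
        // Offset reset, topology and context initialization are elided in this sketch.
        if (!eosEnabled) {
            maybeCheckpoint(true);
        }
    }

    // Assumption: only the enforce flag is modelled; the real method may check more.
    private void maybeCheckpoint(final boolean enforceCheckpoint) {
        if (enforceCheckpoint) {
            stateManager.checkpoint();
        }
    }
}

final class RestorationCheckpointSketch {
    public static void main(final String[] args) {
        final CountingStateManager atLeastOnce = new CountingStateManager();
        new SketchTask(atLeastOnce, false).completeRestoration();

        final CountingStateManager exactlyOnce = new CountingStateManager();
        new SketchTask(exactlyOnce, true).completeRestoration();

        System.out.println("at-least-once checkpoints: " + atLeastOnce.checkpointCalls);
        System.out.println("exactly-once checkpoints: " + exactlyOnce.checkpointCalls);
    }
}
```

With processing guarantee at-least-once the fake state manager records exactly one checkpoint after restoration; with exactly-once it records none. Those are the two outcomes the proposed `should...` tests assert against the mocked `stateManager`.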



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
