Posted to commits@kafka.apache.org by ab...@apache.org on 2020/10/31 00:07:45 UTC

[kafka] branch 2.6 updated (5200905 -> cef7ca1)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a change to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 5200905  MINOR: Fix verification in StreamsUpgradeTest.test_version_probing_upgrade (#9530)
     new aa306b8  KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)
     new cef7ca1  KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks (#9515)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streams/processor/internals/TaskManager.java   |  3 +-
 .../streams/state/internals/OffsetCheckpoint.java  |  4 +-
 .../processor/internals/TaskManagerTest.java       | 66 +++++++++++++++++++---
 .../state/internals/OffsetCheckpointTest.java      | 21 ++++++-
 4 files changed, 83 insertions(+), 11 deletions(-)


[kafka] 01/02: KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit aa306b8acbe41461b9748ee62275897a109989ab
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Oct 30 13:28:31 2020 -0700

    KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)
    
    Delete the existing checkpoint file when told to write an empty offsets map, to ensure that corrupted offsets are not re-initialized from a stale checkpoint file
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, Guozhang Wang <gu...@apache.org>
---
 .../streams/state/internals/OffsetCheckpoint.java   |  4 +++-
 .../state/internals/OffsetCheckpointTest.java       | 21 ++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 59afbb3..3ec2386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -78,8 +78,10 @@ public class OffsetCheckpoint {
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
-        // if there is no offsets, skip writing the file to save disk IOs
+        // if there are no offsets, skip writing the file to save disk IOs
+        // but make sure to delete the existing file if one exists
         if (offsets.isEmpty()) {
+            Utils.delete(file);
             return;
         }
 
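For readers following along, here is a minimal, self-contained sketch of the
behavior this hunk introduces. The class name CheckpointDeletionSketch and the
temp-file location are illustrative; OffsetCheckpoint, write(), and read() are
the real APIs touched by the patch, and the read()-of-a-missing-file behavior
is confirmed by the test added below.

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class CheckpointDeletionSketch {
        public static void main(final String[] args) throws IOException {
            // hypothetical temp location; the real file lives in the task's state directory
            final File file = new File(System.getProperty("java.io.tmpdir"), "checkpoint-sketch");
            final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);

            // a non-empty map is persisted exactly as before
            checkpoint.write(Collections.singletonMap(new TopicPartition("changelog", 0), 42L));

            // an empty map now deletes any existing file instead of silently
            // leaving stale (possibly corrupted) offsets on disk
            checkpoint.write(Collections.emptyMap());

            // read() of a missing file yields an empty map, so stale offsets
            // can no longer be re-initialized from the old file
            assert checkpoint.read().isEmpty();
        }
    }
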
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index fe871e1..d9ddff1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -34,6 +34,7 @@ import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEnt
 import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -74,7 +75,7 @@ public class OffsetCheckpointTest {
         final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp");
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.emptyMap());
 
         assertFalse(f.exists());
 
@@ -85,6 +86,24 @@ public class OffsetCheckpointTest {
     }
 
     @Test
+    public void shouldDeleteExistingCheckpointWhenNoOffsets() throws IOException {
+        final File file = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
+
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(new TopicPartition(topic, 0), 1L);
+
+        checkpoint.write(offsets);
+
+        assertThat(file.exists(), is(true));
+        assertThat(offsets, is(checkpoint.read()));
+
+        checkpoint.write(Collections.emptyMap());
+
+        assertThat(file.exists(), is(false));
+        assertThat(Collections.<TopicPartition, Long>emptyMap(), is(checkpoint.read()));
+    }
+
+    @Test
     public void shouldSkipInvalidOffsetsDuringRead() throws IOException {
         final File file = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);


[kafka] 02/02: KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks (#9515)

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cef7ca13822fb8627ae93923ebbbca9a23872173
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Oct 30 13:36:35 2020 -0700

    KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks (#9515)
    
    Read offsets directly from the checkpoint file if a task is uninitialized or closed
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <vv...@apache.org>
---
 .../streams/processor/internals/TaskManager.java   |  3 +-
 .../processor/internals/TaskManagerTest.java       | 66 +++++++++++++++++++---
 2 files changed, 60 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7f019c6..a1eb2fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -604,7 +604,8 @@ public class TaskManager {
         // just have an empty changelogOffsets map.
         for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) {
             final Task task = tasks.get(id);
-            if (task != null) {
+            // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
+            if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
                 final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
                 if (changelogOffsets.isEmpty()) {
                     log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
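 
A hedged sketch of the fallback this new condition enables: for tasks in
CREATED or CLOSED state, the offsets are read straight from the on-disk
checkpoint file rather than from the task's (empty) in-memory view, and then
summed the same way getTaskOffsetSums does for live tasks. The taskDirectory
parameter and the ".checkpoint" file name below are assumptions based on the
surrounding Streams code, not part of this diff.

    import java.io.File;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class CheckpointOffsetSumSketch {
        // Sketch only: roughly what "read directly from the checkpoint" means
        // for an uninitialized or closed task whose changelogOffsets() is empty.
        static long offsetSumFor(final File taskDirectory) throws IOException {
            final File checkpointFile = new File(taskDirectory, ".checkpoint"); // assumed file name
            final Map<TopicPartition, Long> offsets = new OffsetCheckpoint(checkpointFile).read();
            long sum = 0L;
            for (final Long offset : offsets.values()) {
                sum += offset;  // sum changelog offsets across partitions
            }
            return sum;
        }
    }
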
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index a7433fa3..2c8b740 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode;
+import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.easymock.EasyMock;
@@ -95,7 +96,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -339,6 +339,57 @@ public class TaskManagerTest {
     }
 
     @Test
+    public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throws Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), 5L),
+            mkEntry(new TopicPartition("changelog", 1), 10L)
+        );
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
+
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        writeCheckpointFile(taskId00, changelogOffsets);
+        replay(stateDirectory);
+
+        taskManager.handleRebalanceStart(singleton("topic"));
+        final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true);
+        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
+        replay(activeTaskCreator);
+        taskManager.handleAssignment(taskId00Assignment, emptyMap());
+
+        assertThat(uninitializedTask.state(), is(State.CREATED));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+    }
+
+    @Test
+    public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Exception {
+        final Map<TopicPartition, Long> changelogOffsets = mkMap(
+            mkEntry(new TopicPartition("changelog", 0), 5L),
+            mkEntry(new TopicPartition("changelog", 1), 10L)
+        );
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
+
+        expectLockObtainedFor(taskId00);
+        makeTaskFolders(taskId00.toString());
+        writeCheckpointFile(taskId00, changelogOffsets);
+        replay(stateDirectory);
+
+        final StateMachineTask closedTask = new StateMachineTask(taskId00, taskId00Partitions, true);
+
+        taskManager.handleRebalanceStart(singleton("topic"));
+        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(closedTask));
+        replay(activeTaskCreator);
+        taskManager.handleAssignment(taskId00Assignment, emptyMap());
+
+        closedTask.suspend();
+        closedTask.closeClean();
+        assertThat(closedTask.state(), is(State.CLOSED));
+
+        assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+    }
+    
+    @Test
     public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
         expectLockFailedFor(taskId00);
         makeTaskFolders(taskId00.toString());
@@ -2430,8 +2481,10 @@ public class TaskManagerTest {
                                            .map(t -> new StateMachineTask(t.getKey(), t.getValue(), false))
                                            .collect(Collectors.toSet());
         final Set<Task> restoringTasks = restoringActiveAssignment.entrySet().stream()
-                                             .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true))
-                                             .collect(Collectors.toSet());
+                                           .map(t -> new StateMachineTask(t.getKey(), t.getValue(), true))
+                                           .collect(Collectors.toSet());
+        // give the restoring tasks some uncompleted changelog partitions so they'll stay in restoring
+        restoringTasks.forEach(t -> ((StateMachineTask) t).setChangelogOffsets(singletonMap(new TopicPartition("changelog", 0), 0L)));
 
         // Initially assign only the active tasks we want to complete restoration
         final Map<TaskId, Set<TopicPartition>> allActiveTasksAssignment = new HashMap<>(runningActiveAssignment);
@@ -2439,17 +2492,14 @@ public class TaskManagerTest {
         final Set<Task> allActiveTasks = new HashSet<>(runningTasks);
         allActiveTasks.addAll(restoringTasks);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(runningActiveAssignment))).andStubReturn(runningTasks);
         expect(standbyTaskCreator.createTasks(eq(standbyAssignment))).andStubReturn(standbyTasks);
         expect(activeTaskCreator.createTasks(anyObject(), eq(allActiveTasksAssignment))).andStubReturn(allActiveTasks);
 
         expectRestoreToBeCompleted(consumer, changeLogReader);
         replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
 
-        taskManager.handleAssignment(runningActiveAssignment, standbyAssignment);
-        assertThat(taskManager.tryToCompleteRestoration(), is(true));
-
         taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment);
+        taskManager.tryToCompleteRestoration();
 
         final Map<TaskId, StateMachineTask> allTasks = new HashMap<>();
 
@@ -2459,7 +2509,7 @@ public class TaskManagerTest {
             allTasks.put(task.id(), (StateMachineTask) task);
         }
         for (final Task task : restoringTasks) {
-            assertThat(task.state(), not(Task.State.RUNNING));
+            assertThat(task.state(), is(Task.State.RESTORING));
             allTasks.put(task.id(), (StateMachineTask) task);
         }
         for (final Task task : standbyTasks) {