You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/06 03:42:54 UTC

[kafka] branch 2.6 updated: MINOR: improve code encapsulation between StreamThread and TaskManager (#8819)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 775d1b3  MINOR: improve code encapsulation between StreamThread and TaskManager (#8819)
775d1b3 is described below

commit 775d1b3bee1efcd56e3d118d4bb124dd9532ef7c
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Jun 5 20:29:50 2020 -0700

    MINOR: improve code encapsulation between StreamThread and TaskManager (#8819)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  | 21 +----------------
 .../streams/processor/internals/TaskManager.java   | 26 +++++++++++++++++-----
 .../processor/internals/TaskManagerTest.java       |  1 -
 3 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6e7a3aa..3a57cc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -644,7 +644,7 @@ public class StreamThread extends Thread {
         if (records != null && !records.isEmpty()) {
             pollSensor.record(pollLatency, now);
             pollRecordsSensor.record(records.count(), now);
-            addRecordsToTasks(records);
+            taskManager.addRecordsToTasks(records);
         }
 
         // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
@@ -820,25 +820,6 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Take records and add them to each respective task
-     *
-     * @param records Records, can be null
-     */
-    private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
-        for (final TopicPartition partition : records.partitions()) {
-            final Task task = taskManager.taskForInputPartition(partition);
-
-            if (task == null) {
-                log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
-                    partition, taskManager.toString(">"));
-                throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
-            }
-
-            task.addRecords(partition, records.records(partition));
-        }
-    }
-
-    /**
      * Try to commit all active tasks owned by this thread.
      *
      * Visible for testing.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d1be8a3..5c55093 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.DeleteRecordsResult;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -86,7 +87,7 @@ public class TaskManager {
     private boolean rebalanceInProgress = false;  // if we are in the middle of a rebalance, it is not safe to commit
 
     // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
-    private Set<TaskId> lockedTaskDirectories = new HashSet<>();
+    private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
 
     TaskManager(final ChangelogReader changelogReader,
                 final UUID processId,
@@ -743,10 +744,6 @@ public class TaskManager {
             .collect(Collectors.toSet());
     }
 
-    Task taskForInputPartition(final TopicPartition partition) {
-        return partitionToTask.get(partition);
-    }
-
     Map<TaskId, Task> tasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
@@ -779,6 +776,25 @@ public class TaskManager {
     }
 
     /**
+     * Take records and add them to each respective task
+     *
+     * @param records Records, can be null
+     */
+    void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
+        for (final TopicPartition partition : records.partitions()) {
+            final Task task = partitionToTask.get(partition);
+
+            if (task == null) {
+                log.error("Unable to locate active task for received-record partition {}. Current tasks: {}",
+                    partition, toString(">"));
+                throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
+            }
+
+            task.addRecords(partition, records.records(partition));
+        }
+    }
+
+    /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *                               or if the task producer got fenced (EOS)
      * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 4d38b2f..7f31d7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -694,7 +694,6 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertEquals(newPartitionsSet, task00.inputPartitions());
-        assertEquals(task00, taskManager.taskForInputPartition(t1p1));
         verify(activeTaskCreator, consumer, changeLogReader);
     }