Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/03 18:56:08 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

abbccdda commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r586687918



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -32,10 +34,22 @@
 
     private final Set<TopicPartition> topicPartitions;
 
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private Optional<Long> timeCurrentIdlingStarted;

Review comment:
       This field could be `final` as well.
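
A minimal sketch of the suggestion, assuming a trimmed-down, hypothetical `TaskMetadataSketch` class in which `String` stands in for `TopicPartition` so it compiles without Kafka. Every field, including `timeCurrentIdlingStarted`, is `final` and assigned exactly once in the constructor.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, trimmed-down stand-in for TaskMetadata. String replaces
// TopicPartition so the sketch compiles without Kafka on the classpath.
final class TaskMetadataSketch {
    private final Map<String, Long> committedOffsets;
    private final Map<String, Long> endOffsets;
    // Final like its siblings: assigned exactly once, in the constructor.
    private final Optional<Long> timeCurrentIdlingStarted;

    TaskMetadataSketch(final Map<String, Long> committedOffsets,
                       final Map<String, Long> endOffsets,
                       final Optional<Long> timeCurrentIdlingStarted) {
        // Copy the inputs so later changes by the caller are not visible here.
        this.committedOffsets = Collections.unmodifiableMap(new HashMap<>(committedOffsets));
        this.endOffsets = Collections.unmodifiableMap(new HashMap<>(endOffsets));
        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
    }

    Map<String, Long> committedOffsets() { return committedOffsets; }

    Map<String, Long> endOffsets() { return endOffsets; }

    Optional<Long> timeCurrentIdlingStarted() { return timeCurrentIdlingStarted; }
}
```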

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -266,13 +272,13 @@ public void suspend() {
             case CREATED:
                 log.info("Suspended created");
                 transitionTo(State.SUSPENDED);
-
+                timeCurrentIdlingStarted = Optional.of(System.currentTimeMillis());

Review comment:
       Since this is coupled with the suspend state transition, could we extract a helper method instead?
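
One way such a helper could look, as a hypothetical sketch: `TaskSketch` and `transitionToSuspended` are illustrative names, the current time is passed in rather than read from `System.currentTimeMillis()` so the behavior is testable, and treating a repeated suspend as a no-op is an assumption rather than what `StreamTask` does.

```java
import java.util.Optional;

// Hypothetical sketch: names mirror the diff (State.SUSPENDED, transitionTo,
// timeCurrentIdlingStarted), but this is not the real StreamTask.
class TaskSketch {
    enum State { CREATED, RUNNING, SUSPENDED }

    private State state = State.CREATED;
    private Optional<Long> timeCurrentIdlingStarted = Optional.empty();

    void suspend(final long nowMs) {
        switch (state) {
            case CREATED:
            case RUNNING:
                transitionToSuspended(nowMs);
                break;
            case SUSPENDED:
                // Assumed no-op: keep the original idling start time.
                break;
            default:
                throw new IllegalStateException("Unknown state " + state);
        }
    }

    // Single place that pairs the SUSPENDED transition with recording when
    // idling began, so every suspend path stays consistent.
    private void transitionToSuspended(final long nowMs) {
        transitionTo(State.SUSPENDED);
        timeCurrentIdlingStarted = Optional.of(nowMs);
    }

    private void transitionTo(final State newState) {
        state = newState;
    }

    State state() { return state; }

    Optional<Long> timeCurrentIdlingStarted() { return timeCurrentIdlingStarted; }
}
```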

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -282,6 +284,21 @@ public boolean commitNeeded() {
         return Collections.unmodifiableMap(stateMgr.changelogOffsets());
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        return new HashMap<>();
+    }

Review comment:
       Could you elaborate on why these return a new mutable `HashMap`? Why would we need to modify the results afterwards?
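
To illustrate the concern, a hypothetical sketch with `String` in place of `TopicPartition`: because the stub returns a new `HashMap` on every call, any write made through it is silently discarded. Returning `Collections.emptyMap()` (an alternative, not what the PR does) would make such a write fail fast instead.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the StandbyTask stubs' behavior.
class StandbyStubSketch {
    // Mirrors the diff: a new map per call, so writes are silently lost.
    Map<String, Long> committedOffsetsFreshMap() {
        return new HashMap<>();
    }

    // Alternative: a shared immutable empty map, so any write throws.
    Map<String, Long> committedOffsetsEmptyMap() {
        return Collections.emptyMap();
    }
}
```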

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1113,6 +1116,16 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
         }
     }
 
+    private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> allOffsets) {
+        for (final Task task: tasks.activeTasks()) {
+            for (final TopicPartition topicPartition: task.inputPartitions()) {
+                if (allOffsets.containsKey(topicPartition)) {
+                    task.getCommittedOffsets().put(topicPartition, allOffsets.get(topicPartition).offset());

Review comment:
       I see now why we need to return a modifiable map here. My suggestion is to separate the setter from the getter, so that the map returned by the `get` call is an independent copy that is no longer tied to the thread's state. The benefit is that application code cannot accidentally mutate the thread state, which gives better encapsulation IMHO.
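
A minimal sketch of the setter/getter split, assuming a hypothetical `CommittedOffsetsHolder` class with illustrative method names (not Kafka's API) and `String` standing in for `TopicPartition`. The thread writes through a setter, and the getter hands out an unmodifiable copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggested encapsulation; not Kafka's API.
class CommittedOffsetsHolder {
    // Internal, mutable state owned by the stream thread.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Setter used by the thread, e.g. from a loop like updateTaskMetadata.
    void updateCommittedOffset(final String partition, final long offset) {
        committedOffsets.put(partition, offset);
    }

    // Getter returns a read-only snapshot: callers cannot mutate thread
    // state, and later updates do not show up in a snapshot already handed out.
    Map<String, Long> committedOffsets() {
        return Collections.unmodifiableMap(new HashMap<>(committedOffsets));
    }
}
```

Since `TopicPartition` and `Long` are both immutable, copying the map itself is enough to detach the result from the thread; no per-entry cloning is needed.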

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();

Review comment:
       Could we add some Javadoc comments for these base interface methods?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();
+
+    Map<TopicPartition, Long> getHighWaterMark();
+
+    Optional<Long> getTimeCurrentIdlingStarted();

Review comment:
       In Kafka, we try to avoid the `get***` naming pattern to keep method names concise. Let's drop the `get` prefix then.
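
Covering both of the comments above, a hypothetical sketch of how the new `Task` methods might read with the `get` prefix dropped and Javadoc added. `TaskOffsetsView` is an illustrative name, `String` stands in for `TopicPartition`, and the documented semantics are assumptions inferred from the diff (for example, the standby stub returning empty maps).

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the Task additions; the Javadoc semantics are
// assumptions, not the PR's final documentation.
interface TaskOffsetsView {
    /**
     * @return the last committed offset for each input partition of this task;
     *         empty for tasks that do not commit input offsets (e.g. standbys)
     */
    Map<String, Long> committedOffsets();

    /**
     * @return the latest known end offset (high watermark) for each input partition
     */
    Map<String, Long> highWaterMark();

    /**
     * @return the wall-clock time in milliseconds at which the task started
     *         its current idling period, or empty if it is not idling
     */
    Optional<Long> timeCurrentIdlingStarted();
}
```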

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1598,7 +1598,6 @@ private long countStreamThread(final Predicate<StreamThread> predicate) {
      * @return the set of {@link ThreadMetadata}.
      */
     public Set<ThreadMetadata> localThreadsMetadata() {
-        validateIsRunningOrRebalancing();

Review comment:
       If this function can bypass the running-or-rebalancing state check, what about other functions such as `allMetadata`?
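
To make the question concrete, a hypothetical sketch of a client whose metadata methods share one state guard. The `State` enum is a simplified subset of `KafkaStreams.State`, and the class, method bodies, and exception message are illustrative. Once `localThreadsMetadata()` drops the guard, it behaves differently from `allMetadata()` in, for example, the `CREATED` state.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of a shared state guard; does not reproduce KafkaStreams.
class StreamsClientSketch {
    // Simplified subset of the real client states.
    enum State { CREATED, REBALANCING, RUNNING, NOT_RUNNING }

    private volatile State state = State.CREATED;

    void setState(final State newState) {
        state = newState;
    }

    private void validateIsRunningOrRebalancing() {
        if (state != State.RUNNING && state != State.REBALANCING) {
            throw new IllegalStateException("Client is not running. State is " + state + ".");
        }
    }

    // Guarded: fails fast outside RUNNING/REBALANCING.
    Set<String> allMetadata() {
        validateIsRunningOrRebalancing();
        return Collections.emptySet();
    }

    // Unguarded, as in the diff: callable in any state.
    Set<String> localThreadsMetadata() {
        return Collections.emptySet();
    }
}
```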




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org