Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/25 20:55:06 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #10211: KAFKA-12347: updating TaskMetadata (WIP)

wcarlson5 opened a new pull request #10211:
URL: https://github.com/apache/kafka/pull/10211


   Adding fields to TaskMetadata.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r586728619



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -282,6 +284,21 @@ public boolean commitNeeded() {
         return Collections.unmodifiableMap(stateMgr.changelogOffsets());
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        return new HashMap<>();
+    }

Review comment:
       I changed it a bit so that the returned map no longer needs to be modifiable.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -266,13 +272,13 @@ public void suspend() {
             case CREATED:
                 log.info("Suspended created");
                 transitionTo(State.SUSPENDED);
-
+                timeCurrentIdlingStarted = Optional.of(System.currentTimeMillis());

Review comment:
       sure

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1598,7 +1598,6 @@ private long countStreamThread(final Predicate<StreamThread> predicate) {
      * @return the set of {@link ThreadMetadata}.
      */
     public Set<ThreadMetadata> localThreadsMetadata() {
-        validateIsRunningOrRebalancing();

Review comment:
       They probably could as well, but in the KIP we only changed this one.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();
+
+    Map<TopicPartition, Long> getHighWaterMark();
+
+    Optional<Long> getTimeCurrentIdlingStarted();

Review comment:
       good idea

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1113,6 +1116,16 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
         }
     }
 
+    private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> allOffsets) {
+        for (final Task task: tasks.activeTasks()) {
+            for (final TopicPartition topicPartition: task.inputPartitions()) {
+                if (allOffsets.containsKey(topicPartition)) {
+                    task.getCommittedOffsets().put(topicPartition, allOffsets.get(topicPartition).offset());

Review comment:
       I think you are right. I will update it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -32,10 +34,22 @@
 
     private final Set<TopicPartition> topicPartitions;
 
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private Optional<Long> timeCurrentIdlingStarted;

Review comment:
       yes

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();

Review comment:
       sure




[GitHub] [kafka] abbccdda commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r586687918



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -32,10 +34,22 @@
 
     private final Set<TopicPartition> topicPartitions;
 
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private Optional<Long> timeCurrentIdlingStarted;

Review comment:
       This could be final as well

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -266,13 +272,13 @@ public void suspend() {
             case CREATED:
                 log.info("Suspended created");
                 transitionTo(State.SUSPENDED);
-
+                timeCurrentIdlingStarted = Optional.of(System.currentTimeMillis());

Review comment:
       Since this is coupled with the suspend state transition, could we refactor it into a helper method instead?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -282,6 +284,21 @@ public boolean commitNeeded() {
         return Collections.unmodifiableMap(stateMgr.changelogOffsets());
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        return new HashMap<>();
+    }

Review comment:
       Could you elaborate? Why do we need to modify the results afterwards?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1113,6 +1116,16 @@ private void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, Offs
         }
     }
 
+    private void updateTaskMetadata(final Map<TopicPartition, OffsetAndMetadata> allOffsets) {
+        for (final Task task: tasks.activeTasks()) {
+            for (final TopicPartition topicPartition: task.inputPartitions()) {
+                if (allOffsets.containsKey(topicPartition)) {
+                    task.getCommittedOffsets().put(topicPartition, allOffsets.get(topicPartition).offset());

Review comment:
       I see why we need to return a modifiable map here. My suggestion is to separate the setter and getter so that the map returned by a `get` call is a deep copy that is no longer attached to the thread. The benefit is that the application level won't accidentally mutate the thread state, which provides better encapsulation IMHO.
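A minimal sketch of the setter/getter split suggested above, assuming a hypothetical class `CommittedOffsetsSketch` and plain `String` keys (for example `"input-0"`) in place of Kafka's `TopicPartition`, so that it compiles without kafka-clients on the classpath. Because `String` and `Long` are immutable, copying the map into a new `HashMap` is effectively a deep copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a task's committed-offset bookkeeping.
// String keys replace TopicPartition purely for illustration.
class CommittedOffsetsSketch {
    // Internal, mutable state owned by the stream thread.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Setter: the only way to change the internal map
    // (e.g. called by the TaskManager after a successful commit).
    void updateCommittedOffsets(final String partition, final long offset) {
        committedOffsets.put(partition, offset);
    }

    // Getter: returns a detached, read-only snapshot. Callers can neither
    // mutate the task's state through it nor see later updates in it.
    Map<String, Long> committedOffsets() {
        return Collections.unmodifiableMap(new HashMap<>(committedOffsets));
    }
}
```

Note the difference from wrapping the live field with `Collections.unmodifiableMap(committedOffsets)`, as the later revision of `StreamTask` quoted in this thread does: that returns a read-only view, which callers cannot mutate but which still reflects later commits.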

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();

Review comment:
       Could we add some comments for these base functions?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -237,4 +238,9 @@ default boolean commitRequested() {
      */
     Map<TopicPartition, Long> changelogOffsets();
 
+    Map<TopicPartition, Long> getCommittedOffsets();
+
+    Map<TopicPartition, Long> getHighWaterMark();
+
+    Optional<Long> getTimeCurrentIdlingStarted();

Review comment:
       In Kafka, we try to avoid the `get***` naming pattern to keep function signatures simple. Let's remove the `get` prefix.

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1598,7 +1598,6 @@ private long countStreamThread(final Predicate<StreamThread> predicate) {
      * @return the set of {@link ThreadMetadata}.
      */
     public Set<ThreadMetadata> localThreadsMetadata() {
-        validateIsRunningOrRebalancing();

Review comment:
       If this function could bypass rebalancing state check, what about other functions such as `allMetadata`?




[GitHub] [kafka] abbccdda commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r587832660



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -3112,6 +3122,26 @@ void setChangelogOffsets(final Map<TopicPartition, Long> changelogOffsets) {
             return changelogOffsets;
         }
 
+        @Override
+        public Map<TopicPartition, Long> committedOffsets() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Map<TopicPartition, Long> highWaterMark() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Optional<Long> timeCurrentIdlingStarted() {
+            return Optional.empty();
+        }
+
+        @Override
+        public void updateCommittedOffsets(final TopicPartition topicPartition, final Long offset) {

Review comment:
       Could we add a test case for `updateCommittedOffsets`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -266,13 +272,13 @@ public void suspend() {
             case CREATED:
                 log.info("Suspended created");
                 transitionTo(State.SUSPENDED);
-
+                setTimeCurrentIdlingStarted();

Review comment:
       Actually I mean getting a helper like:
   ```
   void transitToSuspend() {
     log.info("Suspended {}", state()); 
     transitionTo(State.SUSPENDED);
     timeCurrentIdlingStarted = Optional.of(System.currentTimeMillis());
   }
   ```
   since the operation is the same for all 3 transition calls
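A self-contained sketch of that helper, assuming a hypothetical, stripped-down `SuspendHelperSketch` class with only four states; `System.out.printf` stands in for the SLF4J logger. The real `StreamTask.suspend()` may do more work per case than shown here.

```java
import java.util.Optional;

// Hypothetical, minimal task used only to illustrate a shared suspend helper.
class SuspendHelperSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED }

    private State state = State.CREATED;
    private Optional<Long> timeCurrentIdlingStarted = Optional.empty();

    State state() {
        return state;
    }

    Optional<Long> timeCurrentIdlingStarted() {
        return timeCurrentIdlingStarted;
    }

    void transitionTo(final State newState) {
        state = newState;
    }

    // One helper shared by every case that suspends the task.
    // The log line runs before the transition, so state() still reports
    // the state being left (CREATED, RESTORING or RUNNING).
    private void transitToSuspend() {
        System.out.printf("Suspended %s%n", state());
        transitionTo(State.SUSPENDED);
        timeCurrentIdlingStarted = Optional.of(System.currentTimeMillis());
    }

    void suspend() {
        switch (state) {
            case CREATED:
            case RESTORING:
            case RUNNING:
                transitToSuspend();
                break;
            case SUSPENDED:
                // Already suspended: nothing to do.
                break;
        }
    }
}
```

Because the log line runs before `transitionTo`, `state()` reports the state being left, which is what lets a single `log.info("Suspended {}", state())` replace the hard-coded "Suspended created" message in all three cases.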




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r587907372



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1136,6 +1139,33 @@ public boolean commitNeeded() {
         }
     }
 
+    @Override
+    public Map<TopicPartition, Long> committedOffsets() {
+        return Collections.unmodifiableMap(committedOffsets);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> highWaterMark() {
+        highWatermark.putAll(recordCollector.offsets());
+        return Collections.unmodifiableMap(highWatermark);
+    }
+
+    private void transitToSuspend() {
+        log.info("Suspended created");

Review comment:
       done




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r587869809



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -266,13 +272,13 @@ public void suspend() {
             case CREATED:
                 log.info("Suspended created");
                 transitionTo(State.SUSPENDED);
-
+                setTimeCurrentIdlingStarted();

Review comment:
       sounds good




[GitHub] [kafka] spena commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r586605597



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1136,6 +1144,22 @@ public boolean commitNeeded() {
         }
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return committedOffsets;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        highWatermark.putAll(recordCollector.offsets());
+        return highWatermark;

Review comment:
       It seems `highWatermark` is not used anywhere else. Why not return only `recordCollector.offsets()`?




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r586622218



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -282,6 +284,21 @@ public boolean commitNeeded() {
         return Collections.unmodifiableMap(stateMgr.changelogOffsets());
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        return new HashMap<>();
+    }

Review comment:
       The maps returned by the different task types are intended to be modifiable.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -32,10 +34,22 @@
 
     private final Set<TopicPartition> topicPartitions;
 
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private Optional<Long> timeCurrentIdlingStarted;
+
     public TaskMetadata(final String taskId,
-                        final Set<TopicPartition> topicPartitions) {
+                        final Set<TopicPartition> topicPartitions,
+                        final Map<TopicPartition, Long> committedOffsets,
+                        final Map<TopicPartition, Long> endOffsets,
+                        final Optional<Long> timeCurrentIdlingStarted) {
         this.taskId = taskId;
         this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;

Review comment:
       The metadata maps could be immutable. I will make this change.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1136,6 +1144,22 @@ public boolean commitNeeded() {
         }
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return committedOffsets;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        highWatermark.putAll(recordCollector.offsets());
+        return highWatermark;

Review comment:
       `recordCollector.offsets()` could be missing some entries, so we accumulate them in `highWatermark`; the method is expected to be called multiple times for a task.
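A small sketch of the accumulation described above, assuming a hypothetical `HighWatermarkSketch` class in which the `latest` parameter stands in for `recordCollector.offsets()` and `String` keys stand in for `TopicPartition`. Each call merges the latest, possibly partial, snapshot into a long-lived map, so a partition missing from one snapshot keeps its last known value; returning `recordCollector.offsets()` alone would drop it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of merging partial offset snapshots.
class HighWatermarkSketch {
    // Long-lived map that remembers the last known value per partition.
    private final Map<String, Long> highWatermark = new HashMap<>();

    // `latest` stands in for recordCollector.offsets(); it may omit partitions.
    Map<String, Long> highWaterMark(final Map<String, Long> latest) {
        highWatermark.putAll(latest); // newer values overwrite older ones
        return Collections.unmodifiableMap(highWatermark);
    }
}
```

The returned `Collections.unmodifiableMap` is a live view, so a map obtained from an earlier call also shows values merged by later calls.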




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r587874211



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -3112,6 +3122,26 @@ void setChangelogOffsets(final Map<TopicPartition, Long> changelogOffsets) {
             return changelogOffsets;
         }
 
+        @Override
+        public Map<TopicPartition, Long> committedOffsets() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Map<TopicPartition, Long> highWaterMark() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Optional<Long> timeCurrentIdlingStarted() {
+            return Optional.empty();
+        }
+
+        @Override
+        public void updateCommittedOffsets(final TopicPartition topicPartition, final Long offset) {

Review comment:
       Since this is a test implementation of `Task`, I just added some checks to make sure the calls are valid. I think that is sufficient.
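One way such a test double could validate its calls, sketched with a hypothetical `StubTaskSketch` class and `String` partitions. The specific checks (non-null offset, known input partition, non-negative offset) are assumptions for illustration, not the checks that were actually added to `TaskManagerTest`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical test double that records committed offsets but rejects
// calls a real task should never receive.
class StubTaskSketch {
    private final Set<String> inputPartitions;
    private final Map<String, Long> committed = new HashMap<>();

    StubTaskSketch(final Set<String> inputPartitions) {
        this.inputPartitions = Collections.unmodifiableSet(new HashSet<>(inputPartitions));
    }

    // Boxed Long mirrors the quoted signature, so a null check is meaningful.
    void updateCommittedOffsets(final String partition, final Long offset) {
        Objects.requireNonNull(offset, "offset");
        if (!inputPartitions.contains(partition)) {
            throw new IllegalArgumentException("Not an input partition: " + partition);
        }
        if (offset < 0) {
            throw new IllegalArgumentException("Negative offset: " + offset);
        }
        committed.put(partition, offset);
    }

    Map<String, Long> committedOffsets() {
        return Collections.unmodifiableMap(new HashMap<>(committed));
    }
}
```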




[GitHub] [kafka] abbccdda commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r587906332



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -1136,6 +1139,33 @@ public boolean commitNeeded() {
         }
     }
 
+    @Override
+    public Map<TopicPartition, Long> committedOffsets() {
+        return Collections.unmodifiableMap(committedOffsets);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> highWaterMark() {
+        highWatermark.putAll(recordCollector.offsets());
+        return Collections.unmodifiableMap(highWatermark);
+    }
+
+    private void transitToSuspend() {
+        log.info("Suspended created");

Review comment:
       nit: log.info("Suspended {}", state());
   




[GitHub] [kafka] spena commented on a change in pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10211:
URL: https://github.com/apache/kafka/pull/10211#discussion_r586578441



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -282,6 +284,21 @@ public boolean commitNeeded() {
         return Collections.unmodifiableMap(stateMgr.changelogOffsets());
     }
 
+    @Override
+    public Map<TopicPartition, Long> getCommittedOffsets() {
+        return new HashMap<>();
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getHighWaterMark() {
+        return new HashMap<>();
+    }

Review comment:
       Should these two return `Collections.emptyMap()` instead? Or is the returned map expected to be modifiable?
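A quick illustration of why the choice matters for the `TaskManager` loop quoted earlier, which writes through the getter with `task.getCommittedOffsets().put(...)`. The hypothetical `EmptyMapSketch` helpers show that `Collections.emptyMap()` rejects writes with `UnsupportedOperationException`, while a fresh `HashMap` accepts a write that is then lost, because each call returns a new map. (In the quoted loop only `tasks.activeTasks()` are visited, so a standby task's getter would not be written through there.)

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers contrasting the two return choices.
class EmptyMapSketch {
    // Mirrors `return new HashMap<>();`: mutable, but a new map on every call.
    static Map<String, Long> freshMap() {
        return new HashMap<>();
    }

    // Mirrors `return Collections.emptyMap();`: shared and immutable.
    static Map<String, Long> emptyMap() {
        return Collections.emptyMap();
    }

    // Returns true if a write-through put is accepted by the map.
    static boolean putSucceeds(final Map<String, Long> map) {
        try {
            map.put("input-0", 1L);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```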

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -32,10 +34,22 @@
 
     private final Set<TopicPartition> topicPartitions;
 
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private Optional<Long> timeCurrentIdlingStarted;
+
     public TaskMetadata(final String taskId,
-                        final Set<TopicPartition> topicPartitions) {
+                        final Set<TopicPartition> topicPartitions,
+                        final Map<TopicPartition, Long> committedOffsets,
+                        final Map<TopicPartition, Long> endOffsets,
+                        final Optional<Long> timeCurrentIdlingStarted) {
         this.taskId = taskId;
         this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;

Review comment:
       Should these be wrapped with `Collections.unmodifiableMap()` or `ImmutableMap.copyOf()`? If the two maps are not expected to be modified after passing them as parameter, then I suggest `ImmutableMap.copyOf()`.
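A sketch of the defensive-copy option, modeled on the `TaskMetadata` constructor quoted above but using a hypothetical `TaskMetadataSketch` class and `String` keys in place of `TopicPartition`. Note that `ImmutableMap.copyOf()` comes from Guava; JDK-only equivalents are `Map.copyOf()` (Java 10+, which rejects null keys and values) or copying into a new `HashMap` and wrapping it, as done here. `Collections.unmodifiableMap(m)` on its own is only a read-only view of `m`, so later changes to the caller's map would still show through. All fields are `final`, which also covers the earlier suggestion to make `timeCurrentIdlingStarted` final.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical immutable value class; collections are copied, then wrapped,
// so the caller's maps and sets cannot leak changes into the metadata.
final class TaskMetadataSketch {
    private final String taskId;
    private final Set<String> topicPartitions;
    private final Map<String, Long> committedOffsets;
    private final Map<String, Long> endOffsets;
    private final Optional<Long> timeCurrentIdlingStarted;

    TaskMetadataSketch(final String taskId,
                       final Set<String> topicPartitions,
                       final Map<String, Long> committedOffsets,
                       final Map<String, Long> endOffsets,
                       final Optional<Long> timeCurrentIdlingStarted) {
        this.taskId = taskId;
        this.topicPartitions = Collections.unmodifiableSet(new HashSet<>(topicPartitions));
        this.committedOffsets = Collections.unmodifiableMap(new HashMap<>(committedOffsets));
        this.endOffsets = Collections.unmodifiableMap(new HashMap<>(endOffsets));
        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
    }

    String taskId() { return taskId; }
    Set<String> topicPartitions() { return topicPartitions; }
    Map<String, Long> committedOffsets() { return committedOffsets; }
    Map<String, Long> endOffsets() { return endOffsets; }
    Optional<Long> timeCurrentIdlingStarted() { return timeCurrentIdlingStarted; }
}
```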




[GitHub] [kafka] abbccdda merged pull request #10211: KAFKA-12347: updating TaskMetadata

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #10211:
URL: https://github.com/apache/kafka/pull/10211


   

