Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/04 17:57:06 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

chia7712 commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r465229314



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (using in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+    Set<TopicPartition> inputPartitions();
+
+    /**
+     * @return any changelog partitions associated with this task
+     */
+    Collection<TopicPartition> changelogPartitions();
+
+
+    // IQ related methods
+
+    StateStore getStore(final String name);
+
+    /**
+     * @return the offsets of all the changelog partitions associated with this task,
+     *         indicating the current positions of the logged state stores of the task.
+     */
+    Map<TopicPartition, Long> changelogOffsets();

Review comment:
       Just curious: the implementations of ```changelogOffsets``` (in ```StandbyTask``` and ```StreamTask```) return an unmodifiable view. By contrast, the implementations of ```inputPartitions```/```changelogPartitions``` don't wrap the returned object. Is there a reason?
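
       To make the difference concrete, here is a minimal sketch. The class ```ViewDemo``` and its String keys are hypothetical stand-ins for illustration only, not the actual ```StreamTask```/```StandbyTask``` code; the sketch only assumes that one getter wraps its result with ```Collections.unmodifiableMap``` while the other returns the internal collection directly.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a task; NOT the real Kafka Streams classes.
public class ViewDemo {
    private final Map<String, Long> offsets = new HashMap<>();
    private final Set<String> partitions = new HashSet<>();

    public ViewDemo() {
        offsets.put("changelog-0", 42L);
        partitions.add("input-0");
    }

    // Wrapped: callers get a read-only view backed by the internal map.
    public Map<String, Long> changelogOffsets() {
        return Collections.unmodifiableMap(offsets);
    }

    // Unwrapped: callers receive the internal set itself and can mutate it.
    public Set<String> inputPartitions() {
        return partitions;
    }

    public static void main(String[] args) {
        ViewDemo task = new ViewDemo();

        try {
            // Writes through the unmodifiable view are rejected.
            task.changelogOffsets().put("changelog-1", 7L);
        } catch (UnsupportedOperationException e) {
            System.out.println("changelogOffsets view rejected the write");
        }

        // The unwrapped set lets a caller change the task's internal state.
        task.inputPartitions().add("input-1");
        System.out.println("inputPartitions size: " + task.inputPartitions().size());
    }
}
```

       With the wrapped getter a caller cannot alter the task's state by accident; with the unwrapped one it can, which is why the inconsistency between the methods caught my eye.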




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org