You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/11 04:15:52 UTC

[GitHub] [kafka] mjsax opened a new pull request #8852: MINOR: code cleanup for Kafka Streams task classes

mjsax opened a new pull request #8852:
URL: https://github.com/apache/kafka/pull/8852


   Not functional change. Pure code cleanup
   
   Call for review @vvcephei


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#issuecomment-642966020


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533205



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
     private Task.State state = CREATED;
-    protected Set<TopicPartition> inputPartitions;
-    protected ProcessorTopology topology;
 
     protected final TaskId id;
+    protected final ProcessorTopology topology;
     protected final StateDirectory stateDirectory;
     protected final ProcessorStateManager stateMgr;
 
+    protected Set<TopicPartition> inputPartitions;
+
     AbstractTask(final TaskId id,
                  final ProcessorTopology topology,
                  final StateDirectory stateDirectory,
                  final ProcessorStateManager stateMgr,
                  final Set<TopicPartition> inputPartitions) {
         this.id = id;
-        this.stateMgr = stateMgr;
         this.topology = topology;
-        this.inputPartitions = inputPartitions;
+        this.stateMgr = stateMgr;
         this.stateDirectory = stateDirectory;
+        this.inputPartitions = inputPartitions;

Review comment:
       Align assignment to parameter order.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#issuecomment-642485008


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#issuecomment-643545832


   Seem `trunk` is broker atm. We can retry when fixed.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax edited a comment on pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax edited a comment on pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#issuecomment-643545832


   Seem `trunk` is broker atm. Will rebase the PR when fixed.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r465229314



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (using in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+    Set<TopicPartition> inputPartitions();
+
+    /**
+     * @return any changelog partitions associated with this task
+     */
+    Collection<TopicPartition> changelogPartitions();
+
+
+    // IQ related methods
+
+    StateStore getStore(final String name);
+
+    /**
+     * @return the offsets of all the changelog partitions associated with this task,
+     *         indicating the current positions of the logged state stores of the task.
+     */
+    Map<TopicPartition, Long> changelogOffsets();

Review comment:
       just curious. the implementation of ```changelogOffsets``` (```StandbyTask``` and ```StreamTask```) return an unmodified view. By contrast, the implementation of ```inputPartitions```/```changelogPartitions``` don't wrap the return object. Is there a reason? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r550378570



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (using in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+    Set<TopicPartition> inputPartitions();
+
+    /**
+     * @return any changelog partitions associated with this task
+     */
+    Collection<TopicPartition> changelogPartitions();
+
+
+    // IQ related methods
+
+    StateStore getStore(final String name);
+
+    /**
+     * @return the offsets of all the changelog partitions associated with this task,
+     *         indicating the current positions of the logged state stores of the task.
+     */
+    Map<TopicPartition, Long> changelogOffsets();

Review comment:
       @mjsax Got it. Thanks for your response!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533429



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
         }
     }
 
-    TaskId id();

Review comment:
       Group and order methods (compare in-line comments). -- Sub-classed inherit this ordering.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
         }
     }
 
-    TaskId id();
 
-    State state();
 
-    boolean isActive();
-
-    boolean isClosed();

Review comment:
       This method is unused and removed. That is the only actual change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533374



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -57,30 +59,31 @@
      * @param stateDirectory the {@link StateDirectory} created by the thread
      */
     StandbyTask(final TaskId id,
-                final Set<TopicPartition> partitions,
                 final ProcessorTopology topology,
-                final StreamsConfig config,
-                final StreamsMetricsImpl metrics,
-                final ProcessorStateManager stateMgr,
                 final StateDirectory stateDirectory,
+                final ProcessorStateManager stateMgr,
+                final Set<TopicPartition> partitions,
+                final StreamsConfig config,
+                final InternalProcessorContext processorContext,
                 final ThreadCache cache,
-                final InternalProcessorContext processorContext) {
+                final StreamsMetricsImpl metrics) {
         super(id, topology, stateDirectory, stateMgr, partitions);

Review comment:
       Put "super parameter first" in constructor list




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533174



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
     private Task.State state = CREATED;
-    protected Set<TopicPartition> inputPartitions;
-    protected ProcessorTopology topology;
 
     protected final TaskId id;
+    protected final ProcessorTopology topology;
     protected final StateDirectory stateDirectory;
     protected final ProcessorStateManager stateMgr;
 
+    protected Set<TopicPartition> inputPartitions;

Review comment:
       Align members to constructor parameter order, and group final / mutable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533555



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (using in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);

Review comment:
       Renamed from `update` to `updateInputPartitions`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r550337598



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (using in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+    Set<TopicPartition> inputPartitions();
+
+    /**
+     * @return any changelog partitions associated with this task
+     */
+    Collection<TopicPartition> changelogPartitions();
+
+
+    // IQ related methods
+
+    StateStore getStore(final String name);
+
+    /**
+     * @return the offsets of all the changelog partitions associated with this task,
+     *         indicating the current positions of the logged state stores of the task.
+     */
+    Map<TopicPartition, Long> changelogOffsets();

Review comment:
       Just cycling back to this PR, that I will break into multiple smaller ones now...
   
   About your question: not totally sure atm. I guess the point is, that `inputPartitions` and `changelogPartitions` don't change and we know that they are always used read-only. In contrast the changelog offset map might change, and thus we put a guard in place that the offsets are not modified from "outside".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org