Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/11 04:20:08 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

vvcephei commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438533186



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -959,28 +992,23 @@ private void maybeRecordE2ELatency(final long recordTimestamp, final long now, f
         }
     }
 
-    /**
-     * Request committing the current task's state
-     */
-    void requestCommit() {
-        commitRequested = true;
+    public InternalProcessorContext processorContext() {
+        return processorContext;
     }
 
-    /**
-     * Whether or not a request has been made to commit the current state
-     */
-    @Override
-    public boolean commitRequested() {
-        return commitRequested;
+    public boolean hasRecordsQueued() {
+        return numBuffered() > 0;
     }
 
+    // visible for testing

Review comment:
       Can we avoid these comments? I've recently come across too many cases where they had become untrue, to the point where it just seems pointless to have them.
   
   I know you're preserving the comment that was there before while moving the method, but I think we should just clean up the comments as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
      */
     void completeRestoration();
 
-    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
-
-    void postCommit();
-
     void suspend();
 
     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();
 
-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();
 
-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods
 
     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void revive();
 
     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();
 
-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set<TopicPartition> inputPartitions();
+    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
 
-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection<TopicPartition> changelogPartitions();
 
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     *         indicating the current positions of the logged state stores of the task.
-     */
-    Map<TopicPartition, Long> changelogOffsets();
+    // runtime methods (using in RUNNING state)
 
-    void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
+    void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records);
 
-    default Map<TopicPartition, Long> purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }
 
     default void recordProcessBatchTime(final long processBatchTime) {}
 
     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }
 
-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }
 
-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }
 
-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+
+    void postCommit();
+
+    default Map<TopicPartition, Long> purgeableOffsets() {
+        return Collections.emptyMap();
     }
 
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+
+    Set<TopicPartition> inputPartitions();
+
+    /**
+     * @return any changelog partitions associated with this task
+     */
+    Collection<TopicPartition> changelogPartitions();
+
+
+    // IQ related methods

Review comment:
       I appreciate that you've taken the time to organize these methods, and that these comments are an attempt to make sure they stay organized, but I'm afraid that they'll just become misleading over time, the way that the "for testing" comments do.



