Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 17:33:08 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434051190



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {

Review comment:
       Should we switch to `switch` here as well? 
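
A minimal sketch of what the `switch` form of `close` could look like. It uses a hypothetical `StandbyCloseSketch` class and a trimmed-down `State` enum rather than the real `StandbyTask`, and which states run the close logic is an assumption made for illustration. The point is the shape: `CLOSED` gets its own logged no-op branch, and `default` only fires for a state the method does not know about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for StandbyTask; not the real Kafka Streams class.
class StandbyCloseSketch {
    enum State { CREATED, RUNNING, SUSPENDED, CLOSED }

    private final String id;
    private State state;
    // Collects messages instead of using a real logger, so the behavior is easy to check.
    final List<String> log = new ArrayList<>();

    StandbyCloseSketch(final String id, final State initial) {
        this.id = id;
        this.state = initial;
    }

    State state() {
        return state;
    }

    void close(final boolean clean) {
        switch (state()) {
            case CREATED:
            case RUNNING:
            case SUSPENDED:
                // Stand-in for closing the state manager.
                log.add("closing state manager (clean=" + clean + ")");
                break;
            case CLOSED:
                // Closing an already-closed task is a logged no-op.
                log.add("Skip closing since state is " + state());
                return;
            default:
                // Only reachable if a State constant is added without updating this switch.
                throw new IllegalStateException("Unknown state " + state() + " while closing standby task " + id);
        }
        state = State.CLOSED;
    }

    public static void main(final String[] args) {
        final StandbyCloseSketch task = new StandbyCloseSketch("0_1", State.RUNNING);
        task.close(true);
        task.close(true);
        // prints [closing state manager (clean=true), Skip closing since state is CLOSED]
        System.out.println(task.log);
    }
}
```

With a `switch`, the compiler-visible list of cases makes it obvious when a state is unhandled, which is harder to see in a long `if`/`else if` chain.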

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
      */
     @Override
     public void completeRestoration() {
+        if (state() == State.RUNNING) {
+            return;
+        }
+

Review comment:
       Can we use if/ else if here for consistency?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -466,6 +510,11 @@ public void closeAndRecycleState() {
                 stateMgr.recycle();
                 recordCollector.close();
                 break;
+
+            case CLOSED:
+                log.trace("Skip close since state is {}", state());

Review comment:
       I think this might be one of those exceptions where we should still enforce that the state is not `CLOSED` (i.e., throw an `IllegalStateException`), since there are related actions that occur outside of the Task implementation that will fail if we try to recycle a CLOSED task. This is similar to prepare/post commit, resume, etc.
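
A sketch of the stricter behavior suggested above, using a hypothetical `RecycleSketch` class: unlike `close`, recycling a `CLOSED` task throws instead of logging and returning. Allowing only `SUSPENDED` tasks to be recycled is an assumption made for this example, not a statement about the real `StreamTask`.

```java
// Hypothetical, simplified stand-in for StreamTask#closeAndRecycleState; not the real class.
class RecycleSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private final String id;
    private State state;
    int recycleCount = 0;

    RecycleSketch(final String id, final State initial) {
        this.id = id;
        this.state = initial;
    }

    State state() {
        return state;
    }

    void closeAndRecycleState() {
        switch (state()) {
            case SUSPENDED:
                // Stand-in for stateMgr.recycle() and recordCollector.close().
                recycleCount++;
                break;
            case CREATED:
            case RESTORING:
            case RUNNING:
            case CLOSED:
                // CLOSED is rejected rather than skipped: code outside the task assumes the
                // recycle really happened and would fail later if it silently did nothing.
                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id);
        }
        state = State.CLOSED;
    }

    public static void main(final String[] args) {
        final RecycleSketch task = new RecycleSketch("0_0", State.SUSPENDED);
        task.closeAndRecycleState();
        try {
            task.closeAndRecycleState();
        } catch (final IllegalStateException e) {
            // prints "Illegal state CLOSED while recycling active task 0_0"
            System.out.println(e.getMessage());
        }
    }
}
```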

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
         final List<Task> restoringTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {
-                try {
-                    task.initializeIfNeeded();
-                } catch (final LockException | TimeoutException e) {
-                    // it is possible that if there are multiple threads within the instance that one thread
-                    // trying to grab the task from the other, while the other has not released the lock since
-                    // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                    log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-                    allRunning = false;
-                }
+            try {
+                task.initializeIfNeeded();
+            } catch (final LockException | TimeoutException e) {
+                // it is possible that if there are multiple threads within the instance that one thread
+                // trying to grab the task from the other, while the other has not released the lock since
+                // it did not participate in the rebalance. In this case we can just retry in the next iteration
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                allRunning = false;
             }
 
-            if (task.state() == RESTORING) {
+            if (task.isActive()) {

Review comment:
       Can we add a comment or rename `restoringTasks` to clarify that it's ok to put an active-but-not-restoring task in here since `Task#completeRestoration` is idempotent/no-op for RUNNING tasks?
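
A heavily simplified model of the loop, assuming hypothetical `FakeTask` and `RetriableInitException` types (the latter standing in for `LockException`/`TimeoutException`). It illustrates why the list can safely hold already-RUNNING active tasks: both `initializeIfNeeded` and `completeRestoration` return early when there is nothing to do, so repeated calls are harmless. Unlike the diff, a task whose initialization fails is skipped for the rest of that iteration, and the real method's check for completed changelogs is left out entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of TaskManager#tryToCompleteRestoration; FakeTask and
// RetriableInitException are illustrative stand-ins, not Kafka Streams APIs.
class RestorationLoopSketch {
    enum State { CREATED, RESTORING, RUNNING }

    // Plays the role of LockException / TimeoutException: "retry on the next iteration".
    static final class RetriableInitException extends RuntimeException {
        RetriableInitException(final String message) {
            super(message);
        }
    }

    static final class FakeTask {
        final String id;
        final boolean active;
        State state = State.CREATED;
        int lockFailuresLeft;          // number of initialization attempts that should fail
        int restorationsCompleted = 0; // how often RESTORING -> RUNNING actually happened

        FakeTask(final String id, final boolean active, final int lockFailuresLeft) {
            this.id = id;
            this.active = active;
            this.lockFailuresLeft = lockFailuresLeft;
        }

        // Idempotent: only a CREATED task does any work.
        void initializeIfNeeded() {
            if (state != State.CREATED) {
                return;
            }
            if (lockFailuresLeft > 0) {
                lockFailuresLeft--;
                throw new RetriableInitException("could not lock state directory for task " + id);
            }
            // Active tasks restore first; standby tasks go straight to RUNNING.
            state = active ? State.RESTORING : State.RUNNING;
        }

        // Idempotent: a RUNNING task returns immediately, so callers need not filter by state.
        void completeRestoration() {
            if (state == State.RUNNING) {
                return;
            } else if (state == State.RESTORING) {
                restorationsCompleted++;
                state = State.RUNNING;
            }
        }
    }

    static boolean tryToCompleteRestoration(final Map<String, FakeTask> tasks) {
        boolean allRunning = true;
        // Holds every initialized active task, RUNNING ones included; that is safe only
        // because completeRestoration() is a no-op for RUNNING tasks.
        final List<FakeTask> activeTasks = new ArrayList<>();
        for (final FakeTask task : tasks.values()) {
            try {
                task.initializeIfNeeded();
            } catch (final RetriableInitException e) {
                // Another thread may still hold the lock; try again on the next call.
                allRunning = false;
                continue;
            }
            if (task.active) {
                activeTasks.add(task);
            }
        }
        for (final FakeTask task : activeTasks) {
            task.completeRestoration();
            allRunning &= task.state == State.RUNNING;
        }
        return allRunning;
    }

    public static void main(final String[] args) {
        final Map<String, FakeTask> tasks = new LinkedHashMap<>();
        tasks.put("0_0", new FakeTask("0_0", true, 1));  // active; its lock is busy on the first attempt
        tasks.put("0_1", new FakeTask("0_1", false, 0)); // standby
        System.out.println(tryToCompleteRestoration(tasks)); // false: 0_0 is not initialized yet
        System.out.println(tryToCompleteRestoration(tasks)); // true
        System.out.println(tryToCompleteRestoration(tasks)); // true
        System.out.println(tasks.get("0_0").restorationsCompleted); // 1: restored only once
    }
}
```

Calling the method repeatedly is the retry mechanism here: the first call returns `false` while the lock is unavailable, and later calls finish the job without restoring `0_0` a second time.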

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
      */
     @Override
     public void completeRestoration() {
+        if (state() == State.RUNNING) {
+            return;
+        }
+
         if (state() == State.RESTORING) {
             initializeMetadata();
             initializeTopology();

Review comment:
       GitHub won't let me leave a comment below this line, but can we use the `"Illegal state"`/`"Unknown state"` improvement in this method as well?
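
One possible shape for this method, written with `if`/`else if` and with the two error messages split: `Illegal state` for states the enum defines but that make no sense here, and `Unknown state` for the fall-through branch. `CompleteRestorationSketch` and its `State` enum are hypothetical stand-ins, and the exact list of illegal states is an assumption.

```java
// Hypothetical, simplified stand-in for StreamTask#completeRestoration; not the real class.
class CompleteRestorationSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private final String id;
    private State state;

    CompleteRestorationSketch(final String id, final State initial) {
        this.id = id;
        this.state = initial;
    }

    State state() {
        return state;
    }

    void completeRestoration() {
        if (state() == State.RUNNING) {
            // Already restored: returning here keeps the method idempotent.
            return;
        } else if (state() == State.RESTORING) {
            // Stand-in for initializeMetadata(), initializeTopology() and the state transition.
            state = State.RUNNING;
        } else if (state() == State.CREATED || state() == State.SUSPENDED || state() == State.CLOSED) {
            // A known state, but one in which restoration cannot be completed.
            throw new IllegalStateException(
                "Illegal state " + state() + " while completing restoration for active task " + id);
        } else {
            // Only reachable if a State constant is added without updating this method.
            throw new IllegalStateException(
                "Unknown state " + state() + " while completing restoration for active task " + id);
        }
    }

    public static void main(final String[] args) {
        final CompleteRestorationSketch task = new CompleteRestorationSketch("0_0", State.RESTORING);
        task.completeRestoration();
        task.completeRestoration(); // second call is a no-op
        System.out.println(task.state()); // RUNNING
    }
}
```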

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id);

Review comment:
       `Illegal state` -> `Unknown state`? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       nice
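
For reference, a minimal sketch of the summing logic with the new sentinel check. The body of the `LATEST_OFFSET` branch is cut off in the excerpt, so forwarding the sentinel unchanged is an assumption, as are the value `-2L`, the `String` keys (the real method uses `TopicPartition`) and the `OffsetSumSketch` class name. The overflow check relies on every other offset being non-negative: adding two non-negative `long` values that overflow always wraps to a negative result.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of summing changelog offsets; not the real TaskManager code.
class OffsetSumSketch {
    // Assumed sentinel meaning "this task is running and fully caught up".
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(final String taskId, final Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (final Map.Entry<String, Long> changelogEntry : changelogOffsets.entrySet()) {
            final long offset = changelogEntry.getValue();
            if (offset == LATEST_OFFSET) {
                // Forward the sentinel instead of adding a negative number into the sum.
                return LATEST_OFFSET;
            }
            offsetSum += offset;
            if (offsetSum < 0) {
                // The sum of non-negative offsets wrapped around; clamp so it still sorts as "very large".
                System.out.println("Sum of changelog offsets for task " + taskId + " overflowed, pinning to Long.MAX_VALUE");
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }

    public static void main(final String[] args) {
        final Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("changelog-0", 10L);
        offsets.put("changelog-1", 32L);
        System.out.println(sumOfChangelogOffsets("0_0", offsets)); // 42
        offsets.put("changelog-2", Long.MAX_VALUE);
        System.out.println(sumOfChangelogOffsets("0_0", offsets)); // warning, then 9223372036854775807

        final Map<String, Long> running = new LinkedHashMap<>();
        running.put("changelog-0", LATEST_OFFSET);
        System.out.println(sumOfChangelogOffsets("0_1", running)); // -2
    }
}
```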




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org