Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/21 23:06:10 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r458282421



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {
+                        store.flush();
+                    } else if (store instanceof CachedStateStore) {
+                        ((CachedStateStore) store).flushCache();
+                    }
+                    log.trace("Flushed cache or buffer {}", store.name());
+                } catch (final RuntimeException exception) {
+                    if (firstException == null) {
+                        // do NOT wrap the error if it is actually caused by Streams itself
+                        if (exception instanceof StreamsException)
+                            firstException = exception;
+                        else
+                            firstException = new ProcessorStateException(

Review comment:
       nit: can you use braces here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -520,20 +557,27 @@ void transitionTaskType(final TaskType newType, final LogContext logContext) {
         log.debug("Transitioning state manager for {} task {} to {}", oldType, taskId, newType);
     }
 
-    @Override
-    public void checkpoint(final Map<TopicPartition, Long> writtenOffsets) {
-        // first update each state store's current offset, then checkpoint
-        // those stores that are only logged and persistent to the checkpoint file
+    void updateChangelogOffsets(final Map<TopicPartition, Long> writtenOffsets) {
         for (final Map.Entry<TopicPartition, Long> entry : writtenOffsets.entrySet()) {
             final StateStoreMetadata store = findStore(entry.getKey());
 
             if (store != null) {
                 store.setOffset(entry.getValue());
 
                 log.debug("State store {} updated to written offset {} at changelog {}",
-                    store.stateStore.name(), store.offset, store.changelogPartition);
+                        store.stateStore.name(), store.offset, store.changelogPartition);
             }
         }
+    }
+
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> writtenOffsets) {
+        // first update each state store's current offset, then checkpoint
+        // those stores that are only logged and persistent to the checkpoint file
+        // TODO: we still need to keep it as part of the checkpoint for global tasks; this could be
+        //       removed though when we consolidate global tasks / state managers into this one

Review comment:
       >TODO: we still need to keep it as part of the checkpoint for global tasks
   
   Took me a while to realize that "it" refers to the argument here -- can you clarify the comment?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -466,15 +458,8 @@ public void postCommit() {
                  */
                 partitionGroup.clear();

Review comment:
       This may also be beyond the scope of this PR, but it seems like there's no longer any reason to do things this way. Today we enforce the order `suspend` -> `pre/postCommit` -> `close`, where `suspend` only closes the topology and the `SUSPENDED` state exists only to enforce that we suspended before closing. Why not swap the order to `pre/postCommit` -> `suspend` -> `close`, and then move this call from `postCommit` to `suspend`, where it makes more sense? Thoughts?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -49,6 +61,31 @@
         this.stateDirectory = stateDirectory;
     }
 
+    protected void initializeCheckpoint() {
+        // we will delete the local checkpoint file after registering the state stores and loading them into the
+        // state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed
+        offsetSnapshotSinceLastFlush = Collections.emptyMap();
+    }
+
+    /**
+     * The following exceptions maybe thrown from the state manager flushing call
+     *
+     * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+     * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
+     *                          or flushing state store get IO errors; such error should cause the thread to die
+     */
+    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+        final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets();
+        if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) {
+            // since there's no written offsets we can checkpoint with empty map,
+            // and the state's current offset would be used to checkpoint
+            stateMgr.flush();
+            stateMgr.checkpoint(Collections.emptyMap());

Review comment:
       Should we add an assertion to `ProcessorStateManager#checkpoint` that the passed-in offsets are always empty?
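   
   A minimal sketch of the kind of check meant here, assuming it would live at the top of `checkpoint`. The class name `CheckpointArgumentCheck`, the helper `requireEmptyOffsets`, and the use of `String` keys in place of `TopicPartition` are all hypothetical and only keep the example self-contained.

```java
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams; String keys stand in for TopicPartition.
final class CheckpointArgumentCheck {
    private CheckpointArgumentCheck() {}

    // Fails fast if a caller still passes written offsets into checkpoint.
    static void requireEmptyOffsets(final Map<String, Long> writtenOffsets) {
        if (!writtenOffsets.isEmpty()) {
            throw new IllegalStateException(
                "Expected no written offsets to checkpoint but got " + writtenOffsets);
        }
    }
}
```

   Whether this should be a hard `IllegalStateException` or just a logged warning is a judgment call; the exception is used here only to make the contract explicit.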

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       Is this out of date? It seems like we now never checkpoint during suspension, so we don't have to bother with this optimization.
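   
   For reference, a standalone sketch of how the two branches of the return statement behave. It copies the logic from the hunk above but uses `String` keys instead of `TopicPartition` so that it compiles without Kafka on the classpath; the class name `CheckpointNeededSketch` and the sample offsets are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone copy of the checkpointNeeded logic; String keys stand in for TopicPartition (assumption).
final class CheckpointNeededSketch {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    private CheckpointNeededSketch() {}

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // a null old snapshot means store registration has not completed yet
        if (oldOffsetSnapshot == null) {
            return false;
        }
        // the first non-empty snapshot after an empty one always triggers a checkpoint
        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) {
            return true;
        }
        // sum the absolute offset movement across all changelog partitions
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    public static void main(final String[] args) {
        final Map<String, Long> oldSnapshot = new HashMap<>();
        oldSnapshot.put("changelog-0", 100L);
        final Map<String, Long> newSnapshot = new HashMap<>();
        newSnapshot.put("changelog-0", 600L);

        // delta is 500: below the threshold, so only the enforced call checkpoints
        System.out.println(checkpointNeeded(false, oldSnapshot, newSnapshot)); // false
        System.out.println(checkpointNeeded(true, oldSnapshot, newSnapshot));  // true
    }
}
```

   If nothing passes `enforceCheckpoint = true` with unchanged offsets anymore, the `totalOffsetDelta > 0` branch is the part that could probably be simplified.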

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -120,14 +120,12 @@ public void suspend() {
         switch (state()) {
             case CREATED:
                 log.info("Suspended created");
-                checkpointNeededForSuspended = false;

Review comment:
       Happy to see this go 🙂 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -93,8 +93,8 @@ public boolean isActive() {
     public void initializeIfNeeded() {
         if (state() == State.CREATED) {
             StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);
-
             // initialize the snapshot with the current offsets as we don't need to commit then until they change

Review comment:
       Can you move this comment down a line? We should avoid confusion since we actually don't initialize the (flush) snapshot with the current offsets, just the (commit) snapshot.
   
   To be honest, it's already pretty confusing that we initialize the two snapshots differently. Maybe you could add a quick sentence explaining why, for our own future reference.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -243,18 +242,24 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         for (final Task task : tasksToClose) {
             try {
-                if (task.isActive()) {
-                    // Active tasks are revoked and suspended/committed during #handleRevocation
-                    if (!task.state().equals(State.SUSPENDED)) {
-                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
-                                  task.id(), task.state());
-                        throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
-                    }
-                } else {
-                    task.suspend();
-                    task.prepareCommit();
-                    task.postCommit();
+                // Always try to first suspend and commit the task before closing it;
+                // some tasks may already be suspended which should be a no-op.
+                //
+                // Also since active tasks should already be suspended / committed and
+                // standby tasks should have no offsets to commit, we should expect nothing to commit
+                task.suspend();
+
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed prior to attempting to close, but it reports non-empty offsets {} to commit",

Review comment:
       Hm. What if we hit a TaskMigratedException during `handleRevocation`? We would never finish committing the tasks, so `commitNeeded` would still return true and `prepareCommit` would return non-empty offsets, right?
   
   It's kind of a bummer that we can't enforce that the task was committed. What we really need to do is enforce that we _attempted_ to commit the task, regardless of whether or not it was successful. If the commit failed, we know that either it was fatal or it was due to TaskMigrated, in which case the task will have to be closed as dirty anyway.
   This might be beyond the scope of this PR, but just to throw out one hacky idea: we could add a `commitSuccessful` parameter to `postCommit` and then always invoke it after a commit attempt so that `commitNeeded` is set to false. (If `commitSuccessful` is false, we just skip everything else in `postCommit`.)
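   
   Roughly, the hacky idea could look like the sketch below. `SketchTask` is a made-up stand-in rather than the real `StreamTask`, and the `checkpointWritten` flag is only a placeholder for whatever else `postCommit` does today.

```java
// Hypothetical stand-in for a task; not the real StreamTask.
final class SketchTask {
    private boolean commitNeeded = true;
    private boolean checkpointWritten = false;

    boolean commitNeeded() {
        return commitNeeded;
    }

    boolean checkpointWritten() {
        return checkpointWritten;
    }

    // Always called after a commit attempt, whether or not the commit succeeded.
    void postCommit(final boolean commitSuccessful) {
        // record that a commit was attempted so a later close sees nothing pending
        commitNeeded = false;

        if (!commitSuccessful) {
            // the task will be closed dirty, so skip the rest of the post-commit work
            return;
        }

        // placeholder for the remaining post-commit work (e.g. writing the checkpoint)
        checkpointWritten = true;
    }
}
```

   With this shape, the check in `handleAssignment` would effectively verify that a commit was attempted, since a failed attempt still clears `commitNeeded`.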

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -542,13 +530,22 @@ public void closeCleanAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    private void writeCheckpoint() {
+    /**
+     * The following exceptions maybe thrown from the state manager flushing call
+     *
+     * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+     * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
+     *                          or flushing state store get IO errors; such error should cause the thread to die
+     */
+    @Override
+    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {

Review comment:
       Was there a reason to not just add `#updateChangelogOffsets` to the `StateManager` interface and remove the checkpointable offsets argument from `#checkpoint`?
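   
   To make the question concrete, here is a sketch of the interface shape being asked about. `SketchStateManager` and `InMemorySketchStateManager` are hypothetical names, `String` keys stand in for `TopicPartition`, and the in-memory maps stand in for the store metadata and the checkpoint file.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface shape: offsets are updated separately, checkpoint takes no argument.
interface SketchStateManager {
    void updateChangelogOffsets(Map<String, Long> writtenOffsets);

    void checkpoint();
}

// Toy implementation that keeps everything in memory instead of writing a checkpoint file.
final class InMemorySketchStateManager implements SketchStateManager {
    private final Map<String, Long> currentOffsets = new HashMap<>();
    private Map<String, Long> lastCheckpoint = new HashMap<>();

    @Override
    public void updateChangelogOffsets(final Map<String, Long> writtenOffsets) {
        // simplified: the real method only updates partitions that map to a registered store
        currentOffsets.putAll(writtenOffsets);
    }

    @Override
    public void checkpoint() {
        // checkpoint whatever offsets the stores currently hold
        lastCheckpoint = new HashMap<>(currentOffsets);
    }

    Map<String, Long> lastCheckpoint() {
        return lastCheckpoint;
    }
}
```

   The TODO above about global tasks may be the reason this was not done, since the global state manager would have to support the same split.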



