Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/01 20:29:00 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

vvcephei commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r448598854



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -270,8 +270,11 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 if (oldTask.isActive()) {
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
                     oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already

Review comment:
       This comment makes me a bit twitchy. Can we assert it instead? I mention it because I assume active tasks also don't need to be committed here, since that should already have happened. Can we assert that as well?
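
One way to act on this is to turn the inline comment into an explicit precondition check. This is a hypothetical sketch: `RecyclePreconditions`, `SimpleTask`, and its fields are simplified stand-ins invented for illustration, not the Kafka Streams `Task` API, and the commit check assumes the task exposes some "commit needed" flag.

```java
import java.util.Objects;

/**
 * Hypothetical sketch, not Kafka Streams code: simplified stand-ins for the
 * task types handled in TaskManager#handleAssignment.
 */
final class RecyclePreconditions {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final class SimpleTask {
        final String id;
        final boolean active;
        final State state;
        final boolean commitNeeded; // assumed flag: true if uncommitted progress exists

        SimpleTask(final String id, final boolean active, final State state, final boolean commitNeeded) {
            this.id = Objects.requireNonNull(id);
            this.active = active;
            this.state = Objects.requireNonNull(state);
            this.commitNeeded = commitNeeded;
        }
    }

    private RecyclePreconditions() {
    }

    /**
     * Replaces the "actives should be suspended already" comment with a check.
     * Throws instead of using the assert keyword, because Java assertions are
     * disabled unless the JVM is started with -ea.
     */
    static void requireActiveReadyForRecycle(final SimpleTask task) {
        if (!task.active) {
            return; // standbys are suspended by the caller in the else branch
        }
        if (task.state != State.SUSPENDED) {
            throw new IllegalStateException(
                "Active task " + task.id + " should be SUSPENDED before recycling, but was " + task.state);
        }
        if (task.commitNeeded) {
            throw new IllegalStateException(
                "Active task " + task.id + " should already be committed before recycling");
        }
    }
}
```

Failing hard with `IllegalStateException` surfaces a broken invariant in production too; whether that is preferable to a log-and-continue approach is a judgment call for the PR.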

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -136,7 +143,59 @@ private boolean taskDirEmpty(final File taskDir) {
                 !pathname.getName().equals(CHECKPOINT_FILE_NAME));
 
         // if the task is stateless, storeDirs would be null
-        return storeDirs == null || storeDirs.length == 0;
+        if (storeDirs == null || storeDirs.length == 0) {
+            return true;
+        }
+
+        final List<File> baseSubDirectories = new LinkedList<>();
+        for (final File file : storeDirs) {
+            if (file.isDirectory()) {
+                baseSubDirectories.add(file);
+            } else {
+                return false;
+            }
+        }
+
+        for (final File dir : baseSubDirectories) {
+            final boolean isEmpty;
+            if (dir.getName().equals(ROCKSDB_DIRECTORY_NAME)) {
+                isEmpty = taskSubDirectoriesEmpty(dir, true);
+            } else {
+                isEmpty =  taskSubDirectoriesEmpty(dir, false);
+            }
+            if (!isEmpty) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // BFS through the task directory to look for any files that are not more subdirectories
+    private boolean taskSubDirectoriesEmpty(final File baseDir, final boolean sstOnly) {
+        final Queue<File> subDirectories = new LinkedList<>();
+        subDirectories.offer(baseDir);
+
+        final Set<File> visited = new HashSet<>();
+        while (!subDirectories.isEmpty()) {
+            final File dir = subDirectories.poll();
+            if (!visited.contains(dir)) {
+                final  File[] files = dir.listFiles();
+                if (files == null) {
+                    continue;
+                }
+                for (final File file : files) {
+                    if (file.isDirectory()) {
+                        subDirectories.offer(file);
+                    } else if (sstOnly && file.getName().endsWith(ROCKSDB_SST_SUFFIX)) {

Review comment:
       Thanks for this. What's the impact of the exception that we're avoiding here? It might be better to just not do anything right now than to introduce assumptions here about the on-disk layout of the default persistent store (RocksDB). If those assumptions stop holding later, it could be pretty bad.
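
For comparison, a store-agnostic check would count every file as state instead of special-casing RocksDB SST files. This is a hypothetical sketch: `StoreAgnosticEmptiness` and `treeHasNoStateFiles` are invented names, the checkpoint file name is passed in (".checkpoint" is only an assumed example), and it swaps the PR's hand-rolled BFS for `java.nio.file.Files.walk`, which is depth-first; traversal order does not matter for a yes/no existence check.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/**
 * Hypothetical helper, not part of Kafka Streams: decides whether a task
 * directory holds any state without assuming a particular store's file layout.
 */
final class StoreAgnosticEmptiness {

    private StoreAgnosticEmptiness() {
    }

    /**
     * Returns true if no regular file exists anywhere under taskDir, except a
     * top-level file named ignoredTopLevelFile (e.g. a checkpoint file).
     * Every other file counts as state, whatever its name or suffix.
     */
    static boolean treeHasNoStateFiles(final Path taskDir, final String ignoredTopLevelFile) {
        if (!Files.isDirectory(taskDir)) {
            return true; // assumption: a missing task directory holds no state
        }
        final Path ignored = taskDir.resolve(ignoredTopLevelFile);
        // Files.walk does not follow symbolic links unless FOLLOW_LINKS is
        // passed, so it cannot loop on link cycles and needs no "visited" set.
        try (Stream<Path> paths = Files.walk(taskDir)) {
            return paths
                .filter(Files::isRegularFile)
                .allMatch(ignored::equals);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because every file counts, a RocksDB directory holding only non-SST files would still be reported as non-empty, so this variant may not avoid the exception the PR targets. That is the trade-off between fixing it now and keeping the check independent of the store implementation.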




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org