Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/01 00:03:51 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

ableegoldman commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r448044329



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -136,7 +143,59 @@ private boolean taskDirEmpty(final File taskDir) {
                 !pathname.getName().equals(CHECKPOINT_FILE_NAME));
 
         // if the task is stateless, storeDirs would be null
-        return storeDirs == null || storeDirs.length == 0;
+        if (storeDirs == null || storeDirs.length == 0) {
+            return true;
+        }
+
+        final List<File> baseSubDirectories = new LinkedList<>();
+        for (final File file : storeDirs) {
+            if (file.isDirectory()) {
+                baseSubDirectories.add(file);
+            } else {
+                return false;
+            }
+        }
+
+        for (final File dir : baseSubDirectories) {
+            final boolean isEmpty;
+            if (dir.getName().equals(ROCKSDB_DIRECTORY_NAME)) {
+                isEmpty = taskSubDirectoriesEmpty(dir, true);
+            } else {
+                isEmpty = taskSubDirectoriesEmpty(dir, false);
+            }
+            if (!isEmpty) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // BFS through the task directory, looking for any files that are not themselves subdirectories
+    private boolean taskSubDirectoriesEmpty(final File baseDir, final boolean sstOnly) {
+        final Queue<File> subDirectories = new LinkedList<>();
+        subDirectories.offer(baseDir);
+
+        final Set<File> visited = new HashSet<>();
+        while (!subDirectories.isEmpty()) {
+            final File dir = subDirectories.poll();
+            if (!visited.contains(dir)) {
+                final File[] files = dir.listFiles();
+                if (files == null) {
+                    continue;
+                }
+                for (final File file : files) {
+                    if (file.isDirectory()) {
+                        subDirectories.offer(file);
+                    } else if (sstOnly && file.getName().endsWith(ROCKSDB_SST_SUFFIX)) {

Review comment:
       This is admittedly quite hacky, and it does not solve the problem for custom state stores that might write some non-data files when they are opened. A "better" fix would probably be to write a sentinel value into the checkpoint, à la `OFFSET_UNKNOWN`, so that we do have an entry in there if the store was opened but does not yet have any data.
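A minimal sketch of the sentinel alternative, assuming the checkpoint can be modeled as an in-memory map from changelog partition to offset. The class, the `StoreState` enum, and the sentinel value `-4` are hypothetical; this is not Kafka's `OffsetCheckpoint` API or file format, and `OFFSET_UNKNOWN` is only borrowed as an idea.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "sentinel in the checkpoint" idea. The checkpoint is
// modeled as an in-memory map from changelog partition to offset; this is NOT
// Kafka's OffsetCheckpoint class or its on-disk format.
class SentinelCheckpoint {
    // Assumed sentinel: any negative value that can never be a real changelog
    // offset would do. Name and value are illustrative only.
    static final long OFFSET_UNKNOWN_SENTINEL = -4L;

    enum StoreState { NEVER_OPENED, OPENED_NO_DATA, HAS_DATA }

    private final Map<String, Long> offsets = new HashMap<>();

    // On store open: record an entry even though nothing has been written yet.
    // putIfAbsent keeps a real offset if one was already checkpointed.
    void onStoreOpened(final String changelogPartition) {
        offsets.putIfAbsent(changelogPartition, OFFSET_UNKNOWN_SENTINEL);
    }

    // On commit/flush: overwrite the sentinel with the real offset.
    void onFlushed(final String changelogPartition, final long offset) {
        offsets.put(changelogPartition, offset);
    }

    // The checkpoint alone answers the question; no scan of the store's
    // directory (and no knowledge of its file layout) is required.
    StoreState state(final String changelogPartition) {
        final Long offset = offsets.get(changelogPartition);
        if (offset == null) {
            return StoreState.NEVER_OPENED;
        }
        return offset == OFFSET_UNKNOWN_SENTINEL ? StoreState.OPENED_NO_DATA : StoreState.HAS_DATA;
    }
}
```

The design point is that the checkpoint, rather than whatever files a store happens to create on open, becomes the source of truth, which is why this approach would presumably also cover custom state stores.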
   
   But I wanted to keep things simple (a very relative term here, I know) and low-risk before the 2.6 release. We can discuss better solutions once we're no longer at the doorstep of the release (and blocking the door, I might add).
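The simpler, low-risk check the diff adds can be sketched as a standalone method. This is a minimal sketch, not the PR's code: the constant values (`.checkpoint`, `rocksdb`, `.sst`) are assumptions, since the diff only references the constant names, and the behavior after the truncated `.sst` branch (an `.sst` file means "not empty", other files under the RocksDB directory are ignored, any file elsewhere means "not empty") is inferred from the comment rather than shown.

```java
import java.io.File;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Standalone sketch of the emptiness check in the diff. Constant VALUES are
// assumptions for illustration; the diff references the names but not the values.
class TaskDirEmptinessSketch {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint"; // assumed value
    static final String ROCKSDB_DIRECTORY_NAME = "rocksdb";   // assumed value
    static final String ROCKSDB_SST_SUFFIX = ".sst";          // assumed value

    // A task dir is "empty" if, ignoring the checkpoint file, it holds no loose
    // files and every subdirectory is empty (the RocksDB dir: has no .sst files).
    static boolean taskDirEmpty(final File taskDir) {
        final File[] storeDirs = taskDir.listFiles(
            pathname -> !pathname.getName().equals(CHECKPOINT_FILE_NAME));
        // listFiles() returns null if taskDir does not exist (e.g. a stateless task).
        if (storeDirs == null || storeDirs.length == 0) {
            return true;
        }
        for (final File file : storeDirs) {
            // Any regular file at the top level counts as state.
            if (!file.isDirectory()) {
                return false;
            }
        }
        for (final File dir : storeDirs) {
            final boolean sstOnly = dir.getName().equals(ROCKSDB_DIRECTORY_NAME);
            if (!subDirectoriesEmpty(dir, sstOnly)) {
                return false;
            }
        }
        return true;
    }

    // BFS through baseDir looking for any file that is not itself a directory.
    // With sstOnly, only .sst data files count; other files (e.g. OPTIONS, LOG) are ignored.
    static boolean subDirectoriesEmpty(final File baseDir, final boolean sstOnly) {
        final Queue<File> toVisit = new ArrayDeque<>();
        toVisit.offer(baseDir);
        final Set<File> visited = new HashSet<>();
        while (!toVisit.isEmpty()) {
            final File dir = toVisit.poll();
            // Set.add returns false for a path already seen, so each path is scanned once.
            if (!visited.add(dir)) {
                continue;
            }
            final File[] files = dir.listFiles();
            if (files == null) {
                continue;
            }
            for (final File file : files) {
                if (file.isDirectory()) {
                    toVisit.offer(file);
                } else if (!sstOnly || file.getName().endsWith(ROCKSDB_SST_SUFFIX)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

Note that `java.io.File` equality is path-based, so the `visited` set only prevents rescanning the same path; it does not detect symlink cycles.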



