You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2023/03/22 17:03:38 UTC

[samza] branch master updated: Changed uses of Apache Commons IO FileUtils.deleteDirectory to PathUtils.deleteDirectory (#1658)

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 985a6048c Changed uses of Apache Commons IO FileUtils.deleteDirectory to PathUtils.deleteDirectory (#1658)
985a6048c is described below

commit 985a6048c1e589d0b3af5d0040f26b64741c9260
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Wed Mar 22 10:03:31 2023 -0700

    Changed uses of Apache Commons IO FileUtils.deleteDirectory to PathUtils.deleteDirectory (#1658)
    
    Symptom: Blob store state restore fails when verifying restored directory contents due to a mismatch between permissions of local vs remote files.
    
    Cause: Apache Commons IO 2.11 (and possibly other versions) FileUtils#deleteDirectory() has a bug where deleting a symlink/hardlink inside the directory changes the permissions of the linked file to read-only. For more details, see https://issues.apache.org/jira/browse/IO-751.
    
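    Changes: This PR changes all uses of FileUtils#deleteDirectory() to PathUtils#deleteDirectory(), which is not affected. Because PathUtils#deleteDirectory() does not silently skip a missing directory the way FileUtils#deleteDirectory() does, each call is guarded by an exists/isDirectory check. The integration tests also now use unique temp directories created with Files.createTempDirectory() instead of fixed paths under java.io.tmpdir.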
---
 .../org/apache/samza/storage/TaskStorageCommitManager.java  |  6 ++++--
 .../samza/storage/blobstore/BlobStoreRestoreManager.java    | 10 +++++++---
 .../samza/storage/blobstore/TestBlobStoreBackupManager.java | 11 ++++++++---
 .../java/org/apache/samza/monitor/LocalStoreMonitor.java    |  6 ++++--
 .../org/apache/samza/monitor/TestLocalStoreMonitor.java     | 13 ++++++++++---
 .../storage/kv/BlobStoreStateBackendIntegrationTest.java    | 12 ++++++++++--
 .../kv/KafkaNonTransactionalStateIntegrationTest.java       | 10 +++++++++-
 .../storage/kv/KafkaTransactionalStateIntegrationTest.java  | 10 +++++++++-
 8 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index dca0fa2b1..59b010172 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -33,7 +33,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
@@ -286,7 +286,9 @@ public class TaskStorageCommitManager {
             for (File checkpointDir : checkpointDirs) {
               if (!checkpointDir.getName().contains(latestCheckpointId.serialize())) {
                 try {
-                  FileUtils.deleteDirectory(checkpointDir);
+                  if (checkpointDir.exists() && checkpointDir.isDirectory()) {
+                    PathUtils.deleteDirectory(checkpointDir.toPath());
+                  }
                 } catch (IOException e) {
                   throw new SamzaException(
                       String.format("Unable to delete checkpoint directory: %s", checkpointDir.getName()), e);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index bde659e78..5010fd0c6 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -35,7 +35,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutorService;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.Checkpoint;
@@ -247,7 +247,9 @@ public class BlobStoreRestoreManager implements TaskRestoreManager {
       try {
         LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory " +
             "or remote snapshot.", storeDir);
-        FileUtils.deleteDirectory(storeDir);
+        if (storeDir.exists() && storeDir.isDirectory()) {
+          PathUtils.deleteDirectory(storeDir.toPath());
+        }
       } catch (IOException e) {
         throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
       }
@@ -356,7 +358,9 @@ public class BlobStoreRestoreManager implements TaskRestoreManager {
           loggedBaseDir, storeName, taskName, TaskMode.Active);
       for (File checkpointDir: checkpointDirs) {
         LOG.debug("Deleting local store checkpoint directory: {} before restore.", checkpointDir);
-        FileUtils.deleteDirectory(checkpointDir);
+        if (checkpointDir.exists() && checkpointDir.isDirectory()) {
+          PathUtils.deleteDirectory(checkpointDir.toPath());
+        }
       }
     } catch (Exception e) {
       throw new SamzaException(
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
index 803bf677a..7a0cdc0a3 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -39,7 +40,7 @@ import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.Checkpoint;
@@ -266,7 +267,9 @@ public class TestBlobStoreBackupManager {
     // cleanup
     checkpointDirsToClean.forEach(path -> {
       try {
-        FileUtils.deleteDirectory(new File(path));
+        if (Files.exists(Paths.get(path)) && Files.isDirectory(Paths.get(path))) {
+          PathUtils.deleteDirectory(Paths.get(path));
+        }
       } catch (IOException exception) {
         Assert.fail("Failed to cleanup temporary checkpoint dirs.");
       }
@@ -369,7 +372,9 @@ public class TestBlobStoreBackupManager {
     // cleanup
     checkpointDirsToClean.forEach(path -> {
       try {
-        FileUtils.deleteDirectory(new File(path));
+        if (Files.exists(Paths.get(path)) && Files.isDirectory(Paths.get(path))) {
+          PathUtils.deleteDirectory(Paths.get(path));
+        }
       } catch (IOException exception) {
         Assert.fail("Failed to cleanup temporary checkpoint dirs.");
       }
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index ebf81f2d4..f3f367c86 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -25,8 +25,8 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.commons.io.file.PathUtils;
 import org.apache.commons.io.filefilter.DirectoryFileFilter;
-import org.apache.commons.io.FileUtils;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.rest.model.JobStatus;
@@ -162,7 +162,9 @@ public class LocalStoreMonitor implements Monitor {
     if (!offsetFile.exists()) {
       LOG.info("Deleting the task store: {}, since it has no offset file.", taskStorePath);
       long taskStoreSizeInBytes = taskStoreDir.getTotalSpace();
-      FileUtils.deleteDirectory(taskStoreDir);
+      if (taskStoreDir.exists() && taskStoreDir.isDirectory()) {
+        PathUtils.deleteDirectory(taskStoreDir.toPath());
+      }
       localStoreMonitorMetrics.diskSpaceFreedInBytes.inc(taskStoreSizeInBytes);
       localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
     } else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) {
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index 972598f86..ce25565ed 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.model.Task;
@@ -97,7 +98,9 @@ public class TestLocalStoreMonitor {
   public void cleanUp() {
     // Clean up the entire temp local store directory and all files underneath it.
     try {
-      FileUtils.deleteDirectory(localStoreDir);
+      if (localStoreDir.exists() && localStoreDir.isDirectory()) {
+        PathUtils.deleteDirectory(localStoreDir.toPath());
+      }
     } catch (IOException e) {
       // Happens when task store can't be deleted after test finishes.
       LOG.error("Deletion of directory: {} resulted in the exception: {}.", new Object[]{localStoreDir, e});
@@ -133,7 +136,9 @@ public class TestLocalStoreMonitor {
     assertTrue("Inactive task store directory should not exist.", !inActiveTaskDir.exists());
     assertEquals(taskStoreSize + inActiveTaskDirSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
     assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
-    FileUtils.deleteDirectory(inActiveStoreDir);
+    if (inActiveStoreDir.exists() && inActiveStoreDir.isDirectory()) {
+      PathUtils.deleteDirectory(inActiveStoreDir.toPath());
+    }
   }
 
   @Test
@@ -192,7 +197,9 @@ public class TestLocalStoreMonitor {
 
     // Non failing job directory should be cleaned up.
     assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
-    FileUtils.deleteDirectory(testFailingJobDir);
+    if (testFailingJobDir.exists() && testFailingJobDir.isDirectory()) {
+      PathUtils.deleteDirectory(testFailingJobDir.toPath());
+    }
   }
 
   private static File createOffsetFile(File taskStoreDir) throws Exception {
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
index f654459c6..1779ac735 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
@@ -82,8 +82,16 @@ public class BlobStoreStateBackendIntegrationTest extends BaseStateBackendIntegr
 
   private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
 
-  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
-  private static final String BLOB_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "blob-store").getAbsolutePath();
+  private static final String LOGGED_STORE_BASE_DIR;
+  private static final String BLOB_STORE_BASE_DIR;
+  static {
+    try {
+      LOGGED_STORE_BASE_DIR = Files.createTempDirectory("logged-store-").toString();
+      BLOB_STORE_BASE_DIR = Files.createTempDirectory("blob-store-").toString();
+    } catch (Exception e) {
+      throw new RuntimeException("Error creating temp directory.", e);
+    }
+  }
   private static final String BLOB_STORE_LEDGER_DIR = new File(BLOB_STORE_BASE_DIR, "ledger").getAbsolutePath();
 
   private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
index 66ef3eb4c..d921bacc6 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import java.io.File;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -58,7 +59,14 @@ public class KafkaNonTransactionalStateIntegrationTest extends BaseStateBackendI
   private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
   private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
 
-  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+  private static final String LOGGED_STORE_BASE_DIR;
+  static {
+    try {
+      LOGGED_STORE_BASE_DIR = Files.createTempDirectory("logged-store-").toString();
+    } catch (Exception e) {
+      throw new RuntimeException("Error creating temp directory.", e);
+    }
+  }
 
   private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
       put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
index 8bec2439f..edda4f5dd 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import java.io.File;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -60,7 +61,14 @@ public class KafkaTransactionalStateIntegrationTest extends BaseStateBackendInte
   private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
   private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
 
-  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+  private static final String LOGGED_STORE_BASE_DIR;
+  static {
+    try {
+      LOGGED_STORE_BASE_DIR = Files.createTempDirectory("logged-store-").toString();
+    } catch (Exception e) {
+      throw new RuntimeException("Error creating temp directory.", e);
+    }
+  }
 
   private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
       put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");