You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2023/03/22 17:03:38 UTC
[samza] branch master updated: Changed uses of Apache Commons IO FileUtils.deleteDirectory to PathUtils.deleteDirectory (#1658)
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 985a6048c Changed uses of Apache Commons IO FileUtils.deleteDirectory to PathUtils.deleteDirectory (#1658)
985a6048c is described below
commit 985a6048c1e589d0b3af5d0040f26b64741c9260
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Wed Mar 22 10:03:31 2023 -0700
Changed uses of Apache Commons IO FileUtils.deleteDirectory to PathUtils.deleteDirectory (#1658)
Symptom: Blob store state restore fails when verifying restored directory contents due to a mismatch between permissions of local vs remote files.
Cause: In Apache Commons IO 2.11 (and possibly other versions), FileUtils#deleteDirectory() has a bug: deleting a symlinked or hard-linked file can leave the linked file with read-only permissions. For more details, see https://issues.apache.org/jira/browse/IO-751.
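Changes: This PR replaces all uses of FileUtils#deleteDirectory() with PathUtils#deleteDirectory(), which is not affected. Each call is now guarded by an exists()/isDirectory() check. The state-backend integration tests now use unique temporary directories created with Files.createTempDirectory() instead of fixed paths under java.io.tmpdir.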
---
.../org/apache/samza/storage/TaskStorageCommitManager.java | 6 ++++--
.../samza/storage/blobstore/BlobStoreRestoreManager.java | 10 +++++++---
.../samza/storage/blobstore/TestBlobStoreBackupManager.java | 11 ++++++++---
.../java/org/apache/samza/monitor/LocalStoreMonitor.java | 6 ++++--
.../org/apache/samza/monitor/TestLocalStoreMonitor.java | 13 ++++++++++---
.../storage/kv/BlobStoreStateBackendIntegrationTest.java | 12 ++++++++++--
.../kv/KafkaNonTransactionalStateIntegrationTest.java | 10 +++++++++-
.../storage/kv/KafkaTransactionalStateIntegrationTest.java | 10 +++++++++-
8 files changed, 61 insertions(+), 17 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index dca0fa2b1..59b010172 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -33,7 +33,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
@@ -286,7 +286,9 @@ public class TaskStorageCommitManager {
for (File checkpointDir : checkpointDirs) {
if (!checkpointDir.getName().contains(latestCheckpointId.serialize())) {
try {
- FileUtils.deleteDirectory(checkpointDir);
+ if (checkpointDir.exists() && checkpointDir.isDirectory()) {
+ PathUtils.deleteDirectory(checkpointDir.toPath());
+ }
} catch (IOException e) {
throw new SamzaException(
String.format("Unable to delete checkpoint directory: %s", checkpointDir.getName()), e);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
index bde659e78..5010fd0c6 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
@@ -35,7 +35,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
@@ -247,7 +247,9 @@ public class BlobStoreRestoreManager implements TaskRestoreManager {
try {
LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory " +
"or remote snapshot.", storeDir);
- FileUtils.deleteDirectory(storeDir);
+ if (storeDir.exists() && storeDir.isDirectory()) {
+ PathUtils.deleteDirectory(storeDir.toPath());
+ }
} catch (IOException e) {
throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
}
@@ -356,7 +358,9 @@ public class BlobStoreRestoreManager implements TaskRestoreManager {
loggedBaseDir, storeName, taskName, TaskMode.Active);
for (File checkpointDir: checkpointDirs) {
LOG.debug("Deleting local store checkpoint directory: {} before restore.", checkpointDir);
- FileUtils.deleteDirectory(checkpointDir);
+ if (checkpointDir.exists() && checkpointDir.isDirectory()) {
+ PathUtils.deleteDirectory(checkpointDir.toPath());
+ }
}
} catch (Exception e) {
throw new SamzaException(
diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
index 803bf677a..7a0cdc0a3 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -39,7 +40,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
@@ -266,7 +267,9 @@ public class TestBlobStoreBackupManager {
// cleanup
checkpointDirsToClean.forEach(path -> {
try {
- FileUtils.deleteDirectory(new File(path));
+ if (Files.exists(Paths.get(path)) && Files.isDirectory(Paths.get(path))) {
+ PathUtils.deleteDirectory(Paths.get(path));
+ }
} catch (IOException exception) {
Assert.fail("Failed to cleanup temporary checkpoint dirs.");
}
@@ -369,7 +372,9 @@ public class TestBlobStoreBackupManager {
// cleanup
checkpointDirsToClean.forEach(path -> {
try {
- FileUtils.deleteDirectory(new File(path));
+ if (Files.exists(Paths.get(path)) && Files.isDirectory(Paths.get(path))) {
+ PathUtils.deleteDirectory(Paths.get(path));
+ }
} catch (IOException exception) {
Assert.fail("Failed to cleanup temporary checkpoint dirs.");
}
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index ebf81f2d4..f3f367c86 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -25,8 +25,8 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.io.file.PathUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
-import org.apache.commons.io.FileUtils;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.rest.model.JobStatus;
@@ -162,7 +162,9 @@ public class LocalStoreMonitor implements Monitor {
if (!offsetFile.exists()) {
LOG.info("Deleting the task store: {}, since it has no offset file.", taskStorePath);
long taskStoreSizeInBytes = taskStoreDir.getTotalSpace();
- FileUtils.deleteDirectory(taskStoreDir);
+ if (taskStoreDir.exists() && taskStoreDir.isDirectory()) {
+ PathUtils.deleteDirectory(taskStoreDir.toPath());
+ }
localStoreMonitorMetrics.diskSpaceFreedInBytes.inc(taskStoreSizeInBytes);
localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
} else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) {
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index 972598f86..ce25565ed 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.file.PathUtils;
import org.apache.samza.config.MapConfig;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.model.Task;
@@ -97,7 +98,9 @@ public class TestLocalStoreMonitor {
public void cleanUp() {
// Clean up the entire temp local store directory and all files underneath it.
try {
- FileUtils.deleteDirectory(localStoreDir);
+ if (localStoreDir.exists() && localStoreDir.isDirectory()) {
+ PathUtils.deleteDirectory(localStoreDir.toPath());
+ }
} catch (IOException e) {
// Happens when task store can't be deleted after test finishes.
LOG.error("Deletion of directory: {} resulted in the exception: {}.", new Object[]{localStoreDir, e});
@@ -133,7 +136,9 @@ public class TestLocalStoreMonitor {
assertTrue("Inactive task store directory should not exist.", !inActiveTaskDir.exists());
assertEquals(taskStoreSize + inActiveTaskDirSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
- FileUtils.deleteDirectory(inActiveStoreDir);
+ if (inActiveStoreDir.exists() && inActiveStoreDir.isDirectory()) {
+ PathUtils.deleteDirectory(inActiveStoreDir.toPath());
+ }
}
@Test
@@ -192,7 +197,9 @@ public class TestLocalStoreMonitor {
// Non failing job directory should be cleaned up.
assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
- FileUtils.deleteDirectory(testFailingJobDir);
+ if (testFailingJobDir.exists() && testFailingJobDir.isDirectory()) {
+ PathUtils.deleteDirectory(testFailingJobDir.toPath());
+ }
}
private static File createOffsetFile(File taskStoreDir) throws Exception {
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
index f654459c6..1779ac735 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
@@ -82,8 +82,16 @@ public class BlobStoreStateBackendIntegrationTest extends BaseStateBackendIntegr
private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
- private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
- private static final String BLOB_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "blob-store").getAbsolutePath();
+ private static final String LOGGED_STORE_BASE_DIR;
+ private static final String BLOB_STORE_BASE_DIR;
+ static {
+ try {
+ LOGGED_STORE_BASE_DIR = Files.createTempDirectory("logged-store-").toString();
+ BLOB_STORE_BASE_DIR = Files.createTempDirectory("blob-store-").toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Error creating temp directory.", e);
+ }
+ }
private static final String BLOB_STORE_LEDGER_DIR = new File(BLOB_STORE_BASE_DIR, "ledger").getAbsolutePath();
private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
index 66ef3eb4c..d921bacc6 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.File;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -58,7 +59,14 @@ public class KafkaNonTransactionalStateIntegrationTest extends BaseStateBackendI
private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
- private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+ private static final String LOGGED_STORE_BASE_DIR;
+ static {
+ try {
+ LOGGED_STORE_BASE_DIR = Files.createTempDirectory("logged-store-").toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Error creating temp directory.", e);
+ }
+ }
private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
index 8bec2439f..edda4f5dd 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.File;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -60,7 +61,14 @@ public class KafkaTransactionalStateIntegrationTest extends BaseStateBackendInte
private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
- private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+ private static final String LOGGED_STORE_BASE_DIR;
+ static {
+ try {
+ LOGGED_STORE_BASE_DIR = Files.createTempDirectory("logged-store-").toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Error creating temp directory.", e);
+ }
+ }
private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");