You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/07/24 02:25:58 UTC
[incubator-druid] 05/14: WorkerTaskManager to create disk files
atomically and ignore task file corruption (#7917)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit b88dbf15c64c662563f15b0359eddaf68ccec2e9
Author: Himanshu <g....@gmail.com>
AuthorDate: Tue Jun 18 09:18:43 2019 -0700
WorkerTaskManager to create disk files atomically and ignore task file corruption (#7917)
* WorkerTaskManager to create disk files atomically and ignore task file
corruptions
* fixing weird checkstyle lambda indentation issues
---
.../apache/druid/java/util/common/FileUtils.java | 16 +++++---
.../druid/indexing/worker/WorkerTaskManager.java | 46 +++++++++++++++++++---
2 files changed, 50 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
index 7be41a8..17f0ed0 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
@@ -176,6 +176,15 @@ public class FileUtils
/**
* Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to
+ * the target location. More docs at {@link FileUtils#writeAtomically(File, File, OutputStreamConsumer)}.
+ */
+ public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
+ {
+ return writeAtomically(file, file.getParentFile(), f); // tmp file is staged in the target's own directory
+ }
+
+ /**
+ * Write to a file atomically, by first writing to a temporary file in given tmpDir directory and then moving it to
* the target location. This function attempts to clean up its temporary files when possible, but they may stick
* around (for example, if the JVM crashes partway through executing the function). In any case, the target file
* should be unharmed.
@@ -186,12 +195,7 @@ public class FileUtils
*
* This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
*/
- public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
- {
- return writeAtomically(file, file.getParentFile(), f);
- }
-
- private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
+ public static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
{
final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 613c69b..ac5c15c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -129,6 +130,7 @@ public abstract class WorkerTaskManager
synchronized (lock) {
try {
log.info("Starting...");
+ cleanupAndMakeTmpTaskDir();
registerLocationListener();
restoreRestorableTasks();
initAssignedTasks();
@@ -264,7 +266,12 @@ public abstract class WorkerTaskManager
}
try {
- jsonMapper.writeValue(new File(getAssignedTaskDir(), task.getId()), task);
+ FileUtils.writeAtomically(new File(getAssignedTaskDir(), task.getId()), getTmpTaskDir(),
+ os -> {
+ jsonMapper.writeValue(os, task);
+ return null;
+ }
+ );
assignedTasks.put(task.getId(), task);
}
catch (IOException ex) {
@@ -286,6 +293,28 @@ public abstract class WorkerTaskManager
submitNoticeToExec(new RunNotice(task));
}
+ private File getTmpTaskDir() // staging dir (under baseTaskDir) passed as tmpDir to FileUtils.writeAtomically
+ {
+ return new File(taskConfig.getBaseTaskDir(), "workerTaskManagerTmp");
+ }
+
+ private void cleanupAndMakeTmpTaskDir()
+ {
+ File tmpDir = getTmpTaskDir();
+ tmpDir.mkdirs(); // return value ignored on purpose: the isDirectory() check below is authoritative
+ if (!tmpDir.isDirectory()) {
+ throw new ISE("Tmp Tasks Dir [%s] does not exist/not-a-directory.", tmpDir);
+ }
+
+ // Delete any tmp files left over from a previous run (e.g. after a JVM crash); failure here is non-fatal.
+ try {
+ org.apache.commons.io.FileUtils.cleanDirectory(tmpDir);
+ }
+ catch (IOException ex) {
+ log.warn(ex, "Failed to cleanup tmp dir [%s].", tmpDir.getAbsolutePath());
+ }
+ }
+
public File getAssignedTaskDir()
{
return new File(taskConfig.getBaseTaskDir(), "assignedTasks");
@@ -311,11 +340,11 @@ public abstract class WorkerTaskManager
assignedTasks.put(taskId, task);
log.info("Found assigned task[%s].", taskId);
} else {
- throw new ISE("Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile());
+ throw new ISE("WTF! Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile());
}
}
catch (IOException ex) {
- throw new ISE(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
+ log.error(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
}
}
@@ -395,7 +424,12 @@ public abstract class WorkerTaskManager
completedTasks.put(taskId, taskAnnouncement);
try {
- jsonMapper.writeValue(new File(getCompletedTaskDir(), taskId), taskAnnouncement);
+ FileUtils.writeAtomically(new File(getCompletedTaskDir(), taskId), getTmpTaskDir(),
+ os -> {
+ jsonMapper.writeValue(os, taskAnnouncement);
+ return null;
+ }
+ );
}
catch (IOException ex) {
log.error(ex, "Error while trying to persist completed task[%s] announcement.", taskId);
@@ -423,11 +457,11 @@ public abstract class WorkerTaskManager
completedTasks.put(taskId, taskAnnouncement);
log.info("Found completed task[%s] with status[%s].", taskId, taskAnnouncement.getStatus());
} else {
- throw new ISE("Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile());
+ throw new ISE("WTF! Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile());
}
}
catch (IOException ex) {
- throw new ISE(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
+ log.error(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org