You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/06/18 16:18:48 UTC

[incubator-druid] branch master updated: WorkerTaskManager to create disk files atomically and ignore task file corruption (#7917)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 417fcef  WorkerTaskManager to create disk files atomically and ignore task file corruption (#7917)
417fcef is described below

commit 417fcef38547621e7874c607869a2c440013838c
Author: Himanshu <g....@gmail.com>
AuthorDate: Tue Jun 18 09:18:43 2019 -0700

    WorkerTaskManager to create disk files atomically and ignore task file corruption (#7917)
    
    * WorkerTaskManager to create disk files atomically and ignore task file
    corruptions
    
    * fixing weird checkstyle lambda indentation issues
---
 .../apache/druid/java/util/common/FileUtils.java   | 16 +++++---
 .../druid/indexing/worker/WorkerTaskManager.java   | 46 +++++++++++++++++++---
 2 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
index 7be41a8..17f0ed0 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
@@ -176,6 +176,15 @@ public class FileUtils
 
   /**
    * Write to a file atomically, by first writing to a temporary file in the same directory and then moving it to
+   * the target location. More docs at {@link FileUtils#writeAtomically(File, File, OutputStreamConsumer)}.
+   */
+  public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
+  {
+    return writeAtomically(file, file.getParentFile(), f);
+  }
+
+  /**
+   * Write to a file atomically, by first writing to a temporary file in the given tmpDir directory and then moving it to
    * the target location. This function attempts to clean up its temporary files when possible, but they may stick
    * around (for example, if the JVM crashes partway through executing the function). In any case, the target file
    * should be unharmed.
@@ -186,12 +195,7 @@ public class FileUtils
    *
    * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
    */
-  public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
-  {
-    return writeAtomically(file, file.getParentFile(), f);
-  }
-
-  private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
+  public static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
   {
     final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 613c69b..ac5c15c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -129,6 +130,7 @@ public abstract class WorkerTaskManager
     synchronized (lock) {
       try {
         log.info("Starting...");
+        cleanupAndMakeTmpTaskDir();
         registerLocationListener();
         restoreRestorableTasks();
         initAssignedTasks();
@@ -264,7 +266,12 @@ public abstract class WorkerTaskManager
       }
 
       try {
-        jsonMapper.writeValue(new File(getAssignedTaskDir(), task.getId()), task);
+        FileUtils.writeAtomically(new File(getAssignedTaskDir(), task.getId()), getTmpTaskDir(),
+            os -> {
+            jsonMapper.writeValue(os, task);
+            return null;
+          }
+        );
         assignedTasks.put(task.getId(), task);
       }
       catch (IOException ex) {
@@ -286,6 +293,28 @@ public abstract class WorkerTaskManager
     submitNoticeToExec(new RunNotice(task));
   }
 
+  private File getTmpTaskDir()
+  {
+    return new File(taskConfig.getBaseTaskDir(), "workerTaskManagerTmp");
+  }
+
+  /** Creates the tmp task dir if it is missing and then empties it; throws ISE if it is not a directory. */
+  private void cleanupAndMakeTmpTaskDir()
+  {
+    File tmpDir = getTmpTaskDir();
+    tmpDir.mkdirs();
+    if (!tmpDir.isDirectory()) {
+      throw new ISE("Tmp Tasks Dir [%s] does not exist/not-a-directory.", tmpDir);
+    }
+    // Best-effort removal of tmp files left over from a previous run (e.g. a JVM crash); failure is only logged.
+    try {
+      org.apache.commons.io.FileUtils.cleanDirectory(tmpDir);
+    }
+    catch (IOException ex) {
+      log.warn(ex, "Failed to cleanup tmp dir [%s].", tmpDir.getAbsolutePath());
+    }
+  }
+
   public File getAssignedTaskDir()
   {
     return new File(taskConfig.getBaseTaskDir(), "assignedTasks");
@@ -311,11 +340,11 @@ public abstract class WorkerTaskManager
           assignedTasks.put(taskId, task);
           log.info("Found assigned task[%s].", taskId);
         } else {
-          throw new ISE("Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile());
+          throw new ISE("WTF! Corrupted assigned task on disk[%s].", taskFile.getAbsoluteFile());
         }
       }
       catch (IOException ex) {
-        throw new ISE(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
+        log.error(ex, "Failed to read assigned task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
       }
     }
 
@@ -395,7 +424,12 @@ public abstract class WorkerTaskManager
       completedTasks.put(taskId, taskAnnouncement);
 
       try {
-        jsonMapper.writeValue(new File(getCompletedTaskDir(), taskId), taskAnnouncement);
+        FileUtils.writeAtomically(new File(getCompletedTaskDir(), taskId), getTmpTaskDir(),
+            os -> {
+            jsonMapper.writeValue(os, taskAnnouncement);
+            return null;
+          }
+        );
       }
       catch (IOException ex) {
         log.error(ex, "Error while trying to persist completed task[%s] announcement.", taskId);
@@ -423,11 +457,11 @@ public abstract class WorkerTaskManager
           completedTasks.put(taskId, taskAnnouncement);
           log.info("Found completed task[%s] with status[%s].", taskId, taskAnnouncement.getStatus());
         } else {
-          throw new ISE("Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile());
+          throw new ISE("WTF! Corrupted completed task on disk[%s].", taskFile.getAbsoluteFile());
         }
       }
       catch (IOException ex) {
-        throw new ISE(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
+        log.error(ex, "Failed to read completed task from disk at [%s]. Ignored.", taskFile.getAbsoluteFile());
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org