You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/12/08 16:13:01 UTC

[GitHub] leventov closed pull request #6677: FileUtils: Sync directory entry too on writeAtomically.

leventov closed pull request #6677: FileUtils: Sync directory entry too on writeAtomically.
URL: https://github.com/apache/incubator-druid/pull/6677
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java
index 79e010aa40c..ce94a4a8c96 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/CompressionUtils.java
@@ -41,7 +41,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.StandardOpenOption;
 import java.util.Enumeration;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
@@ -78,16 +81,19 @@ public static long zip(File directory, File outputZipFile, boolean fsync) throws
       log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
     }
 
-    try (final FileOutputStream out = new FileOutputStream(outputZipFile)) {
-      long bytes = zip(directory, out);
-
-      // For explanation of why fsyncing here is a good practice:
-      // https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984
-      if (fsync) {
-        out.getChannel().force(true);
+    if (fsync) {
+      return FileUtils.writeAtomically(outputZipFile, out -> zip(directory, out));
+    } else {
+      try (
+          final FileChannel fileChannel = FileChannel.open(
+              outputZipFile.toPath(),
+              StandardOpenOption.WRITE,
+              StandardOpenOption.CREATE
+          );
+          final OutputStream out = Channels.newOutputStream(fileChannel)
+      ) {
+        return zip(directory, out);
       }
-
-      return bytes;
     }
   }
 
diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
index 97ac6cb51ad..1ba63c10599 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java
@@ -24,16 +24,19 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteSource;
 import com.google.common.io.Files;
+import org.apache.druid.java.util.common.logger.Logger;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -41,6 +44,8 @@
 
 public class FileUtils
 {
+  private static final Logger log = new Logger(FileUtils.class);
+
   /**
    * Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions
    */
@@ -182,22 +187,35 @@ public static MappedByteBufferHandler map(File file) throws IOException
    *
    * This method is not just thread-safe, but is also safe to use from multiple processes on the same machine.
    */
-  public static void writeAtomically(final File file, OutputStreamConsumer f) throws IOException
+  public static <T> T writeAtomically(final File file, OutputStreamConsumer<T> f) throws IOException
   {
-    writeAtomically(file, file.getParentFile(), f);
+    return writeAtomically(file, file.getParentFile(), f);
   }
 
-  private static void writeAtomically(final File file, final File tmpDir, OutputStreamConsumer f) throws IOException
+  private static <T> T writeAtomically(final File file, final File tmpDir, OutputStreamConsumer<T> f) throws IOException
   {
     final File tmpFile = new File(tmpDir, StringUtils.format(".%s.%s", file.getName(), UUID.randomUUID()));
 
-    try {
-      try (final FileOutputStream out = new FileOutputStream(tmpFile)) {
+    //noinspection unused
+    try (final Closeable deleter = () -> java.nio.file.Files.deleteIfExists(tmpFile.toPath())) {
+      final T retVal;
+
+      try (
+          final FileChannel fileChannel = FileChannel.open(
+              tmpFile.toPath(),
+              StandardOpenOption.WRITE,
+              StandardOpenOption.CREATE_NEW
+          );
+          final OutputStream out = Channels.newOutputStream(fileChannel)
+      ) {
         // Pass f an uncloseable stream so we can fsync before closing.
-        f.accept(uncloseable(out));
+        retVal = f.apply(uncloseable(out));
 
         // fsync to avoid write-then-rename-then-crash causing empty files on some filesystems.
-        out.getChannel().force(true);
+        // Must do this before "out" or "fileChannel" is closed. No need to flush "out" first, since
+        // Channels.newOutputStream is unbuffered.
+        // See also https://github.com/apache/incubator-druid/pull/5187#pullrequestreview-85188984
+        fileChannel.force(true);
       }
 
       // No exception thrown; do the move.
@@ -207,9 +225,13 @@ private static void writeAtomically(final File file, final File tmpDir, OutputSt
           StandardCopyOption.ATOMIC_MOVE,
           StandardCopyOption.REPLACE_EXISTING
       );
-    }
-    finally {
-      tmpFile.delete();
+
+      // fsync the directory entry to ensure the new file will be visible after a crash.
+      try (final FileChannel directory = FileChannel.open(file.getParentFile().toPath(), StandardOpenOption.READ)) {
+        directory.force(true);
+      }
+
+      return retVal;
     }
   }
 
@@ -225,8 +247,8 @@ public void close()
     };
   }
 
-  public interface OutputStreamConsumer
+  public interface OutputStreamConsumer<T>
   {
-    void accept(OutputStream outputStream) throws IOException;
+    T apply(OutputStream outputStream) throws IOException;
   }
 }
diff --git a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
index ca8432cec31..746453f59d1 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java
@@ -55,7 +55,10 @@ public void testWriteAtomically() throws IOException
   {
     final File tmpDir = folder.newFolder();
     final File tmpFile = new File(tmpDir, "file1");
-    FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("foo")));
+    FileUtils.writeAtomically(tmpFile, out -> {
+      out.write(StringUtils.toUtf8("foo"));
+      return null;
+    });
     Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
 
     // Try writing again, throw error partway through.
@@ -71,7 +74,10 @@ public void testWriteAtomically() throws IOException
     }
     Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
 
-    FileUtils.writeAtomically(tmpFile, out -> out.write(StringUtils.toUtf8("baz")));
+    FileUtils.writeAtomically(tmpFile, out -> {
+      out.write(StringUtils.toUtf8("baz"));
+      return null;
+    });
     Assert.assertEquals("baz", StringUtils.fromUtf8(Files.readAllBytes(tmpFile.toPath())));
   }
 }
diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java
index 035568feb54..2641280ed9d 100644
--- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java
+++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authentication/db/cache/CoordinatorPollingBasicAuthenticatorCacheManager.java
@@ -236,7 +236,13 @@ private void writeUserMapToDisk(String prefix, byte[] userMapBytes) throws IOExc
     File cacheDir = new File(commonCacheConfig.getCacheDirectory());
     cacheDir.mkdirs();
     File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserMapFilename(prefix));
-    FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes));
+    FileUtils.writeAtomically(
+        userMapFile,
+        out -> {
+          out.write(userMapBytes);
+          return null;
+        }
+    );
   }
 
   private Map<String, BasicAuthenticatorUser> tryFetchUserMapFromCoordinator(String prefix) throws Exception
diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java
index c3115c3a24b..29c3f572bc8 100644
--- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java
+++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/authorization/db/cache/CoordinatorPollingBasicAuthorizerCacheManager.java
@@ -212,7 +212,13 @@ private void writeMapToDisk(String prefix, byte[] userMapBytes) throws IOExcepti
     File cacheDir = new File(commonCacheConfig.getCacheDirectory());
     cacheDir.mkdirs();
     File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix));
-    FileUtils.writeAtomically(userMapFile, out -> out.write(userMapBytes));
+    FileUtils.writeAtomically(
+        userMapFile,
+        out -> {
+          out.write(userMapBytes);
+          return null;
+        }
+    );
   }
 
   @Nullable
diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java
index 6d60aafdedc..b2f21329e41 100644
--- a/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java
+++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupSnapshotTaker.java
@@ -88,7 +88,13 @@ public synchronized void takeSnapshot(String tier, List<LookupBean> lookups)
     final File persistFile = getPersistFile(tier);
 
     try {
-      FileUtils.writeAtomically(persistFile, out -> objectMapper.writeValue(out, lookups));
+      FileUtils.writeAtomically(
+          persistFile,
+          out -> {
+            objectMapper.writeValue(out, lookups);
+            return null;
+          }
+      );
     }
     catch (IOException e) {
       throw new ISE(e, "Exception during serialization of lookups using file [%s]", persistFile.getAbsolutePath());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org