You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/18 01:31:18 UTC

[31/34] git commit: HDFS-7090. Use unbuffered writes when persisting in-memory replicas. Contributed by Xiaoyu Yao.

HDFS-7090. Use unbuffered writes when persisting in-memory replicas. Contributed by Xiaoyu Yao.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2e98379b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2e98379b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2e98379b

Branch: refs/heads/branch-2.6
Commit: 2e98379bcb47235afb97c09bcb90d539cbd534fc
Parents: 1b6d115
Author: cnauroth <cn...@apache.org>
Authored: Mon Oct 13 10:50:25 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:53 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/io/nativeio/Errno.java    |  1 +
 .../org/apache/hadoop/io/nativeio/NativeIO.java | 27 ++++++
 .../org/apache/hadoop/io/nativeio/NativeIO.c    | 67 +++++++++++++++
 .../org/apache/hadoop/io/nativeio/errno_enum.c  |  1 +
 .../apache/hadoop/io/nativeio/TestNativeIO.java | 34 ++++++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hadoop/hdfs/server/common/Storage.java      | 88 ++++++++++++++++++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  4 +-
 8 files changed, 223 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java
index f823978..f6377c7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/Errno.java
@@ -58,6 +58,7 @@ public enum Errno {
   ELOOP,
   ENAMETOOLONG,
   ENOTEMPTY,
+  EOVERFLOW,
 
   UNKNOWN;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 53d31d6..43f1cb1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -29,6 +29,7 @@ import java.nio.MappedByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -846,4 +847,30 @@ public class NativeIO {
 
   private static native void link0(String src, String dst)
       throws NativeIOException;
+
+  /**
+   * Unbuffered file copy from src to dst without tainting OS buffer cache.
+   * In Linux, the native code uses sendfile(); in Windows, it uses CopyFileEx
+   * with the COPY_FILE_NO_BUFFERING flag. When the native library is not
+   * loaded, or on any other OS, it falls back to FileUtils.copyFile().
+   *
+   * Note: This does not support FreeBSD/OSX which have a different sendfile()
+   * semantic. Also, this simple native wrapper does minimal parameter checking.
+   * It is recommended to use a wrapper function like
+   * the Storage#nativeCopyFileUnbuffered() function in hadoop-hdfs.
+   *
+   * @param src                  The source path
+   * @param dst                  The destination path
+   * @throws IOException         if the copy fails
+   */
+  public static void copyFileUnbuffered(File src, File dst) throws IOException {
+    if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
+      copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
+    } else {
+      FileUtils.copyFile(src, dst);
+    }
+  }
+
+  private static native void copyFileUnbuffered0(String src, String dst)
+      throws NativeIOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
index b98aa0c..f19d6be 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c
@@ -35,6 +35,9 @@
 #include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
+#if !(defined(__FreeBSD__) || defined(__MACH__))
+#include <sys/sendfile.h>
+#endif
 #include <sys/time.h>
 #include <sys/types.h>
 #include <unistd.h>
@@ -1142,6 +1145,70 @@ JNIEnv *env, jclass clazz)
 #endif
 }
 
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_copyFileUnbuffered0(
+JNIEnv *env, jclass clazz, jstring jsrc, jstring jdst)
+{ /* Native side of NativeIO.copyFileUnbuffered0(): copies file jsrc to jdst. */
+#ifdef UNIX
+#if (defined(__FreeBSD__) || defined(__MACH__))
+  THROW(env, "java/io/IOException",
+      "The function copyFileUnbuffered() is not supported on FreeBSD or Mac OS");
+  return;
+#else
+  const char *src = NULL, *dst = NULL;
+  int srcFd = -1;
+  int dstFd = -1;
+  struct stat s;
+  off_t offset = 0;
+  /* Fetch both paths as UTF-8 C strings; they are released at cleanup. */
+  src = (*env)->GetStringUTFChars(env, jsrc, NULL);
+  if (!src) goto cleanup; // exception was thrown
+  dst = (*env)->GetStringUTFChars(env, jdst, NULL);
+  if (!dst) goto cleanup; // exception was thrown
+  /* Open src read-only, then create/open dst (no O_TRUNC) with src's mode. */
+  srcFd = open(src, O_RDONLY);
+  if (srcFd == -1) {
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+  if (fstat(srcFd, &s) == -1){
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+  dstFd = open(dst, O_WRONLY | O_CREAT, s.st_mode);
+  if (dstFd == -1) {
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+  if (sendfile(dstFd, srcFd, &offset, s.st_size) == -1) {
+    throw_ioe(env, errno);
+  }
+  /* NOTE(review): sendfile() is called once; a short transfer is not retried. */
+cleanup:
+  if (src) (*env)->ReleaseStringUTFChars(env, jsrc, src);
+  if (dst) (*env)->ReleaseStringUTFChars(env, jdst, dst);
+  if (srcFd != -1) close(srcFd);
+  if (dstFd != -1) close(dstFd);
+#endif /* !(__FreeBSD__ || __MACH__) */
+#endif /* UNIX */
+
+#ifdef WINDOWS
+  LPCWSTR src = NULL, dst = NULL;
+
+  src = (LPCWSTR) (*env)->GetStringChars(env, jsrc, NULL);
+  if (!src) goto cleanup; // exception was thrown
+  dst = (LPCWSTR) (*env)->GetStringChars(env, jdst, NULL);
+  if (!dst) goto cleanup; // exception was thrown
+  if (!CopyFileEx(src, dst, NULL, NULL, NULL, COPY_FILE_NO_BUFFERING)) {
+    throw_ioe(env, GetLastError());
+  }
+
+cleanup:
+  if (src) (*env)->ReleaseStringChars(env, jsrc, src);
+  if (dst) (*env)->ReleaseStringChars(env, jdst, dst);
+#endif /* WINDOWS */
+}
+
 /**
  * vim: sw=2: ts=2: et:
  */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c
index 4d07c31..08cc305 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/errno_enum.c
@@ -66,6 +66,7 @@ static errno_mapping_t ERRNO_MAPPINGS[] = {
   MAPPING(ELOOP),
   MAPPING(ENAMETOOLONG),
   MAPPING(ENOTEMPTY),
+  MAPPING(EOVERFLOW),
   {-1, NULL}
 };
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
index 6c3f003..5425c49 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java
@@ -24,14 +24,18 @@ import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -622,4 +626,34 @@ public class TestNativeIO {
     assumeTrue(NativeIO.isAvailable());
     NativeIO.getMemlockLimit();
   }
+
+  /** Copies a 128 MB random file with copyFileUnbuffered and checks its size. */
+  @Test (timeout = 30000)
+  public void testCopyFileUnbuffered() throws Exception {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    File srcFile = new File(TEST_DIR, METHOD_NAME + ".src.dat");
+    File dstFile = new File(TEST_DIR, METHOD_NAME + ".dst.dat");
+    final int fileSize = 0x8000000; // 128 MB
+    final int batchSize = 4096;
+    final int numBatches = fileSize / batchSize;
+    Random rb = new Random(0xBEEF); // fixed seed keeps the data reproducible
+    FileChannel channel = null;
+    RandomAccessFile raSrcFile = null;
+    try {
+      raSrcFile = new RandomAccessFile(srcFile, "rw");
+      channel = raSrcFile.getChannel();
+      byte bytesToWrite[] = new byte[batchSize];
+      MappedByteBuffer mapBuf = channel.map(MapMode.READ_WRITE, 0, fileSize);
+      for (int i = 0; i < numBatches; i++) {
+        rb.nextBytes(bytesToWrite);
+        mapBuf.put(bytesToWrite);
+      }
+      NativeIO.copyFileUnbuffered(srcFile, dstFile);
+      Assert.assertEquals(srcFile.length(), dstFile.length());
+    } finally {
+      IOUtils.cleanup(LOG, channel);
+      IOUtils.cleanup(LOG, raSrcFile);
+      FileUtils.deleteQuietly(TEST_DIR);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index afb8d35..55dc567 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -665,6 +665,9 @@ Release 2.6.0 - UNRELEASED
       HDFS-7112. LazyWriter should use either async IO or one thread per physical
       disk. (Xiaoyu Yao via cnauroth)
 
+      HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
+      (Xiaoyu Yao via cnauroth)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 0661026..4320e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.common;
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.lang.management.ManagementFactory;
@@ -997,6 +998,93 @@ public abstract class Storage extends StorageInfo {
   }
 
   /**
+   * Copies a file (usually large) to a new location using native unbuffered IO.
+   * <p>
+   * This method copies the contents of the specified source file
+   * to the specified destination file using OS specific unbuffered IO.
+   * The goal is to avoid churning the file system buffer cache when copying
+   * large files. The FileUtils#copyLarge function from apache-commons-io library
+   * can be used to achieve this with an internal memory buffer but is less
+   * efficient than the native unbuffered APIs such as sendfile() in Linux and
+   * CopyFileEx() in Windows wrapped in {@link NativeIO#copyFileUnbuffered}.
+   *
+   * The directory holding the destination file is created if it does not exist.
+   * If the destination file exists, then this method will delete it first.
+   * <p>
+   * <strong>Note:</strong> Setting <code>preserveFileDate</code> to
+   * {@code true} tries to preserve the file's last modified
+   * date/times using {@link File#setLastModified(long)}, however it is
+   * not guaranteed that the operation will succeed.
+   * If the modification operation fails, no indication is provided.
+   *
+   * @param srcFile  an existing file to copy, must not be {@code null}
+   * @param destFile  the new file, must not be {@code null}
+   * @param preserveFileDate  true if the file date of the copy
+   *  should be the same as the original
+   *
+   * @throws NullPointerException if source or destination is {@code null}
+   * @throws IOException if source or destination is invalid
+   * @throws IOException if an IO error occurs during copying
+   */
+  public static void nativeCopyFileUnbuffered(File srcFile, File destFile,
+      boolean preserveFileDate) throws IOException {
+    if (srcFile == null) {
+      throw new NullPointerException("Source must not be null");
+    }
+    if (destFile == null) {
+      throw new NullPointerException("Destination must not be null");
+    }
+    if (srcFile.exists() == false) {
+      throw new FileNotFoundException("Source '" + srcFile + "' does not exist");
+    }
+    if (srcFile.isDirectory()) {
+      throw new IOException("Source '" + srcFile + "' exists but is a directory");
+    }
+    if (srcFile.getCanonicalPath().equals(destFile.getCanonicalPath())) {
+      throw new IOException("Source '" + srcFile + "' and destination '" +
+          destFile + "' are the same");
+    }
+    File parentFile = destFile.getParentFile();
+    if (parentFile != null) {
+      if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
+        throw new IOException("Destination '" + parentFile
+            + "' directory cannot be created");
+      }
+    }
+    if (destFile.exists()) {
+      if (FileUtil.canWrite(destFile) == false) {
+        throw new IOException("Destination '" + destFile
+            + "' exists but is read-only");
+      } else {
+        if (destFile.delete() == false) {
+          throw new IOException("Destination '" + destFile
+              + "' exists but cannot be deleted");
+        }
+      }
+    }
+    try {
+      NativeIO.copyFileUnbuffered(srcFile, destFile);
+    } catch (NativeIOException e) { // rethrow with paths, keeping e as cause
+      throw new IOException("Failed to copy " + srcFile.getCanonicalPath()
+          + " to " + destFile.getCanonicalPath()
+          + " due to failure in NativeIO#copyFileUnbuffered(). "
+          + e.toString(), e);
+    }
+    if (srcFile.length() != destFile.length()) { // detects an incomplete copy
+      throw new IOException("Failed to copy full contents from '" + srcFile
+          + "' to '" + destFile + "'");
+    }
+    if (preserveFileDate) {
+      if (destFile.setLastModified(srcFile.lastModified()) == false) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Failed to preserve last modified date from'" + srcFile
+            + "' to '" + destFile + "'");
+        }
+      }
+    }
+  }
+
+  /**
    * Recursively delete all the content of the directory first and then 
    * the directory itself from the local filesystem.
    * @param dir The directory to delete

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e98379b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 07e19cf..1709066 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -724,12 +724,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
     try {
-      FileUtils.copyFile(srcMeta, dstMeta);
+      Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
     } catch (IOException e) {
       throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
     }
     try {
-      FileUtils.copyFile(srcFile, dstFile);
+      Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
     } catch (IOException e) {
       throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }