You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/02/12 19:41:41 UTC

hadoop git commit: HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk bc5aa7d0b -> 6b39ad086


HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b39ad08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b39ad08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b39ad08

Branch: refs/heads/trunk
Commit: 6b39ad0865cb2a7960dd59d68178f0bf28865ce2
Parents: bc5aa7d
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu Feb 12 10:40:46 2015 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Thu Feb 12 10:40:46 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/CanUnbuffer.java  |  36 ++++++
 .../org/apache/hadoop/fs/FSDataInputStream.java |  12 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |   8 +-
 .../java/org/apache/hadoop/hdfs/PeerCache.java  |   9 +-
 .../hadoop-hdfs/src/main/native/libhdfs/hdfs.c  |  28 ++++
 .../hadoop-hdfs/src/main/native/libhdfs/hdfs.h  |   9 ++
 .../java/org/apache/hadoop/fs/TestUnbuffer.java | 127 +++++++++++++++++++
 8 files changed, 227 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
new file mode 100644
index 0000000..07e65f5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Streams implement this interface to indicate that they can clear their
+ * buffers on request.  FSDataInputStream forwards to the stream it wraps.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface CanUnbuffer {
+  /**
+   * Reduce the buffering.  This also frees sockets and file descriptors held
+   * by the stream, if possible; the stream stays open for further reads.
+   */
+  public void unbuffer();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 6d39d1e..477bd6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable, 
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess {
+      HasEnhancedByteBufferAccess, CanUnbuffer {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
    * objects
@@ -220,4 +220,14 @@ public class FSDataInputStream extends DataInputStream
       bufferPool.putBuffer(buffer);
     }
   }
+
+  @Override
+  public void unbuffer() {
+    // Don't catch ClassCastException: it would mask one from in.unbuffer().
+    if (!(in instanceof CanUnbuffer)) {
+      throw new UnsupportedOperationException("this stream does not " +
+          "support unbuffering.");
+    }
+    ((CanUnbuffer)in).unbuffer();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cd43bd2..98dbab5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -624,6 +624,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7703. Support favouredNodes for the append for new blocks
     (vinayakumarb)
 
+    HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 618f040..25c23e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -86,7 +87,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
 implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-    HasEnhancedByteBufferAccess {
+    HasEnhancedByteBufferAccess, CanUnbuffer {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1818,4 +1819,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       ((ByteBufferPool)val).putBuffer(buffer);
     }
   }
+
+  @Override
+  public synchronized void unbuffer() {
+    closeCurrentBlockReader();  // frees the block reader; stream stays open
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
index 07c562e..08b0468 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
@@ -29,16 +29,21 @@ import com.google.common.collect.LinkedListMultimap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
 /**
  * A cache of input stream sockets to Data Node.
  */
-class PeerCache {
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+@VisibleForTesting
+public class PeerCache {
   private static final Log LOG = LogFactory.getLog(PeerCache.class);
   
   private static class Key {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
index 95d8f01..34acccc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
@@ -1037,6 +1037,34 @@ done:
     return file;
 }
 
+int hdfsUnbufferFile(hdfsFile file)
+{
+    int ret = 0;
+    jthrowable jthr;
+    JNIEnv *env = getJNIEnv();
+
+    if (!env) {
+        ret = EINTERNAL;
+        goto done;
+    }
+    /* A NULL handle is reported as EBADF instead of being dereferenced;
+     * any stream that is not an input stream is unsupported (ENOTSUP). */
+    if (!file || file->type != HDFS_STREAM_INPUT) {
+        ret = file ? ENOTSUP : EBADF;
+        goto done;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM,
+                     "unbuffer", "()V");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                HADOOP_ISTRM "#unbuffer failed:");
+    }
+
+done:
+    errno = ret;
+    return ret;
+}
+
 int hdfsCloseFile(hdfsFS fs, hdfsFile file)
 {
     int ret;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
index 072051f..3406d6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
@@ -347,6 +347,15 @@ extern  "C" {
     hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                           int bufferSize, short replication, tSize blocksize);
 
+    /**
+     * hdfsUnbufferFile - Reduce the buffering done on a file.
+     *
+     * @param file  The file to unbuffer; it must be open for reading.
+     * @return      0 on success; ENOTSUP if the file does not support
+     *              unbuffering; EINTERNAL if no JNI environment is available.
+     *              Errno will also be set to this value.
+     */
+    int hdfsUnbufferFile(hdfsFile file);
 
     /** 
      * hdfsCloseFile - Close an open file. 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
new file mode 100644
index 0000000..52c33e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.PeerCache;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnbuffer {
+  private static final Log LOG =
+      LogFactory.getLog(TestUnbuffer.class.getName());
+
+  /**
+   * Test that unbuffer hands the stream's socket back to the PeerCache.
+   */
+  @Test
+  public void testUnbufferClosesSockets() throws Exception {
+    Configuration conf = new Configuration();
+    // Set a new ClientContext.  This way, we will have our own PeerCache,
+    // rather than sharing one with other unit tests.
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        "testUnbufferClosesSocketsContext");
+
+    // Disable short-circuit reads.  With short-circuit, we wouldn't hold open a
+    // TCP socket.
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+
+    // Set a really long socket timeout to avoid test timing issues.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        100000000L);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+        100000000L);
+
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem dfs = null;
+    FSDataInputStream stream = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      dfs = (DistributedFileSystem)
+          FileSystem.newInstance(conf);
+      final Path TEST_PATH = new Path("/test1");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1);
+      stream = dfs.open(TEST_PATH);
+      // Read a byte.  This will trigger the creation of a block reader.
+      stream.seek(2);
+      int b = stream.read();
+      Assert.assertTrue(-1 != b);
+
+      // The Peer cache should start off empty.
+      PeerCache cache = dfs.getClient().getClientContext().getPeerCache();
+      Assert.assertEquals(0, cache.size());
+
+      // Unbuffer should clear the block reader and return the socket to the
+      // cache.
+      stream.unbuffer();
+      stream.seek(2);
+      Assert.assertEquals(1, cache.size());
+      int b2 = stream.read();
+      Assert.assertEquals(b, b2);
+    } finally {
+      // cleanup() skips nulls; dfs came from newInstance(), so close it here.
+      IOUtils.cleanup(null, stream, dfs);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test opening many files via TCP (not short-circuit).
+   *
+   * This is practical when using unbuffer, because it reduces the number of
+   * sockets and amount of memory that we use.
+   */
+  @Test
+  public void testOpenManyFilesViaTcp() throws Exception {
+    final int NUM_OPENS = 500;
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    MiniDFSCluster cluster = null;
+    FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path TEST_PATH = new Path("/testFile");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);
+
+      for (int i = 0; i < NUM_OPENS; i++) {
+        streams[i] = dfs.open(TEST_PATH);
+        LOG.info("opening file " + i + "...");
+        Assert.assertTrue(-1 != streams[i].read());
+        streams[i].unbuffer();
+      }
+    } finally {
+      for (FSDataInputStream stream : streams) {
+        IOUtils.cleanup(null, stream);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}