Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/02/12 19:41:41 UTC
hadoop git commit: HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk bc5aa7d0b -> 6b39ad086
HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b39ad08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b39ad08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b39ad08
Branch: refs/heads/trunk
Commit: 6b39ad0865cb2a7960dd59d68178f0bf28865ce2
Parents: bc5aa7d
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu Feb 12 10:40:46 2015 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Thu Feb 12 10:40:46 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/CanUnbuffer.java | 36 ++++++
.../org/apache/hadoop/fs/FSDataInputStream.java | 12 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../org/apache/hadoop/hdfs/DFSInputStream.java | 8 +-
.../java/org/apache/hadoop/hdfs/PeerCache.java | 9 +-
.../hadoop-hdfs/src/main/native/libhdfs/hdfs.c | 28 ++++
.../hadoop-hdfs/src/main/native/libhdfs/hdfs.h | 9 ++
.../java/org/apache/hadoop/fs/TestUnbuffer.java | 127 +++++++++++++++++++
8 files changed, 227 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
new file mode 100644
index 0000000..07e65f5
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * FSDataInputStreams implement this interface to indicate that they can clear
+ * their buffers on request.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface CanUnbuffer {
+ /**
+ * Reduce the buffering. This will also free sockets and file descriptors
+ * held by the stream, if possible.
+ */
+ public void unbuffer();
+}
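
For context, here is a minimal sketch of how client code could use the new
interface through FSDataInputStream once this patch is applied. The path and
the read/seek sequence are illustrative only, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UnbufferExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Keep the stream open for later reads, but release sockets and
        // file descriptors while it sits idle.
        try (FSDataInputStream in = fs.open(new Path("/illustrative/path"))) {
          in.read();      // triggers creation of a block reader
          in.unbuffer();  // frees the socket/fd until the next read
          // ... long idle period; the handle itself stays valid ...
          in.seek(0);
          in.read();      // transparently re-acquires resources
        }
      }
    }
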
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 6d39d1e..477bd6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.util.IdentityHashStore;
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
- HasEnhancedByteBufferAccess {
+ HasEnhancedByteBufferAccess, CanUnbuffer {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
@@ -220,4 +220,14 @@ public class FSDataInputStream extends DataInputStream
bufferPool.putBuffer(buffer);
}
}
+
+ @Override
+ public void unbuffer() {
+ try {
+ ((CanUnbuffer)in).unbuffer();
+ } catch (ClassCastException e) {
+ throw new UnsupportedOperationException("this stream does not " +
+ "support unbuffering.");
+ }
+ }
}
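
Since the wrapper above throws UnsupportedOperationException when the wrapped
stream is not a CanUnbuffer, callers that handle arbitrary FileSystem
implementations may prefer to probe for support first. A sketch, assuming the
existing FSDataInputStream#getWrappedStream accessor; the helper name is
invented for illustration:

    import java.io.InputStream;
    import org.apache.hadoop.fs.CanUnbuffer;
    import org.apache.hadoop.fs.FSDataInputStream;

    public class UnbufferSupport {
      // Illustrative helper: unbuffer only when the wrapped stream
      // supports it, instead of catching UnsupportedOperationException.
      public static void unbufferIfPossible(FSDataInputStream in) {
        InputStream wrapped = in.getWrappedStream();
        if (wrapped instanceof CanUnbuffer) {
          in.unbuffer();
        }
      }
    }
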
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cd43bd2..98dbab5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -624,6 +624,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7703. Support favouredNodes for the append for new blocks
(vinayakumarb)
+ HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 618f040..25c23e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -86,7 +87,7 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
- HasEnhancedByteBufferAccess {
+ HasEnhancedByteBufferAccess, CanUnbuffer {
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1818,4 +1819,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
((ByteBufferPool)val).putBuffer(buffer);
}
}
+
+ @Override
+ public synchronized void unbuffer() {
+ closeCurrentBlockReader();
+ }
}
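
On DFSInputStream, unbuffer() simply tears down the current block reader; for
remote (TCP) reads this returns the peer to the client's PeerCache instead of
leaving it pinned to the stream. That is what makes it cheap to hold many open
handles, the pattern exercised by TestUnbuffer#testOpenManyFilesViaTcp below.
A sketch of that pattern (the helper class and paths are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ManyHandles {
      // Open many files while keeping at most one block reader live at a time.
      public static List<FSDataInputStream> openAll(FileSystem fs,
          List<Path> paths) throws IOException {
        List<FSDataInputStream> handles = new ArrayList<FSDataInputStream>();
        for (Path p : paths) {
          FSDataInputStream in = fs.open(p);
          in.read();      // sets up a block reader (and possibly a TCP peer)
          in.unbuffer();  // drops the reader; the peer returns to the cache
          handles.add(in);
        }
        return handles;
      }
    }
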
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
index 07c562e..08b0468 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
@@ -29,16 +29,21 @@ import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
/**
* A cache of input stream sockets to Data Node.
*/
-class PeerCache {
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+@VisibleForTesting
+public class PeerCache {
private static final Log LOG = LogFactory.getLog(PeerCache.class);
private static class Key {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
index 95d8f01..34acccc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
@@ -1037,6 +1037,34 @@ done:
return file;
}
+int hdfsUnbufferFile(hdfsFile file)
+{
+ int ret;
+ jthrowable jthr;
+ JNIEnv *env = getJNIEnv();
+
+ if (!env) {
+ ret = EINTERNAL;
+ goto done;
+ }
+ if (file->type != HDFS_STREAM_INPUT) {
+ ret = ENOTSUP;
+ goto done;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM,
+ "unbuffer", "()V");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ HADOOP_ISTRM "#unbuffer failed:");
+ goto done;
+ }
+ ret = 0;
+
+done:
+ errno = ret;
+ return ret;
+}
+
int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
int ret;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
index 072051f..3406d6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
@@ -347,6 +347,15 @@ extern "C" {
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize);
+ /**
+ * hdfsUnbufferFile - Reduce the buffering done on a file.
+ *
+ * @param file The file to unbuffer.
+ * @return 0 on success
+ * ENOTSUP if the file does not support unbuffering
+ * Errno will also be set to this value.
+ */
+ int hdfsUnbufferFile(hdfsFile file);
/**
* hdfsCloseFile - Close an open file.
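
A corresponding C sketch of the new libhdfs entry point, assuming a reachable
default filesystem; the path is illustrative and error handling is minimal:

    #include <fcntl.h>   /* O_RDONLY */
    #include <stdio.h>
    #include "hdfs.h"

    int main(void)
    {
        char buf[16];
        hdfsFS fs = hdfsConnect("default", 0);
        if (!fs) {
            fprintf(stderr, "hdfsConnect failed\n");
            return 1;
        }
        hdfsFile file = hdfsOpenFile(fs, "/illustrative/path", O_RDONLY,
                                     0, 0, 0);
        if (!file) {
            fprintf(stderr, "hdfsOpenFile failed\n");
            return 1;
        }
        hdfsRead(fs, file, buf, sizeof(buf));  /* creates a block reader */
        if (hdfsUnbufferFile(file)) {          /* ENOTSUP for non-input streams */
            fprintf(stderr, "unbuffer not supported for this stream\n");
        }
        /* the handle stays open; a later hdfsRead re-acquires resources */
        hdfsCloseFile(fs, file);
        hdfsDisconnect(fs);
        return 0;
    }
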
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b39ad08/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
new file mode 100644
index 0000000..52c33e9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.PeerCache;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnbuffer {
+ private static final Log LOG =
+ LogFactory.getLog(TestUnbuffer.class.getName());
+
+ /**
+ * Test that calling unbuffer() closes sockets.
+ */
+ @Test
+ public void testUnbufferClosesSockets() throws Exception {
+ Configuration conf = new Configuration();
+ // Set a new ClientContext. This way, we will have our own PeerCache,
+ // rather than sharing one with other unit tests.
+ conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+ "testUnbufferClosesSocketsContext");
+
+ // Disable short-circuit reads. With short-circuit, we wouldn't hold open a
+ // TCP socket.
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+
+ // Set a really long socket timeout to avoid test timing issues.
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ 100000000L);
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ 100000000L);
+
+ MiniDFSCluster cluster = null;
+ FSDataInputStream stream = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ DistributedFileSystem dfs = (DistributedFileSystem)
+ FileSystem.newInstance(conf);
+ final Path TEST_PATH = new Path("/test1");
+ DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1);
+ stream = dfs.open(TEST_PATH);
+ // Read a byte. This will trigger the creation of a block reader.
+ stream.seek(2);
+ int b = stream.read();
+ Assert.assertTrue(-1 != b);
+
+ // The Peer cache should start off empty.
+ PeerCache cache = dfs.getClient().getClientContext().getPeerCache();
+ Assert.assertEquals(0, cache.size());
+
+ // Unbuffer should clear the block reader and return the socket to the
+ // cache.
+ stream.unbuffer();
+ stream.seek(2);
+ Assert.assertEquals(1, cache.size());
+ int b2 = stream.read();
+ Assert.assertEquals(b, b2);
+ } finally {
+ if (stream != null) {
+ IOUtils.cleanup(null, stream);
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test opening many files via TCP (not short-circuit).
+ *
+ * This is only practical when using unbuffer, because unbuffer reduces the
+ * number of sockets and the amount of memory that each open stream holds.
+ */
+ @Test
+ public void testOpenManyFilesViaTcp() throws Exception {
+ final int NUM_OPENS = 500;
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+ MiniDFSCluster cluster = null;
+ FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ final Path TEST_PATH = new Path("/testFile");
+ DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);
+
+ for (int i = 0; i < NUM_OPENS; i++) {
+ streams[i] = dfs.open(TEST_PATH);
+ LOG.info("opening file " + i + "...");
+ Assert.assertTrue(-1 != streams[i].read());
+ streams[i].unbuffer();
+ }
+ } finally {
+ for (FSDataInputStream stream : streams) {
+ IOUtils.cleanup(null, stream);
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+}