You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:18 UTC
[06/50] [abbrv] hadoop git commit: HDFS-8951. Move the shortcircuit
package to hdfs-client. Contributed by Mingliang Liu.
HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c992bcf9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c992bcf9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c992bcf9
Branch: refs/heads/HDFS-7285
Commit: c992bcf9c136d3df686655a80e636bb7bb0664da
Parents: a4d9acc
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Aug 26 14:02:48 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Wed Aug 26 14:02:48 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 26 +
.../server/datanode/BlockMetadataHeader.java | 209 ++++
.../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 ++
.../hdfs/shortcircuit/DomainSocketFactory.java | 196 ++++
.../hdfs/shortcircuit/ShortCircuitCache.java | 1066 +++++++++++++++++
.../hdfs/shortcircuit/ShortCircuitReplica.java | 352 ++++++
.../shortcircuit/ShortCircuitReplicaInfo.java | 64 ++
.../apache/hadoop/hdfs/util/IOUtilsClient.java | 46 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../apache/hadoop/hdfs/BlockReaderFactory.java | 2 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 25 -
.../apache/hadoop/hdfs/RemoteBlockReader.java | 2 +-
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 2 +-
.../server/datanode/BlockMetadataHeader.java | 211 ----
.../datanode/fsdataset/impl/FsDatasetImpl.java | 19 +-
.../impl/RamDiskAsyncLazyPersistService.java | 8 +-
.../hadoop/hdfs/shortcircuit/ClientMmap.java | 75 --
.../hdfs/shortcircuit/DomainSocketFactory.java | 194 ----
.../hdfs/shortcircuit/ShortCircuitCache.java | 1068 ------------------
.../hdfs/shortcircuit/ShortCircuitReplica.java | 349 ------
.../shortcircuit/ShortCircuitReplicaInfo.java | 64 --
21 files changed, 2056 insertions(+), 2000 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index fa1f5e6..3d0acb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -36,11 +36,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -429,4 +431,28 @@ public class DFSUtilClient {
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
return df.format(date);
}
+
+ private static final Map<String, Boolean> localAddrMap = Collections
+ .synchronizedMap(new HashMap<String, Boolean>());
+
+ public static boolean isLocalAddress(InetSocketAddress targetAddr) {
+ InetAddress addr = targetAddr.getAddress();
+ Boolean cached = localAddrMap.get(addr.getHostAddress());
+ if (cached != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Address " + targetAddr +
+ (cached ? " is local" : " is not local"));
+ }
+ return cached;
+ }
+
+ boolean local = NetUtils.isLocalAddress(addr);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Address " + targetAddr +
+ (local ? " is local" : " is not local"));
+ }
+ localAddrMap.put(addr.getHostAddress(), local);
+ return local;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
new file mode 100644
index 0000000..d298690
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * BlockMetadataHeader manages metadata for data blocks on Datanodes.
+ * This is not related to the Block related functionality in Namenode.
+ * The biggest part of data block metadata is CRC for the block.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ BlockMetadataHeader.class);
+
+ public static final short VERSION = 1;
+
+ /**
+ * Header includes everything except the checksum(s) themselves.
+ * Version is two bytes. Following it is the DataChecksum
+ * that occupies 5 bytes.
+ */
+ private final short version;
+ private DataChecksum checksum = null;
+
+ @VisibleForTesting
+ public BlockMetadataHeader(short version, DataChecksum checksum) {
+ this.checksum = checksum;
+ this.version = version;
+ }
+
+ /** Get the version */
+ public short getVersion() {
+ return version;
+ }
+
+ /** Get the checksum */
+ public DataChecksum getChecksum() {
+ return checksum;
+ }
+
+ /**
+ * Read the checksum header from the meta file.
+ * @return the data checksum obtained from the header.
+ */
+ public static DataChecksum readDataChecksum(File metaFile, int bufSize)
+ throws IOException {
+ DataInputStream in = null;
+ try {
+ in = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(metaFile), bufSize));
+ return readDataChecksum(in, metaFile);
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+
+ /**
+ * Read the checksum header from the meta input stream.
+ * @return the data checksum obtained from the header.
+ */
+ public static DataChecksum readDataChecksum(final DataInputStream metaIn,
+ final Object name) throws IOException {
+ // read and handle the common header here. For now just a version
+ final BlockMetadataHeader header = readHeader(metaIn);
+ if (header.getVersion() != VERSION) {
+ LOG.warn("Unexpected meta-file version for " + name
+ + ": version in file is " + header.getVersion()
+ + " but expected version is " + VERSION);
+ }
+ return header.getChecksum();
+ }
+
+ /**
+ * Read the header from the start of the file using positional reads, so
+ * the FileChannel's own position is left unchanged.
+ * @param fc The FileChannel to read.
+ * @return the Metadata Header.
+ * @throws IOException on error, including EOF before a full header is read.
+ */
+ public static BlockMetadataHeader preadHeader(FileChannel fc)
+ throws IOException {
+ final byte arr[] = new byte[getHeaderSize()];
+ ByteBuffer buf = ByteBuffer.wrap(arr);
+ // A short read must resume at the file offset already buffered, not at 0.
+ while (buf.hasRemaining()) {
+ if (fc.read(buf, buf.position()) <= 0) {
+ throw new EOFException("unexpected EOF while reading " +
+ "metadata file header");
+ }
+ }
+ short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
+ DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
+ return new BlockMetadataHeader(version, dataChecksum);
+ }
+
+ /**
+ * This reads all the fields till the beginning of checksum.
+ * @return Metadata Header
+ * @throws IOException
+ */
+ public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+ return readHeader(in.readShort(), in);
+ }
+
+ /**
+ * Reads header at the top of metadata file and returns the header.
+ *
+ * @return metadata header for the block
+ * @throws IOException
+ */
+ public static BlockMetadataHeader readHeader(File file) throws IOException {
+ DataInputStream in = null;
+ try {
+ in = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(file)));
+ return readHeader(in);
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+
+ /**
+ * Read the header at the beginning of the given block meta file.
+ * The current file position will be altered by this method.
+ * If an error occurs, the file is <em>not</em> closed.
+ */
+ public static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
+ byte[] buf = new byte[getHeaderSize()];
+ raf.seek(0);
+ raf.readFully(buf, 0, buf.length);
+ return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
+ }
+
+ // Version is already read.
+ private static BlockMetadataHeader readHeader(short version, DataInputStream in)
+ throws IOException {
+ DataChecksum checksum = DataChecksum.newDataChecksum(in);
+ return new BlockMetadataHeader(version, checksum);
+ }
+
+ /**
+ * This writes all the fields till the beginning of checksum.
+ * @param out DataOutputStream
+ * @throws IOException
+ */
+ @VisibleForTesting
+ public static void writeHeader(DataOutputStream out,
+ BlockMetadataHeader header)
+ throws IOException {
+ out.writeShort(header.getVersion());
+ header.getChecksum().writeHeader(out);
+ }
+
+ /**
+ * Writes all the fields till the beginning of checksum.
+ * @throws IOException on error
+ */
+ public static void writeHeader(DataOutputStream out, DataChecksum checksum)
+ throws IOException {
+ writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
+ }
+
+ /**
+ * Returns the size of the header
+ */
+ public static int getHeaderSize() {
+ return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
new file mode 100644
index 0000000..2d871fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.Closeable;
+import java.nio.MappedByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reference to a memory-mapped region used by an HDFS client.
+ */
+@InterfaceAudience.Private
+public class ClientMmap implements Closeable {
+ static final Logger LOG = LoggerFactory.getLogger(ClientMmap.class);
+
+ /**
+ * A reference to the block replica which this mmap relates to.
+ */
+ private ShortCircuitReplica replica;
+
+ /**
+ * The java ByteBuffer object.
+ */
+ private final MappedByteBuffer map;
+
+ /**
+ * Whether or not this ClientMmap anchors the replica into memory while
+ * it exists. Closing an anchored ClientMmap unanchors the replica.
+ */
+ private final boolean anchored;
+
+ ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map,
+ boolean anchored) {
+ this.replica = replica;
+ this.map = map;
+ this.anchored = anchored;
+ }
+
+ /**
+ * Close the ClientMmap object.
+ */
+ @Override
+ public void close() {
+ if (replica != null) {
+ if (anchored) {
+ replica.removeNoChecksumAnchor();
+ }
+ replica.unref();
+ }
+ replica = null;
+ }
+
+ public MappedByteBuffer getMappedByteBuffer() {
+ return map;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
new file mode 100644
index 0000000..6a7d39d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.util.PerformanceAdvisory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DomainSocketFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ DomainSocketFactory.class);
+
+ public enum PathState {
+ UNUSABLE(false, false),
+ SHORT_CIRCUIT_DISABLED(true, false),
+ VALID(true, true);
+
+ PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) {
+ this.usableForDataTransfer = usableForDataTransfer;
+ this.usableForShortCircuit = usableForShortCircuit;
+ }
+
+ public boolean getUsableForDataTransfer() {
+ return usableForDataTransfer;
+ }
+
+ public boolean getUsableForShortCircuit() {
+ return usableForShortCircuit;
+ }
+
+ private final boolean usableForDataTransfer;
+ private final boolean usableForShortCircuit;
+ }
+
+ public static class PathInfo {
+ private final static PathInfo NOT_CONFIGURED =
+ new PathInfo("", PathState.UNUSABLE);
+
+ final private String path;
+ final private PathState state;
+
+ PathInfo(String path, PathState state) {
+ this.path = path;
+ this.state = state;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public PathState getPathState() {
+ return state;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("PathInfo{path=").append(path).
+ append(", state=").append(state).append("}").toString();
+ }
+ }
+
+ /**
+ * Information about domain socket paths.
+ */
+ final Cache<String, PathState> pathMap =
+ CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.MINUTES)
+ .build();
+
+ public DomainSocketFactory(ShortCircuitConf conf) {
+ final String feature;
+ if (conf.isShortCircuitLocalReads() && (!conf.isUseLegacyBlockReaderLocal())) {
+ feature = "The short-circuit local reads feature";
+ } else if (conf.isDomainSocketDataTraffic()) {
+ feature = "UNIX domain socket data traffic";
+ } else {
+ feature = null;
+ }
+
+ if (feature == null) {
+ PerformanceAdvisory.LOG.debug(
+ "Both short-circuit local reads and UNIX domain socket are disabled.");
+ } else {
+ if (conf.getDomainSocketPath().isEmpty()) {
+ throw new HadoopIllegalArgumentException(feature + " is enabled but "
+ + HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
+ } else if (DomainSocket.getLoadingFailureReason() != null) {
+ LOG.warn(feature + " cannot be used because "
+ + DomainSocket.getLoadingFailureReason());
+ } else {
+ LOG.debug(feature + " is enabled.");
+ }
+ }
+ }
+
+ /**
+ * Get information about a domain socket path.
+ *
+ * @param addr The inet address to use.
+ * @param conf The client configuration.
+ *
+ * @return Information about the socket path.
+ */
+ public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) {
+ // If there is no domain socket path configured, we can't use domain
+ // sockets.
+ if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED;
+ // If we can't do anything with the domain socket, don't create it.
+ if (!conf.isDomainSocketDataTraffic() &&
+ (!conf.isShortCircuitLocalReads() || conf.isUseLegacyBlockReaderLocal())) {
+ return PathInfo.NOT_CONFIGURED;
+ }
+ // If the DomainSocket code is not loaded, we can't create
+ // DomainSocket objects.
+ if (DomainSocket.getLoadingFailureReason() != null) {
+ return PathInfo.NOT_CONFIGURED;
+ }
+ // UNIX domain sockets can only be used to talk to local peers
+ if (!DFSUtilClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
+ String escapedPath = DomainSocket.getEffectivePath(
+ conf.getDomainSocketPath(), addr.getPort());
+ PathState status = pathMap.getIfPresent(escapedPath);
+ if (status == null) {
+ return new PathInfo(escapedPath, PathState.VALID);
+ } else {
+ return new PathInfo(escapedPath, status);
+ }
+ }
+
+ public DomainSocket createSocket(PathInfo info, int socketTimeout) {
+ Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE);
+ boolean success = false;
+ DomainSocket sock = null;
+ try {
+ sock = DomainSocket.connect(info.getPath());
+ sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
+ success = true;
+ } catch (IOException e) {
+ LOG.warn("error creating DomainSocket", e);
+ // fall through
+ } finally {
+ if (!success) {
+ if (sock != null) {
+ IOUtils.closeQuietly(sock);
+ }
+ pathMap.put(info.getPath(), PathState.UNUSABLE);
+ sock = null;
+ }
+ }
+ return sock;
+ }
+
+ public void disableShortCircuitForPath(String path) {
+ pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
+ }
+
+ public void disableDomainSocketPath(String path) {
+ pathMap.put(path, PathState.UNUSABLE);
+ }
+
+ @VisibleForTesting
+ public void clearPathMap() {
+ pathMap.invalidateAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
new file mode 100644
index 0000000..52c1a6e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -0,0 +1,1066 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Waitable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ShortCircuitCache tracks things which the client needs to access
+ * HDFS block files via short-circuit.
+ *
+ * These things include: memory-mapped regions, file descriptors, and shared
+ * memory areas for communicating with the DataNode.
+ */
+@InterfaceAudience.Private
+public class ShortCircuitCache implements Closeable {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ ShortCircuitCache.class);
+
+ /**
+ * Expiry thread which makes sure that the file descriptors get closed
+ * after a while.
+ */
+ private class CacheCleaner implements Runnable, Closeable {
+ private ScheduledFuture<?> future;
+
+ /**
+ * Run the CacheCleaner thread.
+ *
+ * Whenever a thread requests a ShortCircuitReplica object, we will make
+ * sure it gets one. That ShortCircuitReplica object can then be re-used
+ * when another thread requests a ShortCircuitReplica object for the same
+ * block. So in that sense, there is no maximum size to the cache.
+ *
+ * However, when a ShortCircuitReplica object is unreferenced by the
+ * thread(s) that are using it, it becomes evictable. There are two
+ * separate eviction lists-- one for mmaped objects, and another for
+ * non-mmaped objects. We do this in order to avoid having the regular
+ * files kick the mmaped files out of the cache too quickly. Reusing
+ * an already-existing mmap gives a huge performance boost, since the
+ * page table entries don't have to be re-populated. Both the mmap
+ * and non-mmap evictable lists have maximum sizes and maximum lifespans.
+ */
+ @Override
+ public void run() {
+ ShortCircuitCache.this.lock.lock();
+ try {
+ if (ShortCircuitCache.this.closed) return;
+ long curMs = Time.monotonicNow();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ": cache cleaner running at " + curMs);
+ }
+
+ int numDemoted = demoteOldEvictableMmaped(curMs);
+ int numPurged = 0;
+ Long evictionTimeNs = Long.valueOf(0);
+ while (true) {
+ Entry<Long, ShortCircuitReplica> entry =
+ evictable.ceilingEntry(evictionTimeNs);
+ if (entry == null) break;
+ evictionTimeNs = entry.getKey();
+ long evictionTimeMs =
+ TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
+ if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break;
+ ShortCircuitReplica replica = entry.getValue();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("CacheCleaner: purging " + replica + ": " +
+ StringUtils.getStackTrace(Thread.currentThread()));
+ }
+ purge(replica);
+ numPurged++;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ": finishing cache cleaner run started at " +
+ curMs + ". Demoted " + numDemoted + " mmapped replicas; " +
+ "purged " + numPurged + " replicas.");
+ }
+ } finally {
+ ShortCircuitCache.this.lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (future != null) {
+ future.cancel(false);
+ }
+ }
+
+ public void setFuture(ScheduledFuture<?> future) {
+ this.future = future;
+ }
+
+ /**
+ * Get the rate at which this cleaner thread should be scheduled.
+ *
+ * We do this by taking the minimum expiration time and dividing by 4.
+ *
+ * @return the rate in milliseconds at which this thread should be
+ * scheduled.
+ */
+ public long getRateInMs() {
+ long minLifespanMs =
+ Math.min(maxNonMmappedEvictableLifespanMs,
+ maxEvictableMmapedLifespanMs);
+ long sampleTimeMs = minLifespanMs / 4;
+ return (sampleTimeMs < 1) ? 1 : sampleTimeMs;
+ }
+ }
+
+ /**
+ * A task which asks the DataNode to release a short-circuit shared memory
+ * slot. If successful, this will tell the DataNode to stop monitoring
+ * changes to the mlock status of the replica associated with the slot.
+ * It will also allow us (the client) to re-use this slot for another
+ * replica. If we can't communicate with the DataNode for some reason,
+ * we tear down the shared memory segment to avoid being in an inconsistent
+ * state.
+ */
+ private class SlotReleaser implements Runnable {
+ /**
+ * The slot that we need to release.
+ */
+ private final Slot slot;
+
+ SlotReleaser(Slot slot) {
+ this.slot = slot;
+ }
+
+ @Override
+ public void run() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
+ }
+ final DfsClientShm shm = (DfsClientShm)slot.getShm();
+ final DomainSocket shmSock = shm.getPeer().getDomainSocket();
+ final String path = shmSock.getPath();
+ boolean success = false;
+ try (DomainSocket sock = DomainSocket.connect(path);
+ DataOutputStream out = new DataOutputStream(
+ new BufferedOutputStream(sock.getOutputStream()))) {
+ new Sender(out).releaseShortCircuitFds(slot.getSlotId());
+ DataInputStream in = new DataInputStream(sock.getInputStream());
+ ReleaseShortCircuitAccessResponseProto resp =
+ ReleaseShortCircuitAccessResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(in));
+ if (resp.getStatus() != Status.SUCCESS) {
+ String error = resp.hasError() ? resp.getError() : "(unknown)";
+ throw new IOException(resp.getStatus().toString() + ": " + error);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(ShortCircuitCache.this + ": released " + slot);
+ }
+ success = true;
+ } catch (IOException e) {
+ LOG.error(ShortCircuitCache.this + ": failed to release " +
+ "short-circuit shared memory slot " + slot + " by sending " +
+ "ReleaseShortCircuitAccessRequestProto to " + path +
+ ". Closing shared memory segment.", e);
+ } finally {
+ if (success) {
+ shmManager.freeSlot(slot);
+ } else {
+ shm.getEndpointShmManager().shutdown(shm);
+ }
+ }
+ }
+ }
+
+ public interface ShortCircuitReplicaCreator {
+ /**
+ * Attempt to create a ShortCircuitReplica object.
+ *
+ * This callback will be made without holding any locks.
+ *
+ * @return a non-null ShortCircuitReplicaInfo object.
+ */
+ ShortCircuitReplicaInfo createShortCircuitReplicaInfo();
+ }
+
+ /**
+ * Lock protecting the cache.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * The executor service that runs the cacheCleaner.
+ */
+ private final ScheduledThreadPoolExecutor cleanerExecutor
+ = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+ setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner").
+ build());
+
+ /**
+ * The executor service that runs the SlotReleaser tasks.
+ */
+ private final ScheduledThreadPoolExecutor releaserExecutor
+ = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+ setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser").
+ build());
+
+ /**
+ * A map containing all ShortCircuitReplicaInfo objects, organized by Key.
+ * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken
+ * exception.
+ */
+ private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>>
+ replicaInfoMap = new HashMap<ExtendedBlockId,
+ Waitable<ShortCircuitReplicaInfo>>();
+
+ /**
+ * The CacheCleaner. We don't create this and schedule it until it becomes
+ * necessary.
+ */
+ private CacheCleaner cacheCleaner;
+
+ /**
+ * Tree of evictable elements.
+ *
+ * Maps (unique) insertion time in nanoseconds to the element.
+ */
+ private final TreeMap<Long, ShortCircuitReplica> evictable =
+ new TreeMap<Long, ShortCircuitReplica>();
+
+ /**
+ * Maximum total size of the cache, including both mmapped and
+ * non-mmapped elements.
+ */
+ private final int maxTotalSize;
+
+ /**
+ * Non-mmaped elements older than this will be closed.
+ */
+ private long maxNonMmappedEvictableLifespanMs;
+
+ /**
+ * Tree of mmaped evictable elements.
+ *
+ * Maps (unique) insertion time in nanoseconds to the element.
+ */
+ private final TreeMap<Long, ShortCircuitReplica> evictableMmapped =
+ new TreeMap<Long, ShortCircuitReplica>();
+
+ /**
+ * Maximum number of mmaped evictable elements.
+ */
+ private int maxEvictableMmapedSize;
+
+ /**
+ * Mmaped elements older than this will be closed.
+ */
+ private final long maxEvictableMmapedLifespanMs;
+
+ /**
+ * The minimum number of milliseconds we'll wait after an unsuccessful
+ * mmap attempt before trying again.
+ */
+ private final long mmapRetryTimeoutMs;
+
+ /**
+ * How long we will keep replicas in the cache before declaring them
+ * to be stale.
+ */
+ private final long staleThresholdMs;
+
+ /**
+ * True if the ShortCircuitCache is closed.
+ */
+ private boolean closed = false;
+
+ /**
+ * Number of existing mmaps associated with this cache.
+ */
+ private int outstandingMmapCount = 0;
+
+ /**
+ * Manages short-circuit shared memory segments for the client.
+ */
+ private final DfsClientShmManager shmManager;
+
+  /**
+   * Build a cache whose size limits and timeouts are read from the given
+   * short-circuit configuration.
+   *
+   * @param conf  The configuration supplying the cache settings.
+   * @return      A newly constructed ShortCircuitCache.
+   */
+  public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
+    final int streamsCacheSize = conf.getShortCircuitStreamsCacheSize();
+    final long streamsCacheExpiryMs =
+        conf.getShortCircuitStreamsCacheExpiryMs();
+    final int mmapCacheSize = conf.getShortCircuitMmapCacheSize();
+    final long mmapCacheExpiryMs = conf.getShortCircuitMmapCacheExpiryMs();
+    final long mmapRetryTimeout = conf.getShortCircuitMmapCacheRetryTimeout();
+    final long staleThreshold = conf.getShortCircuitCacheStaleThresholdMs();
+    final int interruptCheckMs =
+        conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs();
+    return new ShortCircuitCache(streamsCacheSize, streamsCacheExpiryMs,
+        mmapCacheSize, mmapCacheExpiryMs, mmapRetryTimeout, staleThreshold,
+        interruptCheckMs);
+  }
+
+ public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
+ int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
+ long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
+ Preconditions.checkArgument(maxTotalSize >= 0);
+ this.maxTotalSize = maxTotalSize;
+ Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
+ this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs;
+ Preconditions.checkArgument(maxEvictableMmapedSize >= 0);
+ this.maxEvictableMmapedSize = maxEvictableMmapedSize;
+ Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0);
+ this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
+ this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
+ this.staleThresholdMs = staleThresholdMs;
+ DfsClientShmManager shmManager = null;
+ if ((shmInterruptCheckMs > 0) &&
+ (DomainSocketWatcher.getLoadingFailureReason() == null)) {
+ try {
+ shmManager = new DfsClientShmManager(shmInterruptCheckMs);
+ } catch (IOException e) {
+ LOG.error("failed to create ShortCircuitShmManager", e);
+ }
+ }
+ this.shmManager = shmManager;
+ }
+
+ /**
+ * @return the staleness threshold, in milliseconds, that this cache was
+ * constructed with.
+ */
+ public long getStaleThresholdMs() {
+ return staleThresholdMs;
+ }
+
+ /**
+ * Increment the reference count of a replica, and remove it from any free
+ * list it may be in.
+ *
+ * The callers in this class invoke this with the cache lock held; the
+ * method also takes the lock itself, which is safe because the lock is a
+ * ReentrantLock.
+ *
+ * The replica must currently have a positive refCount; otherwise the
+ * precondition check fails.
+ *
+ * @param replica The replica we're removing.
+ */
+ private void ref(ShortCircuitReplica replica) {
+ lock.lock();
+ try {
+ Preconditions.checkArgument(replica.refCount > 0,
+ "can't ref %s because its refCount reached %d", replica,
+ replica.refCount);
+ // Read the evictable time before bumping the count; a non-null value
+ // means the replica currently sits in one of the eviction maps.
+ Long evictableTimeNs = replica.getEvictableTimeNs();
+ replica.refCount++;
+ if (evictableTimeNs != null) {
+ String removedFrom = removeEvictable(replica);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": " + removedFrom +
+ " no longer contains " + replica + ". refCount " +
+ (replica.refCount - 1) + " -> " + replica.refCount +
+ StringUtils.getStackTrace(Thread.currentThread()));
+
+ }
+ } else if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": replica refCount " +
+ (replica.refCount - 1) + " -> " + replica.refCount +
+ StringUtils.getStackTrace(Thread.currentThread()));
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Unreference a replica.
+ *
+ * This method acquires the cache lock itself (the lock is reentrant, so it
+ * may also be called with the lock already held, as purge() does).
+ *
+ * Before decrementing, an unpurged replica whose data or meta channel is
+ * closed, or which is stale, is purged. After decrementing:
+ * a count of 0 closes the replica (it must already be purged);
+ * a count of 1 on an unpurged replica puts it into the matching eviction
+ * map and then trims the eviction maps.
+ *
+ * @param replica The replica being unreferenced.
+ */
+ void unref(ShortCircuitReplica replica) {
+ lock.lock();
+ try {
+ // If the replica is stale or unusable, but we haven't purged it yet,
+ // let's do that. It would be a shame to evict a non-stale replica so
+ // that we could put a stale or unusable one into the cache.
+ if (!replica.purged) {
+ String purgeReason = null;
+ if (!replica.getDataStream().getChannel().isOpen()) {
+ purgeReason = "purging replica because its data channel is closed.";
+ } else if (!replica.getMetaStream().getChannel().isOpen()) {
+ purgeReason = "purging replica because its meta channel is closed.";
+ } else if (replica.isStale()) {
+ purgeReason = "purging replica because it is stale.";
+ }
+ if (purgeReason != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ": " + purgeReason);
+ }
+ // purge() itself calls unref(), dropping the cache's own reference.
+ purge(replica);
+ }
+ }
+ String addedString = "";
+ boolean shouldTrimEvictionMaps = false;
+ int newRefCount = --replica.refCount;
+ if (newRefCount == 0) {
+ // Close replica, since there are no remaining references to it.
+ Preconditions.checkArgument(replica.purged,
+ "Replica %s reached a refCount of 0 without being purged", replica);
+ replica.close();
+ } else if (newRefCount == 1) {
+ Preconditions.checkState(null == replica.getEvictableTimeNs(),
+ "Replica %s had a refCount higher than 1, " +
+ "but was still evictable (evictableTimeNs = %d)",
+ replica, replica.getEvictableTimeNs());
+ if (!replica.purged) {
+ // Add the replica to the end of an eviction list.
+ // Eviction lists are sorted by time.
+ if (replica.hasMmap()) {
+ insertEvictable(System.nanoTime(), replica, evictableMmapped);
+ addedString = "added to evictableMmapped, ";
+ } else {
+ insertEvictable(System.nanoTime(), replica, evictable);
+ addedString = "added to evictable, ";
+ }
+ shouldTrimEvictionMaps = true;
+ }
+ } else {
+ Preconditions.checkArgument(replica.refCount >= 0,
+ "replica's refCount went negative (refCount = %d" +
+ " for %s)", replica.refCount, replica);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": unref replica " + replica +
+ ": " + addedString + " refCount " +
+ (newRefCount + 1) + " -> " + newRefCount +
+ StringUtils.getStackTrace(Thread.currentThread()));
+ }
+ // Trimming happens last, after the trace message for this unref.
+ if (shouldTrimEvictionMaps) {
+ trimEvictionMaps();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+  /**
+   * Demote old evictable mmaps into the regular eviction map.
+   *
+   * Entries are visited oldest first. An entry is demoted (munmapped and
+   * moved to the non-mmapped eviction map under the same eviction time) when
+   * it is older than maxEvictableMmapedLifespanMs, or when the mmapped map
+   * holds at least maxEvictableMmapedSize entries.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param now   Current time in monotonic milliseconds.
+   * @return      Number of replicas demoted.
+   */
+  private int demoteOldEvictableMmaped(long now) {
+    int numDemoted = 0;
+    boolean needMoreSpace = false;
+
+    while (true) {
+      // Each demoted entry is removed from evictableMmapped below, so the
+      // first entry is always the oldest one not yet examined. The keys come
+      // from System.nanoTime(), which may be negative, so the scan must not
+      // start from key 0 (that would skip negative keys entirely).
+      Entry<Long, ShortCircuitReplica> entry = evictableMmapped.firstEntry();
+      if (entry == null) break;
+      Long evictionTimeNs = entry.getKey();
+      long evictionTimeMs =
+          TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
+      if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) {
+        // Not too old yet: stop unless the map is still over its size limit.
+        if (evictableMmapped.size() < maxEvictableMmapedSize) {
+          break;
+        }
+        needMoreSpace = true;
+      }
+      ShortCircuitReplica replica = entry.getValue();
+      if (LOG.isTraceEnabled()) {
+        String rationale = needMoreSpace ? "because we need more space" :
+            "because it's too old";
+        LOG.trace("demoteOldEvictable: demoting " + replica + ": " +
+            rationale + ": " +
+            StringUtils.getStackTrace(Thread.currentThread()));
+      }
+      removeEvictable(replica, evictableMmapped);
+      munmap(replica);
+      // Keep the original eviction time so relative age is preserved.
+      insertEvictable(evictionTimeNs, replica, evictable);
+      numDemoted++;
+    }
+    return numDemoted;
+  }
+
+ /**
+ * Trim the eviction lists.
+ */
+ private void trimEvictionMaps() {
+ long now = Time.monotonicNow();
+ demoteOldEvictableMmaped(now);
+
+ while (true) {
+ long evictableSize = evictable.size();
+ long evictableMmappedSize = evictableMmapped.size();
+ if (evictableSize + evictableMmappedSize <= maxTotalSize) {
+ return;
+ }
+ ShortCircuitReplica replica;
+ if (evictableSize == 0) {
+ replica = evictableMmapped.firstEntry().getValue();
+ } else {
+ replica = evictable.firstEntry().getValue();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": trimEvictionMaps is purging " + replica +
+ StringUtils.getStackTrace(Thread.currentThread()));
+ }
+ purge(replica);
+ }
+ }
+
+ /**
+ * Munmap a replica, updating outstandingMmapCount.
+ *
+ * Delegates the unmapping to the replica and then decrements this cache's
+ * count of outstanding mmaps by one.
+ *
+ * @param replica The replica to munmap.
+ */
+ private void munmap(ShortCircuitReplica replica) {
+ replica.munmap();
+ outstandingMmapCount--;
+ }
+
+ /**
+ * Remove a replica from an evictable map.
+ *
+ * @param replica The replica to remove.
+ * @return The map it was removed from.
+ */
+ private String removeEvictable(ShortCircuitReplica replica) {
+ if (replica.hasMmap()) {
+ removeEvictable(replica, evictableMmapped);
+ return "evictableMmapped";
+ } else {
+ removeEvictable(replica, evictable);
+ return "evictable";
+ }
+ }
+
+ /**
+ * Remove a replica from an evictable map.
+ *
+ * @param replica The replica to remove.
+ * @param map The map to remove it from.
+ */
+ private void removeEvictable(ShortCircuitReplica replica,
+ TreeMap<Long, ShortCircuitReplica> map) {
+ Long evictableTimeNs = replica.getEvictableTimeNs();
+ Preconditions.checkNotNull(evictableTimeNs);
+ ShortCircuitReplica removed = map.remove(evictableTimeNs);
+ Preconditions.checkState(removed == replica,
+ "failed to make %s unevictable", replica);
+ replica.setEvictableTimeNs(null);
+ }
+
+ /**
+ * Insert a replica into an evictable map.
+ *
+ * If an element already exists with this eviction time, we add a nanosecond
+ * to it until we find an unused key.
+ *
+ * @param evictionTimeNs The eviction time in absolute nanoseconds.
+ * @param replica The replica to insert.
+ * @param map The map to insert it into.
+ */
+ private void insertEvictable(Long evictionTimeNs,
+ ShortCircuitReplica replica, TreeMap<Long, ShortCircuitReplica> map) {
+ while (map.containsKey(evictionTimeNs)) {
+ evictionTimeNs++;
+ }
+ Preconditions.checkState(null == replica.getEvictableTimeNs());
+ replica.setEvictableTimeNs(evictionTimeNs);
+ map.put(evictionTimeNs, replica);
+ }
+
+  /**
+   * Purge a replica from the cache.
+   *
+   * This doesn't necessarily close the replica, since there may be
+   * outstanding references to it. However, it does mean the cache won't
+   * hand it out to anyone after this.
+   *
+   * The replica is marked purged, dropped from the replicaInfoMap (only if
+   * the entry stored under its key still refers to this replica), removed
+   * from its eviction map if it is evictable, and finally the cache's own
+   * reference is released via unref().
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param replica   The replica being removed. Must not already be purged.
+   */
+  private void purge(ShortCircuitReplica replica) {
+    boolean removedFromInfoMap = false;
+    String evictionMapName = null;
+    Preconditions.checkArgument(!replica.purged);
+    replica.purged = true;
+    Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key);
+    if (val != null) {
+      ShortCircuitReplicaInfo info = val.getVal();
+      // A newer load may have replaced the map entry for this key; only
+      // remove the entry when it still points at this replica.
+      if ((info != null) && (info.getReplica() == replica)) {
+        replicaInfoMap.remove(replica.key);
+        removedFromInfoMap = true;
+      }
+    }
+    Long evictableTimeNs = replica.getEvictableTimeNs();
+    if (evictableTimeNs != null) {
+      evictionMapName = removeEvictable(replica);
+    }
+    if (LOG.isTraceEnabled()) {
+      StringBuilder builder = new StringBuilder();
+      // Single separator between the cache name and the message.
+      builder.append(this).append(": purged ").
+          append(replica).append(" from the cache.");
+      if (removedFromInfoMap) {
+        builder.append("  Removed from the replicaInfoMap.");
+      }
+      if (evictionMapName != null) {
+        builder.append("  Removed from ").append(evictionMapName);
+      }
+      LOG.trace(builder.toString());
+    }
+    // Drop the reference held by the cache; this closes the replica if it
+    // was the last one.
+    unref(replica);
+  }
+
+  /**
+   * Fetch or create a replica.
+   *
+   * This method acquires the cache lock itself. If an entry for the key is
+   * already present it is fetched (waiting for a concurrent load if
+   * necessary). If there is no entry, or the existing one turns out to be
+   * unusable (fetch signals this with a RetriableException), a new waitable
+   * is registered under the key and the replica is loaded through the
+   * creator.
+   *
+   * @param key      Key to use for lookup.
+   * @param creator  Replica creator callback.  Will be called without
+   *                 the cache lock being held.
+   *
+   * @return         Null if the cache is closed. Otherwise a
+   *                 ShortCircuitReplicaInfo, which may hold a replica, an
+   *                 InvalidToken exception, or neither if loading failed.
+   */
+  public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
+      ShortCircuitReplicaCreator creator) {
+    Waitable<ShortCircuitReplicaInfo> newWaitable = null;
+    lock.lock();
+    try {
+      if (closed) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": can't fetchOrCreate " + key +
+              " because the cache is closed.");
+        }
+        return null;
+      }
+      ShortCircuitReplicaInfo info = null;
+      Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
+      if (waitable != null) {
+        try {
+          info = fetch(key, waitable);
+        } catch (RetriableException e) {
+          // The existing entry could not be used. Do not look it up again;
+          // fall through and load the replica ourselves below.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + ": retrying " + e.getMessage());
+          }
+        }
+      }
+      if (info != null) return info;
+      // We need to load the replica ourselves.
+      newWaitable = new Waitable<ShortCircuitReplicaInfo>(lock.newCondition());
+      replicaInfoMap.put(key, newWaitable);
+    } finally {
+      lock.unlock();
+    }
+    // The creator callback runs inside create(), outside the lock.
+    return create(key, creator, newWaitable);
+  }
+
+ /**
+ * Fetch an existing ReplicaInfo object.
+ *
+ * Invoked from fetchOrCreate while the cache lock is held. On success with
+ * a usable replica, the replica's reference count is incremented on behalf
+ * of the caller.
+ *
+ * @param key The key that we're using.
+ * @param waitable The waitable object to wait on.
+ * @return The existing ReplicaInfo object. It holds an
+ * InvalidToken exception or no replica at all when the
+ * load it waited for failed.
+ *
+ * @throws RetriableException If the wait was interrupted, or the replica
+ * was already purged, or it was stale (in which
+ * case it is purged here).
+ */
+ private ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
+ Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
+ // Another thread is already in the process of loading this
+ // ShortCircuitReplica. So we simply wait for it to complete.
+ ShortCircuitReplicaInfo info;
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": found waitable for " + key);
+ }
+ info = waitable.await();
+ } catch (InterruptedException e) {
+ LOG.info(this + ": interrupted while waiting for " + key);
+ // Restore the interrupt flag before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new RetriableException("interrupted");
+ }
+ if (info.getInvalidTokenException() != null) {
+ LOG.info(this + ": could not get " + key + " due to InvalidToken " +
+ "exception.", info.getInvalidTokenException());
+ return info;
+ }
+ ShortCircuitReplica replica = info.getReplica();
+ if (replica == null) {
+ LOG.warn(this + ": failed to get " + key);
+ return info;
+ }
+ if (replica.purged) {
+ // Ignore replicas that have already been purged from the cache.
+ throw new RetriableException("Ignoring purged replica " +
+ replica + ". Retrying.");
+ }
+ // Check if the replica is stale before using it.
+ // If it is, purge it and retry.
+ if (replica.isStale()) {
+ LOG.info(this + ": got stale replica " + replica + ". Removing " +
+ "this replica from the replicaInfoMap and retrying.");
+ // Remove the cache's reference to the replica. This may or may not
+ // trigger a close.
+ purge(replica);
+ throw new RetriableException("ignoring stale replica " + replica);
+ }
+ // Usable replica: take a reference for the caller.
+ ref(replica);
+ return info;
+ }
+
+ /**
+ * Load a new replica through the creator and publish the result to any
+ * thread waiting on the given waitable.
+ *
+ * The creator callback runs before the cache lock is taken. A
+ * RuntimeException from the creator is logged and treated as a failed
+ * load. On failure the waitable is removed from the replicaInfoMap, but
+ * only if the map still holds this same waitable for the key.
+ *
+ * @param key The key being loaded.
+ * @param creator Callback that produces the replica info.
+ * @param newWaitable The waitable registered for this load.
+ * @return A non-null ShortCircuitReplicaInfo; it has no replica
+ * when the load failed.
+ */
+ private ShortCircuitReplicaInfo create(ExtendedBlockId key,
+ ShortCircuitReplicaCreator creator,
+ Waitable<ShortCircuitReplicaInfo> newWaitable) {
+ // Handle loading a new replica.
+ ShortCircuitReplicaInfo info = null;
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": loading " + key);
+ }
+ info = creator.createShortCircuitReplicaInfo();
+ } catch (RuntimeException e) {
+ LOG.warn(this + ": failed to load " + key, e);
+ }
+ // Substitute an empty info so waiters always receive a non-null value.
+ if (info == null) info = new ShortCircuitReplicaInfo();
+ lock.lock();
+ try {
+ if (info.getReplica() != null) {
+ // On success, make sure the cache cleaner thread is running.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": successfully loaded " + info.getReplica());
+ }
+ startCacheCleanerThreadIfNeeded();
+ // Note: new ShortCircuitReplicas start with a refCount of 2,
+ // indicating that both this cache and whoever requested the
+ // creation of the replica hold a reference. So we don't need
+ // to increment the reference count here.
+ } else {
+ // On failure, remove the waitable from the replicaInfoMap.
+ Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key);
+ if (waitableInMap == newWaitable) replicaInfoMap.remove(key);
+ if (info.getInvalidTokenException() != null) {
+ LOG.info(this + ": could not load " + key + " due to InvalidToken " +
+ "exception.", info.getInvalidTokenException());
+ } else {
+ LOG.warn(this + ": failed to load " + key);
+ }
+ }
+ // Hand the result to the waitable, in both the success and failure case.
+ newWaitable.provide(info);
+ } finally {
+ lock.unlock();
+ }
+ return info;
+ }
+
+ private void startCacheCleanerThreadIfNeeded() {
+ if (cacheCleaner == null) {
+ cacheCleaner = new CacheCleaner();
+ long rateMs = cacheCleaner.getRateInMs();
+ ScheduledFuture<?> future =
+ cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
+ TimeUnit.MILLISECONDS);
+ cacheCleaner.setFuture(future);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this + ": starting cache cleaner thread which will run " +
+ "every " + rateMs + " ms");
+ }
+ }
+ }
+
+ /**
+ * Return a ClientMmap for the replica, creating the mmap if necessary.
+ *
+ * The replica's mmapData field encodes the mmap state:
+ * a MappedByteBuffer means the mmap exists and is reused;
+ * a Long is the monotonic time of the last failed attempt, and no new
+ * attempt is made until mmapRetryTimeoutMs has elapsed;
+ * a Condition means another thread is creating the mmap, so this thread
+ * waits on it;
+ * null means no attempt is recorded.
+ *
+ * The mapping itself is performed without holding the cache lock. Each
+ * returned ClientMmap holds a reference on the replica.
+ *
+ * @param replica The replica to mmap.
+ * @param anchored Passed through to the ClientMmap constructor.
+ * @return The ClientMmap, or null if mapping failed or a recent
+ * failure is still within the retry timeout.
+ */
+ ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
+ boolean anchored) {
+ Condition newCond;
+ lock.lock();
+ try {
+ while (replica.mmapData != null) {
+ if (replica.mmapData instanceof MappedByteBuffer) {
+ ref(replica);
+ MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
+ return new ClientMmap(replica, mmap, anchored);
+ } else if (replica.mmapData instanceof Long) {
+ long lastAttemptTimeMs = (Long)replica.mmapData;
+ long delta = Time.monotonicNow() - lastAttemptTimeMs;
+ if (delta < mmapRetryTimeoutMs) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": can't create client mmap for " +
+ replica + " because we failed to " +
+ "create one just " + delta + "ms ago.");
+ }
+ return null;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": retrying client mmap for " + replica +
+ ", " + delta + " ms after the previous failure.");
+ }
+ } else if (replica.mmapData instanceof Condition) {
+ // Another thread is mapping; wait for it to publish the outcome.
+ Condition cond = (Condition)replica.mmapData;
+ cond.awaitUninterruptibly();
+ } else {
+ Preconditions.checkState(false, "invalid mmapData type %s",
+ replica.mmapData.getClass().getName());
+ }
+ }
+ // Mark the replica as "mmap in progress" so other threads wait for us.
+ newCond = lock.newCondition();
+ replica.mmapData = newCond;
+ } finally {
+ lock.unlock();
+ }
+ MappedByteBuffer map = replica.loadMmapInternal();
+ lock.lock();
+ try {
+ if (map == null) {
+ // Record the failure time so retries are rate limited.
+ replica.mmapData = Long.valueOf(Time.monotonicNow());
+ newCond.signalAll();
+ return null;
+ } else {
+ outstandingMmapCount++;
+ replica.mmapData = map;
+ ref(replica);
+ newCond.signalAll();
+ return new ClientMmap(replica, map, anchored);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+  /**
+   * Close the cache and free all associated resources.
+   *
+   * Under the cache lock the cache is marked closed, the cache cleaner is
+   * cleaned up and every evictable replica is purged. Afterwards, outside
+   * the lock, both executors are shut down (waiting up to 30 seconds each
+   * before forcing termination) and the shared memory manager is cleaned up.
+   * Calling close() on an already closed cache only repeats the executor
+   * shutdown and shared memory cleanup.
+   */
+  @Override
+  public void close() {
+    // Acquire the lock before entering the try block, so the finally clause
+    // can never attempt to unlock a lock this thread failed to obtain.
+    lock.lock();
+    try {
+      if (closed) return;
+      closed = true;
+      LOG.info(this + ": closing");
+      maxNonMmappedEvictableLifespanMs = 0;
+      maxEvictableMmapedSize = 0;
+      // Close and join cacheCleaner thread.
+      IOUtilsClient.cleanup(LOG, cacheCleaner);
+      // Purge all replicas.
+      while (true) {
+        Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry();
+        if (entry == null) break;
+        purge(entry.getValue());
+      }
+      while (true) {
+        Entry<Long, ShortCircuitReplica> entry = evictableMmapped.firstEntry();
+        if (entry == null) break;
+        purge(entry.getValue());
+      }
+    } finally {
+      lock.unlock();
+    }
+
+    releaserExecutor.shutdown();
+    cleanerExecutor.shutdown();
+    // wait for existing tasks to terminate
+    try {
+      if (!releaserExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.error("Forcing SlotReleaserThreadPool to shutdown!");
+        releaserExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      releaserExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Interrupted while waiting for SlotReleaserThreadPool "
+          + "to terminate", e);
+    }
+
+    // wait for existing tasks to terminate
+    try {
+      if (!cleanerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.error("Forcing CleanerThreadPool to shutdown!");
+        cleanerExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      cleanerExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Interrupted while waiting for CleanerThreadPool "
+          + "to terminate", e);
+    }
+    IOUtilsClient.cleanup(LOG, shmManager);
+  }
+
+ /**
+ * Callback used by accept() to expose the cache's internal state.
+ *
+ * accept() invokes visit() while holding the cache lock. The evictable
+ * maps passed in are the cache's own maps, not copies.
+ */
+ @VisibleForTesting // ONLY for testing
+ public interface CacheVisitor {
+ void visit(int numOutstandingMmaps,
+ Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+ Map<ExtendedBlockId, InvalidToken> failedLoads,
+ Map<Long, ShortCircuitReplica> evictable,
+ Map<Long, ShortCircuitReplica> evictableMmapped);
+ }
+
+  /**
+   * Present a snapshot of the cache to a visitor, under the cache lock.
+   *
+   * Entries of the replicaInfoMap that already have a value are split into
+   * successfully loaded replicas and failed loads. The two eviction maps are
+   * handed to the visitor directly (they are not copied).
+   *
+   * @param visitor   The visitor to call.
+   */
+  @VisibleForTesting // ONLY for testing
+  public void accept(CacheVisitor visitor) {
+    lock.lock();
+    try {
+      Map<ExtendedBlockId, ShortCircuitReplica> replicas =
+          new HashMap<ExtendedBlockId, ShortCircuitReplica>();
+      Map<ExtendedBlockId, InvalidToken> failedLoads =
+          new HashMap<ExtendedBlockId, InvalidToken>();
+      for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry :
+            replicaInfoMap.entrySet()) {
+        Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue();
+        // Entries still being loaded have no value yet and are skipped.
+        if (waitable.hasVal()) {
+          if (waitable.getVal().getReplica() != null) {
+            replicas.put(entry.getKey(), waitable.getVal().getReplica());
+          } else {
+            // The exception may be null here, indicating a failed load that
+            // isn't the result of an invalid block token.
+            failedLoads.put(entry.getKey(),
+                waitable.getVal().getInvalidTokenException());
+          }
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        StringBuilder builder = new StringBuilder();
+        // Leading space keeps the visitor class name apart from "with".
+        builder.append("visiting ").append(visitor.getClass().getName()).
+            append(" with outstandingMmapCount=").append(outstandingMmapCount).
+            append(", replicas=");
+        String prefix = "";
+        for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
+          builder.append(prefix).append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", failedLoads=");
+        for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
+          builder.append(prefix).append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", evictable=");
+        for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) {
+          builder.append(prefix).append(entry.getKey()).
+              append(":").append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", evictableMmapped=");
+        for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) {
+          builder.append(prefix).append(entry.getKey()).
+              append(":").append(entry.getValue());
+          prefix = ",";
+        }
+        LOG.debug(builder.toString());
+      }
+      visitor.visit(outstandingMmapCount, replicas, failedLoads,
+            evictable, evictableMmapped);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Identify this cache instance by its identity hash code, in hexadecimal.
+   */
+  @Override
+  public String toString() {
+    final String identity =
+        Integer.toHexString(System.identityHashCode(this));
+    return "ShortCircuitCache(0x" + identity + ")";
+  }
+
+  /**
+   * Ask the shared memory manager for a new slot.
+   *
+   * @param datanode       The datanode to allocate a shm slot with.
+   * @param peer           A peer connected to the datanode.
+   * @param usedPeer       Will be set to true if we use up the provided peer.
+   * @param blockId        The block id and block pool id of the block we're
+   *                         allocating this slot for.
+   * @param clientName     The name of the DFSClient allocating the shared
+   *                         memory.
+   * @return               Null if this cache has no shared memory manager
+   *                         (short-circuit shared memory is disabled);
+   *                         otherwise the slot returned by the manager.
+   * @throws IOException   An exception if there was an error talking to
+   *                         the datanode.
+   */
+  public Slot allocShmSlot(DatanodeInfo datanode,
+      DomainPeer peer, MutableBoolean usedPeer,
+      ExtendedBlockId blockId, String clientName) throws IOException {
+    if (shmManager == null) {
+      return null;
+    }
+    return shmManager.allocSlot(datanode, peer, usedPeer,
+        blockId, clientName);
+  }
+
+ /**
+ * Free a slot immediately.
+ *
+ * ONLY use this if the DataNode is not yet aware of the slot.
+ *
+ * The slot is marked invalid before it is handed back to the shared memory
+ * manager. The state check fails if this cache has no shared memory
+ * manager.
+ *
+ * @param slot The slot to free.
+ */
+ public void freeSlot(Slot slot) {
+ Preconditions.checkState(shmManager != null);
+ slot.makeInvalid();
+ shmManager.freeSlot(slot);
+ }
+
+ /**
+ * Schedule a shared memory slot to be released.
+ *
+ * A SlotReleaser task for the slot is submitted to the releaser executor.
+ * The state check fails if this cache has no shared memory manager.
+ *
+ * @param slot The slot to release.
+ */
+ public void scheduleSlotReleaser(Slot slot) {
+ Preconditions.checkState(shmManager != null);
+ releaserExecutor.execute(new SlotReleaser(slot));
+ }
+
+ /**
+ * @return the shared memory manager used by this cache, or null if none
+ * was created by the constructor.
+ */
+ @VisibleForTesting
+ public DfsClientShmManager getDfsClientShmManager() {
+ return shmManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
new file mode 100644
index 0000000..37566e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ShortCircuitReplica object contains file descriptors for a block that
+ * we are reading via short-circuit local reads.
+ *
+ * The file descriptors can be shared between multiple threads because
+ * all the operations we perform are stateless-- i.e., we use pread
+ * instead of read, to avoid using the shared position state.
+ *
+ * The mutable state of a replica (mmapData, purged, refCount and
+ * evictableTimeNs) is protected by the lock of the cache it belongs to.
+ */
+@InterfaceAudience.Private
+public class ShortCircuitReplica {
+ // Note: this logger is created for ShortCircuitCache.class, so replica
+ // messages are emitted under the cache's logger name.
+ public static final Logger LOG = LoggerFactory.getLogger(
+ ShortCircuitCache.class);
+
+ /**
+ * Identifies this ShortCircuitReplica object.
+ */
+ final ExtendedBlockId key;
+
+ /**
+ * The block data input stream.
+ */
+ private final FileInputStream dataStream;
+
+ /**
+ * The block metadata input stream.
+ *
+ * TODO: make this nullable if the file has no checksums on disk.
+ */
+ private final FileInputStream metaStream;
+
+ /**
+ * Block metadata header.
+ */
+ private final BlockMetadataHeader metaHeader;
+
+ /**
+ * The cache we belong to.
+ */
+ private final ShortCircuitCache cache;
+
+ /**
+ * Monotonic time at which the replica was created.
+ */
+ private final long creationTimeMs;
+
+ /**
+ * If non-null, the shared memory slot associated with this replica.
+ */
+ private final Slot slot;
+
+ /**
+ * Current mmap state.
+ *
+ * Protected by the cache lock.
+ */
+ Object mmapData;
+
+ /**
+ * True if this replica has been purged from the cache; false otherwise.
+ *
+ * Protected by the cache lock.
+ */
+ boolean purged = false;
+
+ /**
+ * Number of external references to this replica. Replicas are referenced
+ * by the cache, BlockReaderLocal instances, and by ClientMmap instances.
+ * The number starts at 2 because when we create a replica, it is referenced
+ * by both the cache and the requester.
+ *
+ * Protected by the cache lock.
+ */
+ int refCount = 2;
+
+ /**
+ * The monotonic time in nanoseconds at which the replica became evictable, or
+ * null if it is not evictable.
+ *
+ * Protected by the cache lock.
+ */
+ private Long evictableTimeNs = null;
+
+ /**
+ * Create a replica from already opened data and metadata streams.
+ *
+ * The metadata header is read from the meta stream's channel here.
+ *
+ * @param key The block this replica is for.
+ * @param dataStream The block data input stream.
+ * @param metaStream The block metadata input stream.
+ * @param cache The cache this replica belongs to.
+ * @param creationTimeMs Creation time, used for time-based staleness.
+ * @param slot Shared memory slot, or null if there is none.
+ * @throws IOException If the header cannot be read, or its version is
+ * not 1.
+ */
+ public ShortCircuitReplica(ExtendedBlockId key,
+ FileInputStream dataStream, FileInputStream metaStream,
+ ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
+ this.key = key;
+ this.dataStream = dataStream;
+ this.metaStream = metaStream;
+ this.metaHeader =
+ BlockMetadataHeader.preadHeader(metaStream.getChannel());
+ if (metaHeader.getVersion() != 1) {
+ throw new IOException("invalid metadata header version " +
+ metaHeader.getVersion() + ". Can only handle version 1.");
+ }
+ this.cache = cache;
+ this.creationTimeMs = creationTimeMs;
+ this.slot = slot;
+ }
+
+ /**
+ * Decrement the reference count.
+ *
+ * Delegates to the owning cache's unref().
+ */
+ public void unref() {
+ cache.unref(this);
+ }
+
+ /**
+ * Check if the replica is stale.
+ *
+ * With a shared memory slot, the replica is stale exactly when the slot is
+ * no longer valid. Without one, it is stale once its age exceeds the
+ * cache's stale threshold.
+ *
+ * Must be called with the cache lock held.
+ */
+ boolean isStale() {
+ if (slot != null) {
+ // Check staleness by looking at the shared memory area we use to
+ // communicate with the DataNode.
+ boolean stale = !slot.isValid();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": checked shared memory segment. isStale=" + stale);
+ }
+ return stale;
+ } else {
+ // Fall back to old, time-based staleness method.
+ long deltaMs = Time.monotonicNow() - creationTimeMs;
+ long staleThresholdMs = cache.getStaleThresholdMs();
+ if (deltaMs > staleThresholdMs) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " is stale because it's " + deltaMs +
+ " ms old, and staleThresholdMs = " + staleThresholdMs);
+ }
+ return true;
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " is not stale because it's only " + deltaMs +
+ " ms old, and staleThresholdMs = " + staleThresholdMs);
+ }
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Try to add a no-checksum anchor to our shared memory slot.
+ *
+ * It is only possible to add this anchor when the block is mlocked on the Datanode.
+ * The DataNode will not munlock the block until the number of no-checksum anchors
+ * for the block reaches zero.
+ *
+ * This method does not require any synchronization.
+ *
+ * @return True if we successfully added a no-checksum anchor.
+ * Always false when the replica has no slot.
+ */
+ public boolean addNoChecksumAnchor() {
+ if (slot == null) {
+ return false;
+ }
+ boolean result = slot.addAnchor();
+ if (LOG.isTraceEnabled()) {
+ if (result) {
+ LOG.trace(this + ": added no-checksum anchor to slot " + slot);
+ } else {
+ LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Remove a no-checksum anchor for our shared memory slot.
+ *
+ * Does nothing when the replica has no slot.
+ *
+ * This method does not require any synchronization.
+ */
+ public void removeNoChecksumAnchor() {
+ if (slot != null) {
+ slot.removeAnchor();
+ }
+ }
+
+ /**
+ * Check if the replica has an associated mmap that has been fully loaded.
+ *
+ * Must be called with the cache lock held.
+ */
+ @VisibleForTesting
+ public boolean hasMmap() {
+ return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
+ }
+
+ /**
+ * Free the mmap associated with this replica.
+ *
+ * mmapData is cast to MappedByteBuffer without a type check and is reset
+ * to null afterwards.
+ * NOTE(review): callers appear to be expected to check hasMmap() first,
+ * as close() does -- confirm for other call sites.
+ *
+ * Must be called with the cache lock held.
+ */
+ void munmap() {
+ MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
+ NativeIO.POSIX.munmap(mmap);
+ mmapData = null;
+ }
+
+ /**
+ * Close the replica.
+ *
+ * Must be called after there are no more references to the replica in the
+ * cache or elsewhere.
+ *
+ * Requires refCount == 0 and purged == true. Unmaps any mmap, closes both
+ * streams and, if there is a slot, schedules it for release through the
+ * cache.
+ */
+ void close() {
+ String suffix = "";
+
+ Preconditions.checkState(refCount == 0,
+ "tried to close replica with refCount %d: %s", refCount, this);
+ // Move the count to -1 so a second close() fails the check above.
+ refCount = -1;
+ Preconditions.checkState(purged,
+ "tried to close unpurged replica %s", this);
+ if (hasMmap()) {
+ munmap();
+ if (LOG.isTraceEnabled()) {
+ suffix += " munmapped.";
+ }
+ }
+ IOUtilsClient.cleanup(LOG, dataStream, metaStream);
+ if (slot != null) {
+ cache.scheduleSlotReleaser(slot);
+ if (LOG.isTraceEnabled()) {
+ suffix += " scheduling " + slot + " for later release.";
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("closed " + this + suffix);
+ }
+ }
+
+ public FileInputStream getDataStream() {
+ return dataStream;
+ }
+
+ public FileInputStream getMetaStream() {
+ return metaStream;
+ }
+
+ public BlockMetadataHeader getMetaHeader() {
+ return metaHeader;
+ }
+
+ public ExtendedBlockId getKey() {
+ return key;
+ }
+
+ /**
+ * Get or create a ClientMmap for this replica via the owning cache.
+ *
+ * @param anchor Passed through to the cache's getOrCreateClientMmap.
+ * @return Whatever the cache returns for this replica.
+ */
+ public ClientMmap getOrCreateClientMmap(boolean anchor) {
+ return cache.getOrCreateClientMmap(this, anchor);
+ }
+
+ /**
+ * Map the data file read-only, starting at offset 0.
+ *
+ * At most Integer.MAX_VALUE bytes are mapped, even if the file is larger.
+ *
+ * @return The mapping, or null if an IOException or RuntimeException
+ * occurred (the error is logged).
+ */
+ MappedByteBuffer loadMmapInternal() {
+ try {
+ FileChannel channel = dataStream.getChannel();
+ MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0,
+ Math.min(Integer.MAX_VALUE, channel.size()));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + ": created mmap of size " + channel.size());
+ }
+ return mmap;
+ } catch (IOException e) {
+ LOG.warn(this + ": mmap error", e);
+ return null;
+ } catch (RuntimeException e) {
+ LOG.warn(this + ": mmap error", e);
+ return null;
+ }
+ }
+
+ /**
+ * Get the evictable time in nanoseconds.
+ *
+ * Note: you must hold the cache lock to call this function.
+ *
+ * @return the evictable time in nanoseconds, or null if the replica is
+ * not evictable.
+ */
+ public Long getEvictableTimeNs() {
+ return evictableTimeNs;
+ }
+
+ /**
+ * Set the evictable time in nanoseconds.
+ *
+ * Note: you must hold the cache lock to call this function.
+ *
+ * @param evictableTimeNs The evictable time in nanoseconds, or null
+ * to set no evictable time.
+ */
+ void setEvictableTimeNs(Long evictableTimeNs) {
+ this.evictableTimeNs = evictableTimeNs;
+ }
+
+ @VisibleForTesting
+ public Slot getSlot() {
+ return slot;
+ }
+
+ /**
+ * Convert the replica to a string for debugging purposes.
+ * Note that we can't take the lock here.
+ */
+ @Override
+ public String toString() {
+ return new StringBuilder().append("ShortCircuitReplica{").
+ append("key=").append(key).
+ append(", metaHeader.version=").append(metaHeader.getVersion()).
+ append(", metaHeader.checksum=").append(metaHeader.getChecksum()).
+ append(", ident=").append("0x").
+ append(Integer.toHexString(System.identityHashCode(this))).
+ append(", creationTimeMs=").append(creationTimeMs).
+ append("}").toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
new file mode 100644
index 0000000..ef0019f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+/**
+ * Immutable holder for a {@link ShortCircuitReplica} and/or the
+ * {@link InvalidToken} exception associated with it.
+ *
+ * Each constructor populates at most one of the two fields: the no-argument
+ * constructor leaves both null, and the other two set exactly one of them.
+ */
+public final class ShortCircuitReplicaInfo {
+  /** The replica, or null if this instance carries none. */
+  private final ShortCircuitReplica replica;
+
+  /** The invalid-token exception, or null if this instance carries none. */
+  private final InvalidToken exc;
+
+  /**
+   * Creates an instance with neither a replica nor an exception.
+   */
+  public ShortCircuitReplicaInfo() {
+    this.replica = null;
+    this.exc = null;
+  }
+
+  /**
+   * Creates an instance that carries a replica and no exception.
+   *
+   * @param replica the replica to hold; stored as given, without checks.
+   */
+  public ShortCircuitReplicaInfo(ShortCircuitReplica replica) {
+    this.replica = replica;
+    this.exc = null;
+  }
+
+  /**
+   * Creates an instance that carries an exception and no replica.
+   *
+   * @param exc the exception to hold; stored as given, without checks.
+   */
+  public ShortCircuitReplicaInfo(InvalidToken exc) {
+    this.replica = null;
+    this.exc = exc;
+  }
+
+  /**
+   * @return the replica held by this instance, or null if there is none.
+   */
+  public ShortCircuitReplica getReplica() {
+    return replica;
+  }
+
+  /**
+   * @return the exception held by this instance, or null if there is none.
+   */
+  public InvalidToken getInvalidTokenException() {
+    return exc;
+  }
+
+  /**
+   * Builds a debugging description listing whichever of the replica and the
+   * exception are non-null, separated by {@code ", "} when both are present.
+   *
+   * @return a string of the form {@code ShortCircuitReplicaInfo{...}}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("ShortCircuitReplicaInfo{");
+    if (replica != null) {
+      builder.append(replica);
+    }
+    if (exc != null) {
+      // A separator is only needed when the replica was printed first.
+      if (replica != null) {
+        builder.append(", ");
+      }
+      builder.append(exc);
+    }
+    builder.append("}");
+    return builder.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
new file mode 100644
index 0000000..56f8ecc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+
+public class IOUtilsClient {
+ /**
+ * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
+ * null pointers. Must only be used for cleanup in exception handlers.
+ *
+ * @param log the log to record problems to at debug level. Can be null.
+ * @param closeables the objects to close
+ */
+ public static void cleanup(Logger log, java.io.Closeable... closeables) {
+ for (java.io.Closeable c : closeables) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch(Throwable e) {
+ if (log != null && log.isDebugEnabled()) {
+ log.debug("Exception in closing " + c, e);
+ }
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fd91744..607de79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -838,6 +838,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8846. Add a unit test for INotify functionality across a layout
version upgrade (Zhe Zhang via Colin P. McCabe)
+ HDFS-8951. Move the shortcircuit package to hdfs-client.
+ (Mingliang Liu via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index fec6b85..52ba899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -419,7 +419,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
}
- if (!DFSClient.isLocalAddress(inetSocketAddress)) {
+ if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
"the address " + inetSocketAddress + " is not local");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 47aaed6..3c49ef7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -39,7 +39,6 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -703,30 +702,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
- private static final Map<String, Boolean> localAddrMap = Collections
- .synchronizedMap(new HashMap<String, Boolean>());
-
- public static boolean isLocalAddress(InetSocketAddress targetAddr) {
- InetAddress addr = targetAddr.getAddress();
- Boolean cached = localAddrMap.get(addr.getHostAddress());
- if (cached != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Address " + targetAddr +
- (cached ? " is local" : " is not local"));
- }
- return cached;
- }
-
- boolean local = NetUtils.isLocalAddress(addr);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Address " + targetAddr +
- (local ? " is local" : " is not local"));
- }
- localAddrMap.put(addr.getHostAddress(), local);
- return local;
- }
-
/**
* Cancel a delegation token
* @param token the token to cancel
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 05a9f2c..015e154 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -351,7 +351,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
checksum.getBytesPerChecksum(),
checksum.getChecksumSize());
- this.isLocal = DFSClient.isLocalAddress(NetUtils.
+ this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr()));
this.peer = peer;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 4c23d36..2a77cb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -290,7 +290,7 @@ public class RemoteBlockReader2 implements BlockReader {
DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
DatanodeID datanodeID, PeerCache peerCache) {
- this.isLocal = DFSClient.isLocalAddress(NetUtils.
+ this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
createSocketAddr(datanodeID.getXferAddr()));
// Path is used only for printing block and file information in debug
this.peer = peer;