You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:18 UTC

[06/50] [abbrv] hadoop git commit: HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.

HDFS-8951. Move the shortcircuit package to hdfs-client. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c992bcf9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c992bcf9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c992bcf9

Branch: refs/heads/HDFS-7285
Commit: c992bcf9c136d3df686655a80e636bb7bb0664da
Parents: a4d9acc
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Aug 26 14:02:48 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Wed Aug 26 14:02:48 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |   26 +
 .../server/datanode/BlockMetadataHeader.java    |  209 ++++
 .../hadoop/hdfs/shortcircuit/ClientMmap.java    |   75 ++
 .../hdfs/shortcircuit/DomainSocketFactory.java  |  196 ++++
 .../hdfs/shortcircuit/ShortCircuitCache.java    | 1066 +++++++++++++++++
 .../hdfs/shortcircuit/ShortCircuitReplica.java  |  352 ++++++
 .../shortcircuit/ShortCircuitReplicaInfo.java   |   64 ++
 .../apache/hadoop/hdfs/util/IOUtilsClient.java  |   46 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |    3 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |    2 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   25 -
 .../apache/hadoop/hdfs/RemoteBlockReader.java   |    2 +-
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  |    2 +-
 .../server/datanode/BlockMetadataHeader.java    |  211 ----
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   19 +-
 .../impl/RamDiskAsyncLazyPersistService.java    |    8 +-
 .../hadoop/hdfs/shortcircuit/ClientMmap.java    |   75 --
 .../hdfs/shortcircuit/DomainSocketFactory.java  |  194 ----
 .../hdfs/shortcircuit/ShortCircuitCache.java    | 1068 ------------------
 .../hdfs/shortcircuit/ShortCircuitReplica.java  |  349 ------
 .../shortcircuit/ShortCircuitReplicaInfo.java   |   64 --
 21 files changed, 2056 insertions(+), 2000 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index fa1f5e6..3d0acb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -36,11 +36,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -429,4 +431,39 @@ public class DFSUtilClient {
         new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
     return df.format(date);
   }
+
+  /**
+   * Caches, per host address string, whether that address belongs to this
+   * machine, so NetUtils#isLocalAddress is not consulted on every call.
+   */
+  private static final Map<String, Boolean> localAddrMap = Collections
+      .synchronizedMap(new HashMap<String, Boolean>());
+
+  /**
+   * Check whether a socket address refers to the local machine.
+   *
+   * Answers are cached per host address. An unresolved address carries no
+   * {@link InetAddress}, so it is reported as not local and is not cached.
+   *
+   * @param targetAddr the address to check.
+   * @return true if {@code targetAddr} is an address of this machine.
+   */
+  public static boolean isLocalAddress(InetSocketAddress targetAddr) {
+    InetAddress addr = targetAddr.getAddress();
+    if (addr == null) {
+      // Unresolved: there is no IP address to compare with local ones.
+      return false;
+    }
+    String hostAddr = addr.getHostAddress();
+    Boolean local = localAddrMap.get(hostAddr);
+    if (local == null) {
+      local = NetUtils.isLocalAddress(addr);
+      localAddrMap.put(hostAddr, local);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Address " + targetAddr +
+          (local ? " is local" : " is not local"));
+    }
+    return local;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
new file mode 100644
index 0000000..d298690
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * BlockMetadataHeader manages metadata for data blocks on Datanodes.
+ * This is not related to the Block related functionality in Namenode.
+ * The biggest part of data block metadata is CRC for the block.
+ *
+ * On disk the header is a two-byte version followed by the DataChecksum
+ * header; the checksum data itself follows the header.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockMetadataHeader {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlockMetadataHeader.class);
+
+  /** The header version written by this class and expected on read. */
+  public static final short VERSION = 1;
+
+  /**
+   * Header includes everything except the checksum(s) themselves.
+   * Version is two bytes. Following it is the DataChecksum
+   * that occupies 5 bytes.
+   */
+  private final short version;
+  /** The DataChecksum described by the header. */
+  private final DataChecksum checksum;
+
+  @VisibleForTesting
+  public BlockMetadataHeader(short version, DataChecksum checksum) {
+    this.checksum = checksum;
+    this.version = version;
+  }
+
+  /** Get the version */
+  public short getVersion() {
+    return version;
+  }
+
+  /** Get the checksum */
+  public DataChecksum getChecksum() {
+    return checksum;
+  }
+
+  /**
+   * Read the checksum header from the meta file.
+   *
+   * @param metaFile the block meta file to read.
+   * @param bufSize size of the read buffer.
+   * @return the data checksum obtained from the header.
+   * @throws IOException if the file cannot be opened or read.
+   */
+  public static DataChecksum readDataChecksum(File metaFile, int bufSize)
+      throws IOException {
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new BufferedInputStream(
+        new FileInputStream(metaFile), bufSize));
+      return readDataChecksum(in, metaFile);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  /**
+   * Read the checksum header from the meta input stream.
+   * A version mismatch is logged as a warning, not treated as an error.
+   *
+   * @param metaIn stream positioned at the start of the header.
+   * @param name identifies the stream in the warning message.
+   * @return the data checksum obtained from the header.
+   * @throws IOException if the header cannot be read.
+   */
+  public static DataChecksum readDataChecksum(final DataInputStream metaIn,
+      final Object name) throws IOException {
+    // read and handle the common header here. For now just a version
+    final BlockMetadataHeader header = readHeader(metaIn);
+    if (header.getVersion() != VERSION) {
+      LOG.warn("Unexpected meta-file version for " + name
+          + ": version in file is " + header.getVersion()
+          + " but expected version is " + VERSION);
+    }
+    return header.getChecksum();
+  }
+
+  /**
+   * Read the header without changing the position of the FileChannel.
+   *
+   * Positional reads may return fewer bytes than requested, so every read
+   * resumes at the file offset equal to the number of bytes read so far.
+   *
+   * @param fc The FileChannel to read.
+   * @return the Metadata Header.
+   * @throws EOFException if the channel ends before a full header is read.
+   * @throws IOException on error.
+   */
+  public static BlockMetadataHeader preadHeader(FileChannel fc)
+      throws IOException {
+    final byte[] arr = new byte[getHeaderSize()];
+    ByteBuffer buf = ByteBuffer.wrap(arr);
+
+    while (buf.hasRemaining()) {
+      // The header starts at offset 0, so the buffer position is also the
+      // file offset of the next unread header byte.
+      if (fc.read(buf, buf.position()) <= 0) {
+        throw new EOFException("unexpected EOF while reading " +
+            "metadata file header");
+      }
+    }
+    // Big-endian short, as written by DataOutputStream#writeShort.
+    short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
+    DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
+    return new BlockMetadataHeader(version, dataChecksum);
+  }
+
+  /**
+   * This reads all the fields till the beginning of checksum.
+   *
+   * @param in stream positioned at the start of the header.
+   * @return Metadata Header
+   * @throws IOException if the header cannot be read.
+   */
+  public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+    return readHeader(in.readShort(), in);
+  }
+
+  /**
+   * Reads header at the top of metadata file and returns the header.
+   *
+   * @param file the block meta file to read.
+   * @return metadata header for the block
+   * @throws IOException if the file cannot be opened or read.
+   */
+  public static BlockMetadataHeader readHeader(File file) throws IOException {
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new BufferedInputStream(
+                               new FileInputStream(file)));
+      return readHeader(in);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  /**
+   * Read the header at the beginning of the given block meta file.
+   * The current file position will be altered by this method.
+   * If an error occurs, the file is <em>not</em> closed.
+   *
+   * @param raf the block meta file; the header is read from offset 0.
+   * @return metadata header for the block
+   * @throws IOException if a full header cannot be read.
+   */
+  public static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
+    byte[] buf = new byte[getHeaderSize()];
+    raf.seek(0);
+    raf.readFully(buf, 0, buf.length);
+    return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
+  }
+
+  // Version is already read.
+  private static BlockMetadataHeader readHeader(short version, DataInputStream in)
+                                   throws IOException {
+    DataChecksum checksum = DataChecksum.newDataChecksum(in);
+    return new BlockMetadataHeader(version, checksum);
+  }
+
+  /**
+   * This writes all the fields till the beginning of checksum.
+   * @param out DataOutputStream
+   * @param header the header to write.
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public static void writeHeader(DataOutputStream out,
+                                  BlockMetadataHeader header)
+                                  throws IOException {
+    out.writeShort(header.getVersion());
+    header.getChecksum().writeHeader(out);
+  }
+
+  /**
+   * Writes all the fields till the beginning of checksum, using the
+   * current {@link #VERSION}.
+   * @throws IOException on error
+   */
+  public static void writeHeader(DataOutputStream out, DataChecksum checksum)
+                         throws IOException {
+    writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
+  }
+
+  /**
+   * Returns the size of the header: the two-byte version plus the
+   * DataChecksum header.
+   */
+  public static int getHeaderSize() {
+    return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
new file mode 100644
index 0000000..2d871fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ClientMmap.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.Closeable;
+import java.nio.MappedByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client-side handle on a memory-mapped region of a block replica.
+ * Closing the handle gives back the reference it holds on the replica.
+ */
+@InterfaceAudience.Private
+public class ClientMmap implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(ClientMmap.class);
+
+  /** The replica this mapping belongs to; cleared once close() finishes. */
+  private ShortCircuitReplica replica;
+
+  /** The mapped region exposed to the client. */
+  private final MappedByteBuffer map;
+
+  /**
+   * True if this mmap holds a no-checksum anchor on the replica, which
+   * close() removes before unreferencing the replica.
+   */
+  private final boolean anchored;
+
+  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map,
+      boolean anchored) {
+    this.replica = replica;
+    this.map = map;
+    this.anchored = anchored;
+  }
+
+  /**
+   * Release this handle: drop the no-checksum anchor if one is held, then
+   * unreference the replica. Calling close() again after it completes is a
+   * no-op.
+   */
+  @Override
+  public void close() {
+    final ShortCircuitReplica target = replica;
+    if (target == null) {
+      return;
+    }
+    if (anchored) {
+      target.removeNoChecksumAnchor();
+    }
+    target.unref();
+    replica = null;
+  }
+
+  /** @return the memory-mapped buffer backing this handle. */
+  public MappedByteBuffer getMappedByteBuffer() {
+    return map;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
new file mode 100644
index 0000000..6a7d39d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.util.PerformanceAdvisory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates UNIX domain sockets for talking to peers on this host and
+ * remembers, per socket path, whether the path can be used for data
+ * transfer and for short-circuit reads.
+ */
+public class DomainSocketFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DomainSocketFactory.class);
+
+  /** What a domain socket path can currently be used for. */
+  public enum PathState {
+    UNUSABLE(false, false),
+    SHORT_CIRCUIT_DISABLED(true, false),
+    VALID(true, true);
+
+    private final boolean usableForDataTransfer;
+    private final boolean usableForShortCircuit;
+
+    PathState(boolean dataTransfer, boolean shortCircuit) {
+      this.usableForDataTransfer = dataTransfer;
+      this.usableForShortCircuit = shortCircuit;
+    }
+
+    public boolean getUsableForDataTransfer() {
+      return usableForDataTransfer;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+  }
+
+  /** A domain socket path together with its {@link PathState}. */
+  public static class PathInfo {
+    private static final PathInfo NOT_CONFIGURED =
+        new PathInfo("", PathState.UNUSABLE);
+
+    private final String path;
+    private final PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+
+    @Override
+    public String toString() {
+      return "PathInfo{path=" + path + ", state=" + state + "}";
+    }
+  }
+
+  /**
+   * State of domain socket paths recorded by this factory. Entries expire
+   * 10 minutes after being written; a path with no entry is treated as
+   * {@link PathState#VALID} by getPathInfo.
+   */
+  final Cache<String, PathState> pathMap =
+      CacheBuilder.newBuilder()
+      .expireAfterWrite(10, TimeUnit.MINUTES)
+      .build();
+
+  /**
+   * Create a factory, validating the domain socket configuration.
+   *
+   * @param conf the short-circuit configuration.
+   * @throws HadoopIllegalArgumentException if a feature that needs a
+   *         domain socket is enabled but no domain socket path is set.
+   */
+  public DomainSocketFactory(ShortCircuitConf conf) {
+    final String feature = describeEnabledFeature(conf);
+    if (feature == null) {
+      PerformanceAdvisory.LOG.debug(
+          "Both short-circuit local reads and UNIX domain socket are disabled.");
+      return;
+    }
+    if (conf.getDomainSocketPath().isEmpty()) {
+      throw new HadoopIllegalArgumentException(feature + " is enabled but "
+          + HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY + " is not set.");
+    }
+    final String loadFailure = DomainSocket.getLoadingFailureReason();
+    if (loadFailure != null) {
+      LOG.warn(feature + " cannot be used because " + loadFailure);
+    } else {
+      LOG.debug(feature + " is enabled.");
+    }
+  }
+
+  /**
+   * Describe the enabled feature that needs a domain socket, preferring
+   * short-circuit local reads over domain socket data traffic.
+   *
+   * @return the description, or null if neither feature is enabled.
+   */
+  private static String describeEnabledFeature(ShortCircuitConf conf) {
+    if (conf.isShortCircuitLocalReads()
+        && !conf.isUseLegacyBlockReaderLocal()) {
+      return "The short-circuit local reads feature";
+    }
+    if (conf.isDomainSocketDataTraffic()) {
+      return "UNIX domain socket data traffic";
+    }
+    return null;
+  }
+
+  /**
+   * Get information about the domain socket path for a peer address.
+   *
+   * @param addr the peer address; only local addresses can qualify.
+   * @param conf the client configuration.
+   * @return the socket path and its state, or an UNUSABLE result with an
+   *         empty path when domain sockets cannot be used for this peer.
+   */
+  public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) {
+    // No domain socket path is configured.
+    if (conf.getDomainSocketPath().isEmpty()) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+    // Neither data traffic nor non-legacy short-circuit reads needs a socket.
+    if (!conf.isDomainSocketDataTraffic()
+        && !(conf.isShortCircuitLocalReads()
+            && !conf.isUseLegacyBlockReaderLocal())) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+    // DomainSocket support failed to load, so no socket can be created.
+    if (DomainSocket.getLoadingFailureReason() != null) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+    // UNIX domain sockets can only reach peers on this host.
+    if (!DFSUtilClient.isLocalAddress(addr)) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+    final String escapedPath = DomainSocket.getEffectivePath(
+        conf.getDomainSocketPath(), addr.getPort());
+    final PathState known = pathMap.getIfPresent(escapedPath);
+    return new PathInfo(escapedPath,
+        known == null ? PathState.VALID : known);
+  }
+
+  /**
+   * Connect to the domain socket at the given path.
+   *
+   * On any failure the path is recorded as {@link PathState#UNUSABLE}, so
+   * getPathInfo reports it that way until the entry expires.
+   *
+   * @param info a path whose state is not UNUSABLE.
+   * @param socketTimeout the receive timeout to set on the socket.
+   * @return the connected socket, or null if an IOException occurred.
+   */
+  public DomainSocket createSocket(PathInfo info, int socketTimeout) {
+    Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE);
+    DomainSocket sock = null;
+    boolean connected = false;
+    try {
+      sock = DomainSocket.connect(info.getPath());
+      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
+      connected = true;
+      return sock;
+    } catch (IOException e) {
+      LOG.warn("error creating DomainSocket", e);
+      return null;
+    } finally {
+      if (!connected) {
+        // Runs for IOExceptions and for unchecked failures alike.
+        if (sock != null) {
+          IOUtils.closeQuietly(sock);
+        }
+        pathMap.put(info.getPath(), PathState.UNUSABLE);
+      }
+    }
+  }
+
+  /** Mark a path as usable for data transfer but not short-circuit reads. */
+  public void disableShortCircuitForPath(String path) {
+    pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
+  }
+
+  /** Mark a path as unusable for any domain socket traffic. */
+  public void disableDomainSocketPath(String path) {
+    pathMap.put(path, PathState.UNUSABLE);
+  }
+
+  /** Forget all recorded path states. */
+  @VisibleForTesting
+  public void clearPathMap() {
+    pathMap.invalidateAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
new file mode 100644
index 0000000..52c1a6e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -0,0 +1,1066 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Waitable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ShortCircuitCache tracks things which the client needs to access
+ * HDFS block files via short-circuit.
+ *
+ * These things include: memory-mapped regions, file descriptors, and shared
+ * memory areas for communicating with the DataNode.
+ */
+@InterfaceAudience.Private
+public class ShortCircuitCache implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ShortCircuitCache.class);
+
+  /**
+   * Periodically scheduled expiry task (see getRateInMs) that demotes old
+   * evictable mmaps and purges replicas past their evictable lifespan.
+   */
+  private class CacheCleaner implements Runnable, Closeable {
+    private ScheduledFuture<?> future; // set by setFuture, cancelled by close
+
+    /**
+     * Run one cleaning pass while holding the cache lock.
+     *
+     * Whenever a thread requests a ShortCircuitReplica object, we will make
+     * sure it gets one.  That ShortCircuitReplica object can then be re-used
+     * when another thread requests a ShortCircuitReplica object for the same
+     * block.  So in that sense, there is no maximum size to the cache.
+     *
+     * However, when a ShortCircuitReplica object is unreferenced by the
+     * thread(s) that are using it, it becomes evictable.  There are two
+     * separate eviction lists-- one for mmaped objects, and another for
+     * non-mmaped objects.  We do this in order to avoid having the regular
+     * files kick the mmaped files out of the cache too quickly.  Reusing
+     * an already-existing mmap gives a huge performance boost, since the
+     * page table entries don't have to be re-populated.  Both the mmap
+     * and non-mmap evictable lists have maximum sizes and maximum lifespans.
+     */
+    @Override
+    public void run() {
+      ShortCircuitCache.this.lock.lock();
+      try {
+        if (ShortCircuitCache.this.closed) return; // nothing to do once closed
+        long curMs = Time.monotonicNow();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(this + ": cache cleaner running at " + curMs);
+        }
+        // Demote old evictable mmaps first, then purge expired replicas.
+        int numDemoted = demoteOldEvictableMmaped(curMs);
+        int numPurged = 0;
+        Long evictionTimeNs = Long.valueOf(0); // start below every key
+        while (true) { // oldest first; the first unexpired entry ends the scan
+          Entry<Long, ShortCircuitReplica> entry = 
+              evictable.ceilingEntry(evictionTimeNs);
+          if (entry == null) break; // no evictable entries left
+          evictionTimeNs = entry.getKey();
+          long evictionTimeMs = 
+              TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
+          if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break;
+          ShortCircuitReplica replica = entry.getValue();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("CacheCleaner: purging " + replica + ": " + 
+                  StringUtils.getStackTrace(Thread.currentThread()));
+          }
+          purge(replica); // NOTE(review): loop relies on this removing it
+          numPurged++;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(this + ": finishing cache cleaner run started at " +
+            curMs + ".  Demoted " + numDemoted + " mmapped replicas; " +
+            "purged " + numPurged + " replicas.");
+        }
+      } finally {
+        ShortCircuitCache.this.lock.unlock();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (future != null) {
+        future.cancel(false); // let a run in progress finish
+      }
+    }
+
+    public void setFuture(ScheduledFuture<?> future) {
+      this.future = future;
+    }
+
+    /**
+     * Get the rate at which this cleaner thread should be scheduled.
+     *
+     * We do this by taking the minimum expiration time and dividing by 4.
+     *
+     * @return the rate in milliseconds at which this thread should be
+     *         scheduled.
+     */
+    public long getRateInMs() {
+      long minLifespanMs =
+          Math.min(maxNonMmappedEvictableLifespanMs,
+              maxEvictableMmapedLifespanMs);
+      long sampleTimeMs = minLifespanMs / 4;
+      return (sampleTimeMs < 1) ? 1 : sampleTimeMs; // at least 1 ms
+    }
+  }
+
+  /**
+   * A task which asks the DataNode to release a short-circuit shared memory
+   * slot.  If successful, this will tell the DataNode to stop monitoring
+   * changes to the mlock status of the replica associated with the slot.
+   * It will also allow us (the client) to re-use this slot for another
+   * replica.  If we can't communicate with the DataNode for some reason,
+   * we tear down the shared memory segment to avoid being in an inconsistent
+   * state.
+   */
+  private class SlotReleaser implements Runnable {
+    /**
+     * The slot that we need to release.
+     */
+    private final Slot slot;
+
+    SlotReleaser(Slot slot) {
+      this.slot = slot;
+    }
+
+    @Override
+    public void run() {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
+      }
+      final DfsClientShm shm = (DfsClientShm)slot.getShm();
+      final DomainSocket shmSock = shm.getPeer().getDomainSocket();
+      final String path = shmSock.getPath(); // a new connection is opened here
+      boolean success = false;
+      try (DomainSocket sock = DomainSocket.connect(path);
+           DataOutputStream out = new DataOutputStream(
+               new BufferedOutputStream(sock.getOutputStream()))) {
+        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
+        DataInputStream in = new DataInputStream(sock.getInputStream());
+        ReleaseShortCircuitAccessResponseProto resp =
+            ReleaseShortCircuitAccessResponseProto.parseFrom(
+                PBHelperClient.vintPrefixed(in));
+        if (resp.getStatus() != Status.SUCCESS) { // surfaced as IOException
+          String error = resp.hasError() ? resp.getError() : "(unknown)";
+          throw new IOException(resp.getStatus().toString() + ": " + error);
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(ShortCircuitCache.this + ": released " + slot);
+        }
+        success = true;
+      } catch (IOException e) {
+        LOG.error(ShortCircuitCache.this + ": failed to release " +
+            "short-circuit shared memory slot " + slot + " by sending " +
+            "ReleaseShortCircuitAccessRequestProto to " + path +
+            ".  Closing shared memory segment.", e);
+      } finally {
+        if (success) { // DataNode acknowledged; the slot can be reused
+          shmManager.freeSlot(slot);
+        } else { // tear down the segment rather than risk inconsistency
+          shm.getEndpointShmManager().shutdown(shm);
+        }
+      }
+    }
+  }
+
+  /**
+   * Callback supplied to {@link #fetchOrCreate} that builds a replica when
+   * the cache does not already have one for the requested key.
+   */
+  public interface ShortCircuitReplicaCreator {
+    /**
+     * Attempt to create a ShortCircuitReplica object.
+     *
+     * This callback will be made without holding any locks.
+     *
+     * Implementations should return a non-null object.  The cache is
+     * defensive, though: a null return or a thrown RuntimeException is
+     * treated as a failed load.
+     *
+     * @return a non-null ShortCircuitReplicaInfo object.
+     */
+    ShortCircuitReplicaInfo createShortCircuitReplicaInfo();
+  }
+
+  /**
+   * Lock protecting the cache's mutable state (the replica and eviction
+   * maps, counters, limits and the closed flag).
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * The executor service that runs the cacheCleaner.
+   */
+  private final ScheduledThreadPoolExecutor cleanerExecutor
+  = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner").
+          build());
+
+  /**
+   * The executor service that runs SlotReleaser tasks submitted by
+   * scheduleSlotReleaser().
+   */
+  private final ScheduledThreadPoolExecutor releaserExecutor
+      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser").
+          build());
+
+  /**
+   * A map containing all ShortCircuitReplicaInfo objects, organized by Key.
+   * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken
+   * exception.  While a load is in progress, the Waitable has no value yet.
+   */
+  private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> 
+      replicaInfoMap = new HashMap<ExtendedBlockId,
+          Waitable<ShortCircuitReplicaInfo>>();
+
+  /**
+   * The CacheCleaner.  We don't create this and schedule it until it becomes
+   * necessary (see startCacheCleanerThreadIfNeeded()).
+   */
+  private CacheCleaner cacheCleaner;
+
+  /**
+   * Tree of evictable (non-mmapped) elements.
+   *
+   * Maps (unique) insertion time in nanoseconds to the element.
+   */
+  private final TreeMap<Long, ShortCircuitReplica> evictable =
+      new TreeMap<Long, ShortCircuitReplica>();
+
+  /**
+   * Maximum number of replicas kept across both eviction maps (evictable
+   * plus evictableMmapped); trimEvictionMaps() purges anything beyond this.
+   */
+  private final int maxTotalSize;
+
+  /**
+   * Non-mmapped elements older than this will be closed.
+   * Not final: close() sets it to 0.
+   */
+  private long maxNonMmappedEvictableLifespanMs;
+
+  /**
+   * Tree of mmapped evictable elements.
+   *
+   * Maps (unique) insertion time in nanoseconds to the element.
+   */
+  private final TreeMap<Long, ShortCircuitReplica> evictableMmapped =
+      new TreeMap<Long, ShortCircuitReplica>();
+
+  /**
+   * Maximum number of mmapped evictable elements.
+   * Not final: close() sets it to 0.
+   */
+  private int maxEvictableMmapedSize;
+
+  /**
+   * Mmapped evictable elements older than this are munmapped and demoted
+   * into the non-mmapped evictable map.
+   */
+  private final long maxEvictableMmapedLifespanMs;
+
+  /**
+   * The minimum number of milliseconds we'll wait after an unsuccessful
+   * mmap attempt before trying again.
+   */
+  private final long mmapRetryTimeoutMs;
+
+  /**
+   * How long we will keep replicas in the cache before declaring them
+   * to be stale.
+   */
+  private final long staleThresholdMs;
+
+  /**
+   * True if the ShortCircuitCache is closed.  Once set, fetchOrCreate()
+   * returns null.
+   */
+  private boolean closed = false;
+
+  /**
+   * Number of existing mmaps associated with this cache.  It is incremented
+   * when getOrCreateClientMmap() maps a replica and decremented by munmap().
+   */
+  private int outstandingMmapCount = 0;
+
+  /**
+   * Manages short-circuit shared memory segments for the client.  Null when
+   * shared memory is disabled or could not be set up (see the constructor).
+   */
+  private final DfsClientShmManager shmManager;
+
+  /**
+   * Build a cache whose size limits and timeouts all come from the given
+   * short-circuit client configuration.
+   *
+   * @param conf   The short-circuit configuration to read settings from.
+   * @return       A newly constructed ShortCircuitCache.
+   */
+  public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
+    final int totalSize = conf.getShortCircuitStreamsCacheSize();
+    final long nonMmappedLifespanMs = conf.getShortCircuitStreamsCacheExpiryMs();
+    final int mmappedSize = conf.getShortCircuitMmapCacheSize();
+    final long mmappedLifespanMs = conf.getShortCircuitMmapCacheExpiryMs();
+    final long retryTimeoutMs = conf.getShortCircuitMmapCacheRetryTimeout();
+    final long staleMs = conf.getShortCircuitCacheStaleThresholdMs();
+    final int interruptCheckMs =
+        conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs();
+    return new ShortCircuitCache(totalSize, nonMmappedLifespanMs,
+        mmappedSize, mmappedLifespanMs, retryTimeoutMs, staleMs,
+        interruptCheckMs);
+  }
+
+  public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
+      int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
+      long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
+    Preconditions.checkArgument(maxTotalSize >= 0);
+    this.maxTotalSize = maxTotalSize;
+    Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
+    this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs;
+    Preconditions.checkArgument(maxEvictableMmapedSize >= 0);
+    this.maxEvictableMmapedSize = maxEvictableMmapedSize;
+    Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0);
+    this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
+    this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
+    this.staleThresholdMs = staleThresholdMs;
+    DfsClientShmManager shmManager = null;
+    if ((shmInterruptCheckMs > 0) &&
+        (DomainSocketWatcher.getLoadingFailureReason() == null)) {
+      try {
+        shmManager = new DfsClientShmManager(shmInterruptCheckMs);
+      } catch (IOException e) {
+        LOG.error("failed to create ShortCircuitShmManager", e);
+      }
+    }
+    this.shmManager = shmManager;
+  }
+
+  /**
+   * @return how long, in milliseconds, replicas are kept in the cache
+   *         before they are declared stale.
+   */
+  public long getStaleThresholdMs() {
+    return this.staleThresholdMs;
+  }
+
+  /**
+   * Increment the reference count of a replica, and remove it from any free
+   * list it may be in.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param replica      The replica we're removing.
+   */
+  private void ref(ShortCircuitReplica replica) {
+    lock.lock();
+    try {
+      Preconditions.checkArgument(replica.refCount > 0,
+          "can't ref %s because its refCount reached %d", replica,
+          replica.refCount);
+      Long evictableTimeNs = replica.getEvictableTimeNs();
+      replica.refCount++;
+      if (evictableTimeNs != null) {
+        String removedFrom = removeEvictable(replica);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": " + removedFrom +
+              " no longer contains " + replica + ".  refCount " +
+              (replica.refCount - 1) + " -> " + replica.refCount +
+              StringUtils.getStackTrace(Thread.currentThread()));
+
+        }
+      } else if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": replica  refCount " +
+            (replica.refCount - 1) + " -> " + replica.refCount +
+            StringUtils.getStackTrace(Thread.currentThread()));
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Unreference a replica.
+   *
+   * Before the reference is dropped, a replica that has not yet been purged
+   * is purged if its data or meta channel is closed or if it is stale.
+   * After the decrement:
+   *   - refCount 0: the replica (which must already be purged) is closed.
+   *   - refCount 1: only the cache's own reference remains, so an unpurged
+   *     replica is appended to evictableMmapped or evictable (depending on
+   *     whether it has an mmap) and the eviction maps are trimmed.
+   *
+   * The cache lock is acquired here (it is reentrant); callers such as
+   * purge() already hold it.
+   *
+   * @param replica   The replica being unreferenced.
+   */
+  void unref(ShortCircuitReplica replica) {
+    lock.lock();
+    try {
+      // If the replica is stale or unusable, but we haven't purged it yet,
+      // let's do that.  It would be a shame to evict a non-stale replica so
+      // that we could put a stale or unusable one into the cache.
+      if (!replica.purged) {
+        String purgeReason = null;
+        if (!replica.getDataStream().getChannel().isOpen()) {
+          purgeReason = "purging replica because its data channel is closed.";
+        } else if (!replica.getMetaStream().getChannel().isOpen()) {
+          purgeReason = "purging replica because its meta channel is closed.";
+        } else if (replica.isStale()) {
+          purgeReason = "purging replica because it is stale.";
+        }
+        if (purgeReason != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + ": " + purgeReason);
+          }
+          // purge() drops the cache's own reference by calling unref()
+          // recursively, so refCount is already one lower by the time we
+          // decrement it below for the caller's reference.
+          purge(replica);
+        }
+      }
+      String addedString = "";
+      boolean shouldTrimEvictionMaps = false;
+      int newRefCount = --replica.refCount;
+      if (newRefCount == 0) {
+        // Close replica, since there are no remaining references to it.
+        Preconditions.checkArgument(replica.purged,
+          "Replica %s reached a refCount of 0 without being purged", replica);
+        replica.close();
+      } else if (newRefCount == 1) {
+        // A replica that had more than one reference must not have been in
+        // an eviction map (ref() removes it from one).
+        Preconditions.checkState(null == replica.getEvictableTimeNs(),
+            "Replica %s had a refCount higher than 1, " +
+              "but was still evictable (evictableTimeNs = %d)",
+              replica, replica.getEvictableTimeNs());
+        if (!replica.purged) {
+          // Add the replica to the end of an eviction list.
+          // Eviction lists are sorted by time.
+          if (replica.hasMmap()) {
+            insertEvictable(System.nanoTime(), replica, evictableMmapped);
+            addedString = "added to evictableMmapped, ";
+          } else {
+            insertEvictable(System.nanoTime(), replica, evictable);
+            addedString = "added to evictable, ";
+          }
+          shouldTrimEvictionMaps = true;
+        }
+      } else {
+        // Still referenced by others (or the count went negative, which is
+        // a bookkeeping error caught here).
+        Preconditions.checkArgument(replica.refCount >= 0,
+            "replica's refCount went negative (refCount = %d" +
+            " for %s)", replica.refCount, replica);
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": unref replica " + replica +
+            ": " + addedString + " refCount " +
+            (newRefCount + 1) + " -> " + newRefCount +
+            StringUtils.getStackTrace(Thread.currentThread()));
+      }
+      if (shouldTrimEvictionMaps) {
+        trimEvictionMaps();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Demote old evictable mmaps into the regular eviction map.
+   *
+   * Walks evictableMmapped from oldest to newest.  An entry is munmapped and
+   * moved into evictable (under the same eviction time, bumped if that key
+   * is taken) if it is older than maxEvictableMmapedLifespanMs, or if the
+   * mmapped map still holds at least maxEvictableMmapedSize elements.
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param now   Current time in monotonic milliseconds.
+   * @return      Number of replicas demoted.
+   */
+  private int demoteOldEvictableMmaped(long now) {
+    int numDemoted = 0;
+    boolean needMoreSpace = false;
+    Long evictionTimeNs = Long.valueOf(0);
+
+    while (true) {
+      // Resume from the last key examined.  Demoted entries are removed
+      // from evictableMmapped, so this yields the next-oldest entry.
+      Entry<Long, ShortCircuitReplica> entry = 
+          evictableMmapped.ceilingEntry(evictionTimeNs);
+      if (entry == null) break;
+      evictionTimeNs = entry.getKey();
+      long evictionTimeMs = 
+          TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
+      if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) {
+        // This entry is still within its lifespan, and so is every later
+        // (younger) one.  Stop, unless the map is over its size limit.
+        if (evictableMmapped.size() < maxEvictableMmapedSize) {
+          break;
+        }
+        // Once set this stays set; all remaining entries are younger still.
+        needMoreSpace = true;
+      }
+      ShortCircuitReplica replica = entry.getValue();
+      if (LOG.isTraceEnabled()) {
+        String rationale = needMoreSpace ? "because we need more space" : 
+            "because it's too old";
+        LOG.trace("demoteOldEvictable: demoting " + replica + ": " +
+            rationale + ": " +
+            StringUtils.getStackTrace(Thread.currentThread()));
+      }
+      removeEvictable(replica, evictableMmapped);
+      munmap(replica);
+      insertEvictable(evictionTimeNs, replica, evictable);
+      numDemoted++;
+    }
+    return numDemoted;
+  }
+
+  /**
+   * Trim the eviction lists.
+   */
+  private void trimEvictionMaps() {
+    long now = Time.monotonicNow();
+    demoteOldEvictableMmaped(now);
+
+    while (true) {
+      long evictableSize = evictable.size();
+      long evictableMmappedSize = evictableMmapped.size();
+      if (evictableSize + evictableMmappedSize <= maxTotalSize) {
+        return;
+      }
+      ShortCircuitReplica replica;
+      if (evictableSize == 0) {
+       replica = evictableMmapped.firstEntry().getValue();
+      } else {
+       replica = evictable.firstEntry().getValue();
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": trimEvictionMaps is purging " + replica +
+          StringUtils.getStackTrace(Thread.currentThread()));
+      }
+      purge(replica);
+    }
+  }
+
+  /**
+   * Drop a replica's mmap and decrement outstandingMmapCount to match.
+   *
+   * @param replica  The replica whose mapping should be released.
+   */
+  private void munmap(ShortCircuitReplica replica) {
+    replica.munmap();
+    outstandingMmapCount -= 1;
+  }
+
+  /**
+   * Remove a replica from an evictable map.
+   *
+   * @param replica   The replica to remove.
+   * @return          The map it was removed from.
+   */
+  private String removeEvictable(ShortCircuitReplica replica) {
+    if (replica.hasMmap()) {
+      removeEvictable(replica, evictableMmapped);
+      return "evictableMmapped";
+    } else {
+      removeEvictable(replica, evictable);
+      return "evictable";
+    }
+  }
+
+  /**
+   * Remove a replica from an evictable map.
+   *
+   * @param replica   The replica to remove.
+   * @param map       The map to remove it from.
+   */
+  private void removeEvictable(ShortCircuitReplica replica,
+      TreeMap<Long, ShortCircuitReplica> map) {
+    Long evictableTimeNs = replica.getEvictableTimeNs();
+    Preconditions.checkNotNull(evictableTimeNs);
+    ShortCircuitReplica removed = map.remove(evictableTimeNs);
+    Preconditions.checkState(removed == replica,
+        "failed to make %s unevictable", replica);
+    replica.setEvictableTimeNs(null);
+  }
+
+  /**
+   * Insert a replica into an eviction map.
+   *
+   * Keys must be unique, so if the requested eviction time is already taken
+   * the key is advanced one nanosecond at a time until a free one is found.
+   *
+   * @param evictionTimeNs   The eviction time in absolute nanoseconds.
+   * @param replica          The replica to insert.  It must not already
+   *                         have an eviction time.
+   * @param map              The map to insert it into.
+   */
+  private void insertEvictable(Long evictionTimeNs,
+      ShortCircuitReplica replica, TreeMap<Long, ShortCircuitReplica> map) {
+    long key = evictionTimeNs;
+    while (map.containsKey(key)) {
+      key++;
+    }
+    Preconditions.checkState(null == replica.getEvictableTimeNs());
+    replica.setEvictableTimeNs(key);
+    map.put(key, replica);
+  }
+
+  /**
+   * Purge a replica from the cache.
+   *
+   * This doesn't necessarily close the replica, since there may be
+   * outstanding references to it.  However, it does mean the cache won't
+   * hand it out to anyone after this.  The replica is marked purged and
+   * removed from replicaInfoMap (if that entry still refers to it) and from
+   * any eviction map.  The cache's own reference is then dropped via unref().
+   *
+   * You must hold the cache lock while calling this function.
+   *
+   * @param replica   The replica being removed.  Must not already be purged.
+   */
+  private void purge(ShortCircuitReplica replica) {
+    boolean removedFromInfoMap = false;
+    String evictionMapName = null;
+    Preconditions.checkArgument(!replica.purged);
+    replica.purged = true;
+    Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key);
+    if (val != null) {
+      ShortCircuitReplicaInfo info = val.getVal();
+      // Only drop the map entry if it still refers to this replica; the key
+      // may already map to a different load.
+      if ((info != null) && (info.getReplica() == replica)) {
+        replicaInfoMap.remove(replica.key);
+        removedFromInfoMap = true;
+      }
+    }
+    Long evictableTimeNs = replica.getEvictableTimeNs();
+    if (evictableTimeNs != null) {
+      evictionMapName = removeEvictable(replica);
+    }
+    if (LOG.isTraceEnabled()) {
+      StringBuilder builder = new StringBuilder();
+      builder.append(this).append(": purged ").
+          append(replica).append(" from the cache.");
+      if (removedFromInfoMap) {
+        builder.append("  Removed from the replicaInfoMap.");
+      }
+      if (evictionMapName != null) {
+        builder.append("  Removed from ").append(evictionMapName);
+      }
+      LOG.trace(builder.toString());
+    }
+    // Drop the reference the cache itself held on the replica.
+    unref(replica);
+  }
+
+  /**
+   * Fetch or create a replica.
+   *
+   * The cache lock is acquired internally.  The creator callback is invoked
+   * only after that lock has been released, so callers should not hold it.
+   *
+   * @param key          Key to use for lookup.
+   * @param creator      Replica creator callback.  Will be called without
+   *                     the cache lock being held.
+   *
+   * @return             Null if the cache is closed.  Otherwise a
+   *                     ShortCircuitReplicaInfo, which holds either a
+   *                     replica, an InvalidToken exception, or neither (a
+   *                     failed load).
+   */
+  public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
+      ShortCircuitReplicaCreator creator) {
+    Waitable<ShortCircuitReplicaInfo> newWaitable = null;
+    lock.lock();
+    try {
+      if (closed) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": can't fetchOrCreate " + key +
+              " because the cache is closed.");
+        }
+        return null;
+      }
+      ShortCircuitReplicaInfo info = null;
+      Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
+      if (waitable != null) {
+        try {
+          info = fetch(key, waitable);
+        } catch (RetriableException e) {
+          // The existing entry was purged or stale, or we were interrupted
+          // while waiting on it.  We do not loop back to look it up again;
+          // we fall through and load a fresh replica ourselves.
+          // NOTE(review): a lookup loop could spin after an interrupt,
+          // because fetch() re-sets the interrupt flag before throwing.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(this + ": retrying " + e.getMessage());
+          }
+        }
+      }
+      if (info != null) {
+        return info;
+      }
+      // We need to load the replica ourselves.  Publish a waitable first so
+      // that concurrent callers wait on our result instead of loading too.
+      newWaitable = new Waitable<ShortCircuitReplicaInfo>(lock.newCondition());
+      replicaInfoMap.put(key, newWaitable);
+    } finally {
+      lock.unlock();
+    }
+    return create(key, creator, newWaitable);
+  }
+
+  /**
+   * Fetch an existing ReplicaInfo object.
+   *
+   * Blocks until the thread loading the entry provides its result.  The
+   * waitable is built on a condition of the cache lock, so presumably the
+   * lock is released while blocked (TODO confirm against Waitable).
+   * You must hold the cache lock while calling this function.
+   *
+   * @param key       The key that we're using.
+   * @param waitable  The waitable object to wait on.
+   * @return          The loaded ReplicaInfo (not null).  It may carry an
+   *                  InvalidToken exception, or no replica at all (a failed
+   *                  load).  When it carries a usable replica, that replica
+   *                  has been ref'd on the caller's behalf.
+   *
+   * @throws RetriableException   If the caller needs to retry: we were
+   *                              interrupted while waiting, or the replica
+   *                              was already purged, or it was stale (in
+   *                              which case it is purged here).
+   */
+  private ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
+      Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
+    // Another thread is already in the process of loading this
+    // ShortCircuitReplica.  So we simply wait for it to complete.
+    ShortCircuitReplicaInfo info;
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": found waitable for " + key);
+      }
+      info = waitable.await();
+    } catch (InterruptedException e) {
+      LOG.info(this + ": interrupted while waiting for " + key);
+      // Restore the interrupt status for our caller.
+      Thread.currentThread().interrupt();
+      throw new RetriableException("interrupted");
+    }
+    if (info.getInvalidTokenException() != null) {
+      LOG.info(this + ": could not get " + key + " due to InvalidToken " +
+            "exception.", info.getInvalidTokenException());
+      return info;
+    }
+    ShortCircuitReplica replica = info.getReplica();
+    if (replica == null) {
+      LOG.warn(this + ": failed to get " + key);
+      return info;
+    }
+    if (replica.purged) {
+      // Ignore replicas that have already been purged from the cache.
+      throw new RetriableException("Ignoring purged replica " +
+          replica + ".  Retrying.");
+    }
+    // Check if the replica is stale before using it.
+    // If it is, purge it and retry.
+    if (replica.isStale()) {
+      LOG.info(this + ": got stale replica " + replica + ".  Removing " +
+          "this replica from the replicaInfoMap and retrying.");
+      // Remove the cache's reference to the replica.  This may or may not
+      // trigger a close.
+      purge(replica);
+      throw new RetriableException("ignoring stale replica " + replica);
+    }
+    // Take a reference on behalf of the caller.
+    ref(replica);
+    return info;
+  }
+
+  /**
+   * Load a new replica via the creator callback and publish the result.
+   *
+   * This is called by fetchOrCreate() after it has placed newWaitable in
+   * replicaInfoMap under key and released the cache lock.  The creator runs
+   * without the lock.  The result is then provided to newWaitable under the
+   * lock, presumably waking threads blocked in fetch() on it.  The result is
+   * never null: a failed load yields an empty ShortCircuitReplicaInfo.
+   *
+   * @param key          The key being loaded.
+   * @param creator      Callback that builds the replica.
+   * @param newWaitable  The waitable this thread placed in replicaInfoMap.
+   * @return             The (non-null) ShortCircuitReplicaInfo.
+   */
+  private ShortCircuitReplicaInfo create(ExtendedBlockId key,
+      ShortCircuitReplicaCreator creator,
+      Waitable<ShortCircuitReplicaInfo> newWaitable) {
+    // Handle loading a new replica.
+    ShortCircuitReplicaInfo info = null;
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": loading " + key);
+      }
+      info = creator.createShortCircuitReplicaInfo();
+    } catch (RuntimeException e) {
+      // NOTE(review): only RuntimeExceptions are caught.  A thrown Error
+      // skips provide() below, leaving waiters on newWaitable blocked.
+      LOG.warn(this + ": failed to load " + key, e);
+    }
+    if (info == null) info = new ShortCircuitReplicaInfo();
+    lock.lock();
+    try {
+      if (info.getReplica() != null) {
+        // On success, make sure the cache cleaner thread is running.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + ": successfully loaded " + info.getReplica());
+        }
+        startCacheCleanerThreadIfNeeded();
+        // Note: new ShortCircuitReplicas start with a refCount of 2,
+        // indicating that both this cache and whoever requested the
+        // creation of the replica hold a reference.  So we don't need
+        // to increment the reference count here.
+      } else {
+        // On failure, remove the waitable from the replicaInfoMap, but only
+        // if it is still ours; the entry may have been replaced while the
+        // lock was released.
+        Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key);
+        if (waitableInMap == newWaitable) replicaInfoMap.remove(key);
+        if (info.getInvalidTokenException() != null) {
+          LOG.info(this + ": could not load " + key + " due to InvalidToken " +
+              "exception.", info.getInvalidTokenException());
+        } else {
+          LOG.warn(this + ": failed to load " + key);
+        }
+      }
+      newWaitable.provide(info);
+    } finally {
+      lock.unlock();
+    }
+    return info;
+  }
+
+  private void startCacheCleanerThreadIfNeeded() {
+    if (cacheCleaner == null) {
+      cacheCleaner = new CacheCleaner();
+      long rateMs = cacheCleaner.getRateInMs();
+      ScheduledFuture<?> future =
+          cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
+              TimeUnit.MILLISECONDS);
+      cacheCleaner.setFuture(future);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this + ": starting cache cleaner thread which will run " +
+          "every " + rateMs + " ms");
+      }
+    }
+  }
+
+  /**
+   * Get a ClientMmap for the replica, creating the underlying mmap if
+   * necessary.
+   *
+   * replica.mmapData encodes the replica's mmap state:
+   *   null              - no mmap has been attempted yet;
+   *   MappedByteBuffer  - the replica is mapped;
+   *   Long              - monotonic time (ms) of the last failed attempt;
+   *   Condition         - another thread is currently creating the mmap.
+   *
+   * The mmap itself is created without holding the cache lock.
+   *
+   * @param replica    The replica to map.
+   * @param anchored   Passed through to the ClientMmap.
+   * @return           A ClientMmap, for which the replica has been ref'd
+   *                   (presumably released when the ClientMmap is closed).
+   *                   Returns null if mapping failed now, or failed less
+   *                   than mmapRetryTimeoutMs ago.
+   */
+  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
+      boolean anchored) {
+    Condition newCond;
+    lock.lock();
+    try {
+      while (replica.mmapData != null) {
+        if (replica.mmapData instanceof MappedByteBuffer) {
+          ref(replica);
+          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
+          return new ClientMmap(replica, mmap, anchored);
+        } else if (replica.mmapData instanceof Long) {
+          long lastAttemptTimeMs = (Long)replica.mmapData;
+          long delta = Time.monotonicNow() - lastAttemptTimeMs;
+          if (delta < mmapRetryTimeoutMs) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(this + ": can't create client mmap for " +
+                  replica + " because we failed to " +
+                  "create one just " + delta + "ms ago.");
+            }
+            return null;
+          }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": retrying client mmap for " + replica +
+                ", " + delta + " ms after the previous failure.");
+          }
+          // Leave the loop and retry the mapping below.  mmapData still
+          // holds this Long, so another iteration would spin forever while
+          // holding the cache lock.
+          break;
+        } else if (replica.mmapData instanceof Condition) {
+          // Another thread is mapping; wait for it, then re-examine state.
+          Condition cond = (Condition)replica.mmapData;
+          cond.awaitUninterruptibly();
+        } else {
+          Preconditions.checkState(false, "invalid mmapData type %s",
+              replica.mmapData.getClass().getName());
+        }
+      }
+      // Claim the mapping job; other callers will wait on newCond.
+      newCond = lock.newCondition();
+      replica.mmapData = newCond;
+    } finally {
+      lock.unlock();
+    }
+    MappedByteBuffer map = replica.loadMmapInternal();
+    lock.lock();
+    try {
+      if (map == null) {
+        // Record when we failed so that retries are throttled.
+        replica.mmapData = Long.valueOf(Time.monotonicNow());
+        newCond.signalAll();
+        return null;
+      } else {
+        outstandingMmapCount++;
+        replica.mmapData = map;
+        ref(replica);
+        newCond.signalAll();
+        return new ClientMmap(replica, map, anchored);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Close the cache and free all associated resources.
+   */
+  @Override
+  public void close() {
+    try {
+      lock.lock();
+      if (closed) return;
+      closed = true;
+      LOG.info(this + ": closing");
+      maxNonMmappedEvictableLifespanMs = 0;
+      maxEvictableMmapedSize = 0;
+      // Close and join cacheCleaner thread.
+      IOUtilsClient.cleanup(LOG, cacheCleaner);
+      // Purge all replicas.
+      while (true) {
+        Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry();
+        if (entry == null) break;
+        purge(entry.getValue());
+      }
+      while (true) {
+        Entry<Long, ShortCircuitReplica> entry = evictableMmapped.firstEntry();
+        if (entry == null) break;
+        purge(entry.getValue());
+      }
+    } finally {
+      lock.unlock();
+    }
+
+    releaserExecutor.shutdown();
+    cleanerExecutor.shutdown();
+    // wait for existing tasks to terminate
+    try {
+      if (!releaserExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.error("Forcing SlotReleaserThreadPool to shutdown!");
+        releaserExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      releaserExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Interrupted while waiting for SlotReleaserThreadPool "
+          + "to terminate", e);
+    }
+
+    // wait for existing tasks to terminate
+    try {
+      if (!cleanerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.error("Forcing CleanerThreadPool to shutdown!");
+        cleanerExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      cleanerExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Interrupted while waiting for CleanerThreadPool "
+          + "to terminate", e);
+    }
+    IOUtilsClient.cleanup(LOG, shmManager);
+  }
+
+  /**
+   * Visitor that {@link #accept(CacheVisitor)} calls, with the cache lock
+   * held, to expose the cache's internal state to tests.
+   */
+  @VisibleForTesting // ONLY for testing
+  public interface CacheVisitor {
+    /**
+     * @param numOutstandingMmaps  Current outstandingMmapCount.
+     * @param replicas             Successfully loaded replicas, by block id.
+     * @param failedLoads          Block ids whose completed load produced no
+     *                             replica, mapped to the InvalidToken that
+     *                             caused it (null for other failures).
+     * @param evictable            The cache's live non-mmapped eviction map
+     *                             (not a copy).
+     * @param evictableMmapped     The cache's live mmapped eviction map
+     *                             (not a copy).
+     */
+    void visit(int numOutstandingMmaps,
+        Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+        Map<ExtendedBlockId, InvalidToken> failedLoads,
+        Map<Long, ShortCircuitReplica> evictable,
+        Map<Long, ShortCircuitReplica> evictableMmapped);
+  }
+
+  /**
+   * Test-only hook that hands the visitor a view of the cache's state while
+   * holding the cache lock.
+   *
+   * Loaded replicas and failed loads are copied out of replicaInfoMap into
+   * fresh maps; entries whose load has not completed are skipped.  The
+   * eviction maps are passed as-is (live, not copies).
+   *
+   * @param visitor   The visitor to call.
+   */
+  @VisibleForTesting // ONLY for testing
+  public void accept(CacheVisitor visitor) {
+    lock.lock();
+    try {
+      Map<ExtendedBlockId, ShortCircuitReplica> replicas =
+          new HashMap<ExtendedBlockId, ShortCircuitReplica>();
+      Map<ExtendedBlockId, InvalidToken> failedLoads =
+          new HashMap<ExtendedBlockId, InvalidToken>();
+      for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry :
+            replicaInfoMap.entrySet()) {
+        Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue();
+        if (waitable.hasVal()) {
+          if (waitable.getVal().getReplica() != null) {
+            replicas.put(entry.getKey(), waitable.getVal().getReplica());
+          } else {
+            // The exception may be null here, indicating a failed load that
+            // isn't the result of an invalid block token.
+            failedLoads.put(entry.getKey(),
+                waitable.getVal().getInvalidTokenException());
+          }
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("visiting ").append(visitor.getClass().getName()).
+            append(" with outstandingMmapCount=").append(outstandingMmapCount).
+            append(", replicas=");
+        String prefix = "";
+        for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
+          builder.append(prefix).append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", failedLoads=");
+        for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
+          builder.append(prefix).append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", evictable=");
+        for (Entry<Long, ShortCircuitReplica> entry : evictable.entrySet()) {
+          builder.append(prefix).append(entry.getKey()).
+              append(":").append(entry.getValue());
+          prefix = ",";
+        }
+        prefix = "";
+        builder.append(", evictableMmapped=");
+        for (Entry<Long, ShortCircuitReplica> entry : evictableMmapped.entrySet()) {
+          builder.append(prefix).append(entry.getKey()).
+              append(":").append(entry.getValue());
+          prefix = ",";
+        }
+        LOG.debug(builder.toString());
+      }
+      visitor.visit(outstandingMmapCount, replicas, failedLoads,
+            evictable, evictableMmapped);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @return "ShortCircuitCache(0x" followed by this object's identity hash
+   *         code in hex, and ")".
+   */
+  @Override
+  public String toString() {
+    final String identity =
+        Integer.toHexString(System.identityHashCode(this));
+    return new StringBuilder("ShortCircuitCache(0x").append(identity)
+        .append(')').toString();
+  }
+
+  /**
+   * Allocate a new shared memory slot through the shm manager.
+   *
+   * @param datanode       The datanode to allocate a shm slot with.
+   * @param peer           A peer connected to the datanode.
+   * @param usedPeer       Will be set to true if we use up the provided peer.
+   * @param blockId        The block id and block pool id of the block we're
+   *                         allocating this slot for.
+   * @param clientName     The name of the DFSClient allocating the shared
+   *                         memory.
+   * @return               Null when short-circuit shared memory is disabled
+   *                         (no shm manager); otherwise the allocated slot.
+   * @throws IOException   If there was an error talking to the datanode.
+   */
+  public Slot allocShmSlot(DatanodeInfo datanode,
+        DomainPeer peer, MutableBoolean usedPeer,
+        ExtendedBlockId blockId, String clientName) throws IOException {
+    if (shmManager == null) {
+      return null;
+    }
+    return shmManager.allocSlot(datanode, peer, usedPeer, blockId, clientName);
+  }
+
+  /**
+   * Invalidate a slot and return it to the shm manager right away.
+   *
+   * Only appropriate when the DataNode has not yet been told about the slot.
+   * Otherwise use {@link #scheduleSlotReleaser(Slot)}, which notifies the
+   * DataNode first.
+   *
+   * @param slot           The slot to free.
+   * @throws IllegalStateException if shared memory is not enabled.
+   */
+  public void freeSlot(Slot slot) {
+    final DfsClientShmManager manager = shmManager;
+    Preconditions.checkState(manager != null);
+    slot.makeInvalid();
+    manager.freeSlot(slot);
+  }
+  
+  /**
+   * Queue a SlotReleaser on the releaser executor.  It tells the DataNode
+   * that the slot is no longer needed and then frees it.
+   *
+   * @param slot           The slot to release.
+   * @throws IllegalStateException if shared memory is not enabled.
+   */
+  public void scheduleSlotReleaser(Slot slot) {
+    Preconditions.checkState(shmManager != null);
+    final Runnable releaser = new SlotReleaser(slot);
+    releaserExecutor.execute(releaser);
+  }
+
+  /**
+   * @return the shared memory manager, or null if shared memory is
+   *         disabled or could not be set up.
+   */
+  @VisibleForTesting
+  public DfsClientShmManager getDfsClientShmManager() {
+    return this.shmManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
new file mode 100644
index 0000000..37566e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ShortCircuitReplica object contains file descriptors for a block that
+ * we are reading via short-circuit local reads.
+ *
+ * The file descriptors can be shared between multiple threads because
+ * all the operations we perform are stateless-- i.e., we use pread
+ * instead of read, to avoid using the shared position state.
+ */
+@InterfaceAudience.Private
+public class ShortCircuitReplica {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ShortCircuitCache.class);
+
+  /**
+   * Identifies this ShortCircuitReplica object.
+   */
+  final ExtendedBlockId key;
+
+  /**
+   * The block data input stream.
+   */
+  private final FileInputStream dataStream;
+
+  /**
+   * The block metadata input stream.
+   *
+   * TODO: make this nullable if the file has no checksums on disk.
+   */
+  private final FileInputStream metaStream;
+
+  /**
+   * Block metadata header.
+   */
+  private final BlockMetadataHeader metaHeader;
+
+  /**
+   * The cache we belong to.
+   */
+  private final ShortCircuitCache cache;
+
+  /**
+   * Monotonic time at which the replica was created.
+   */
+  private final long creationTimeMs;
+
+  /**
+   * If non-null, the shared memory slot associated with this replica.
+   */
+  private final Slot slot;
+  
+  /**
+   * Current mmap state.
+   *
+   * Protected by the cache lock.
+   */
+  Object mmapData;
+
+  /**
+   * True if this replica has been purged from the cache; false otherwise.
+   *
+   * Protected by the cache lock.
+   */
+  boolean purged = false;
+
+  /**
+   * Number of external references to this replica.  Replicas are referenced
+   * by the cache, BlockReaderLocal instances, and by ClientMmap instances.
+   * The number starts at 2 because when we create a replica, it is referenced
+   * by both the cache and the requester.
+   *
+   * Protected by the cache lock.
+   */
+  int refCount = 2;
+
+  /**
+   * The monotonic time in nanoseconds at which the replica became evictable, or
+   * null if it is not evictable.
+   *
+   * Protected by the cache lock.
+   */
+  private Long evictableTimeNs = null;
+
+  public ShortCircuitReplica(ExtendedBlockId key,
+      FileInputStream dataStream, FileInputStream metaStream,
+      ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
+    this.key = key;
+    this.dataStream = dataStream;
+    this.metaStream = metaStream;
+    this.metaHeader =
+          BlockMetadataHeader.preadHeader(metaStream.getChannel());
+    if (metaHeader.getVersion() != 1) {
+      throw new IOException("invalid metadata header version " +
+          metaHeader.getVersion() + ".  Can only handle version 1.");
+    }
+    this.cache = cache;
+    this.creationTimeMs = creationTimeMs;
+    this.slot = slot;
+  }
+
+  /**
+   * Decrement the reference count.
+   */
+  public void unref() {
+    cache.unref(this);
+  }
+
+  /**
+   * Check if the replica is stale.
+   *
+   * Must be called with the cache lock held.
+   */
+  boolean isStale() {
+    if (slot != null) {
+      // Check staleness by looking at the shared memory area we use to
+      // communicate with the DataNode.
+      boolean stale = !slot.isValid();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
+      }
+      return stale;
+    } else {
+      // Fall back to old, time-based staleness method.
+      long deltaMs = Time.monotonicNow() - creationTimeMs;
+      long staleThresholdMs = cache.getStaleThresholdMs();
+      if (deltaMs > staleThresholdMs) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is stale because it's " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return true;
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is not stale because it's only " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return false;
+      }
+    }
+  }
+  
+  /**
+   * Try to add a no-checksum anchor to our shared memory slot.
+   *
+   * It is only possible to add this anchor when the block is mlocked on the Datanode.
+   * The DataNode will not munlock the block until the number of no-checksum anchors
+   * for the block reaches zero.
+   * 
+   * This method does not require any synchronization.
+   *
+   * @return     True if we successfully added a no-checksum anchor.
+   */
+  public boolean addNoChecksumAnchor() {
+    if (slot == null) {
+      return false;
+    }
+    boolean result = slot.addAnchor();
+    if (LOG.isTraceEnabled()) {
+      if (result) {
+        LOG.trace(this + ": added no-checksum anchor to slot " + slot);
+      } else {
+        LOG.trace(this + ": could not add no-checksum anchor to slot " + slot);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Remove a no-checksum anchor for our shared memory slot.
+   *
+   * This method does not require any synchronization.
+   */
+  public void removeNoChecksumAnchor() {
+    if (slot != null) {
+      slot.removeAnchor();
+    }
+  }
+
+  /**
+   * Check if the replica has an associated mmap that has been fully loaded.
+   *
+   * Must be called with the cache lock held.
+   */
+  @VisibleForTesting
+  public boolean hasMmap() {
+    return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
+  }
+
+  /**
+   * Free the mmap associated with this replica.
+   *
+   * Must be called with the cache lock held.
+   */
+  void munmap() {
+    MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
+    NativeIO.POSIX.munmap(mmap);
+    mmapData = null;
+  }
+
+  /**
+   * Close the replica.
+   *
+   * Must be called after there are no more references to the replica in the
+   * cache or elsewhere.
+   */
+  void close() {
+    String suffix = "";
+    
+    Preconditions.checkState(refCount == 0,
+        "tried to close replica with refCount %d: %s", refCount, this);
+    refCount = -1;
+    Preconditions.checkState(purged,
+        "tried to close unpurged replica %s", this);
+    if (hasMmap()) {
+      munmap();
+      if (LOG.isTraceEnabled()) {
+        suffix += "  munmapped.";
+      }
+    }
+    IOUtilsClient.cleanup(LOG, dataStream, metaStream);
+    if (slot != null) {
+      cache.scheduleSlotReleaser(slot);
+      if (LOG.isTraceEnabled()) {
+        suffix += "  scheduling " + slot + " for later release.";
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closed " + this + suffix);
+    }
+  }
+
+  public FileInputStream getDataStream() {
+    return dataStream;
+  }
+
+  public FileInputStream getMetaStream() {
+    return metaStream;
+  }
+
+  public BlockMetadataHeader getMetaHeader() {
+    return metaHeader;
+  }
+
+  public ExtendedBlockId getKey() {
+    return key;
+  }
+
+  public ClientMmap getOrCreateClientMmap(boolean anchor) {
+    return cache.getOrCreateClientMmap(this, anchor);
+  }
+
+  MappedByteBuffer loadMmapInternal() {
+    try {
+      FileChannel channel = dataStream.getChannel();
+      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, 
+          Math.min(Integer.MAX_VALUE, channel.size()));
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(this + ": created mmap of size " + channel.size());
+      }
+      return mmap;
+    } catch (IOException e) {
+      LOG.warn(this + ": mmap error", e);
+      return null;
+    } catch (RuntimeException e) {
+      LOG.warn(this + ": mmap error", e);
+      return null;
+    }
+  }
+
+  /**
+   * Get the evictable time in nanoseconds.
+   *
+   * Note: you must hold the cache lock to call this function.
+   *
+   * @return the evictable time in nanoseconds.
+   */
+  public Long getEvictableTimeNs() {
+    return evictableTimeNs;
+  }
+
+  /**
+   * Set the evictable time in nanoseconds.
+   *
+   * Note: you must hold the cache lock to call this function.
+   *
+   * @param evictableTimeNs   The evictable time in nanoseconds, or null
+   *                          to set no evictable time.
+   */
+  void setEvictableTimeNs(Long evictableTimeNs) {
+    this.evictableTimeNs = evictableTimeNs;
+  }
+
+  @VisibleForTesting
+  public Slot getSlot() {
+    return slot;
+  }
+
+  /**
+   * Convert the replica to a string for debugging purposes.
+   * Note that we can't take the lock here.
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder().append("ShortCircuitReplica{").
+        append("key=").append(key).
+        append(", metaHeader.version=").append(metaHeader.getVersion()).
+        append(", metaHeader.checksum=").append(metaHeader.getChecksum()).
+        append(", ident=").append("0x").
+          append(Integer.toHexString(System.identityHashCode(this))).
+        append(", creationTimeMs=").append(creationTimeMs).
+        append("}").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
new file mode 100644
index 0000000..ef0019f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplicaInfo.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.shortcircuit;
+
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+
+/**
+ * Holds the outcome of creating a {@link ShortCircuitReplica}: a replica, an
+ * {@link InvalidToken} exception, or neither.
+ */
+public final class ShortCircuitReplicaInfo {
+  /** The replica, or null if there is none. */
+  private final ShortCircuitReplica replica;
+  /** The token exception, or null if there is none. */
+  private final InvalidToken exc;
+
+  /** Create an info carrying neither a replica nor an exception. */
+  public ShortCircuitReplicaInfo() {
+    this(null, null);
+  }
+
+  /** Create an info carrying a replica. */
+  public ShortCircuitReplicaInfo(ShortCircuitReplica replica) {
+    this(replica, null);
+  }
+
+  /** Create an info carrying an InvalidToken exception. */
+  public ShortCircuitReplicaInfo(InvalidToken exc) {
+    this(null, exc);
+  }
+
+  private ShortCircuitReplicaInfo(ShortCircuitReplica replica,
+      InvalidToken exc) {
+    this.replica = replica;
+    this.exc = exc;
+  }
+
+  public ShortCircuitReplica getReplica() {
+    return replica;
+  }
+
+  public InvalidToken getInvalidTokenException() {
+    return exc;
+  }
+
+  /**
+   * @return "ShortCircuitReplicaInfo{...}" listing the replica and/or the
+   *         exception, separated by ", " when both are present.
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder("ShortCircuitReplicaInfo{");
+    if (replica != null) {
+      builder.append(replica);
+    }
+    if (exc != null) {
+      if (replica != null) {
+        builder.append(", ");
+      }
+      builder.append(exc);
+    }
+    return builder.append('}').toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
new file mode 100644
index 0000000..56f8ecc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/IOUtilsClient.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+
+public class IOUtilsClient {
+  /**
+   * Close the Closeable objects and <b>ignore</b> anything thrown while
+   * closing, as well as null entries (and a null array). Must only be used
+   * for cleanup in exception handlers.
+   *
+   * Note: every {@link Throwable} thrown by close() is swallowed, not only
+   * {@link IOException}s.
+   *
+   * @param log the log to record problems to at debug level. Can be null.
+   * @param closeables the objects to close; may be null or contain nulls
+   */
+  public static void cleanup(Logger log, java.io.Closeable... closeables) {
+    if (closeables == null) {
+      return;
+    }
+    for (java.io.Closeable c : closeables) {
+      if (c == null) {
+        continue;
+      }
+      try {
+        c.close();
+      } catch (Throwable e) {
+        if (log != null) {
+          // Parameterized: the message is only formatted if debug is enabled.
+          log.debug("Exception in closing {}", c, e);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fd91744..607de79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -838,6 +838,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8846. Add a unit test for INotify functionality across a layout
     version upgrade (Zhe Zhang via Colin P. McCabe)
 
+    HDFS-8951. Move the shortcircuit package to hdfs-client.
+    (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index fec6b85..52ba899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -419,7 +419,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     if (LOG.isTraceEnabled()) {
       LOG.trace(this + ": trying to construct BlockReaderLocalLegacy");
     }
-    if (!DFSClient.isLocalAddress(inetSocketAddress)) {
+    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
       if (LOG.isTraceEnabled()) {
         LOG.trace(this + ": can't construct BlockReaderLocalLegacy because " +
             "the address " + inetSocketAddress + " is not local");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 47aaed6..3c49ef7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -39,7 +39,6 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -703,30 +702,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
   
-  private static final Map<String, Boolean> localAddrMap = Collections
-      .synchronizedMap(new HashMap<String, Boolean>());
-  
-  public static boolean isLocalAddress(InetSocketAddress targetAddr) {
-    InetAddress addr = targetAddr.getAddress();
-    Boolean cached = localAddrMap.get(addr.getHostAddress());
-    if (cached != null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Address " + targetAddr +
-                  (cached ? " is local" : " is not local"));
-      }
-      return cached;
-    }
-    
-    boolean local = NetUtils.isLocalAddress(addr);
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Address " + targetAddr +
-                (local ? " is local" : " is not local"));
-    }
-    localAddrMap.put(addr.getHostAddress(), local);
-    return local;
-  }
-  
   /**
    * Cancel a delegation token
    * @param token the token to cancel

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 05a9f2c..015e154 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -351,7 +351,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
           checksum.getBytesPerChecksum(),
           checksum.getChecksumSize());
 
-    this.isLocal = DFSClient.isLocalAddress(NetUtils.
+    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
         createSocketAddr(datanodeID.getXferAddr()));
     
     this.peer = peer;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c992bcf9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 4c23d36..2a77cb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -290,7 +290,7 @@ public class RemoteBlockReader2  implements BlockReader {
       DataChecksum checksum, boolean verifyChecksum,
       long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
       DatanodeID datanodeID, PeerCache peerCache) {
-    this.isLocal = DFSClient.isLocalAddress(NetUtils.
+    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
         createSocketAddr(datanodeID.getXferAddr()));
     // Path is used only for printing block and file information in debug
     this.peer = peer;