Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2013/01/12 00:52:24 UTC

svn commit: r1432335 [2/2] - in /hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer...

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jan 11 23:52:22 2013
@@ -53,16 +53,15 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.AbstractList;
@@ -90,6 +89,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -149,11 +149,11 @@ import org.apache.hadoop.io.ReadaheadPoo
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -233,6 +233,7 @@ public class DataNode extends Configured
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
   
   private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
+  static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -250,6 +251,7 @@ public class DataNode extends Configured
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
+  Daemon localDataXceiverServer = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
   private volatile boolean heartbeatsDisabledForTests = false;
@@ -261,6 +263,7 @@ public class DataNode extends Configured
   private String hostName;
   private DatanodeID id;
   
+  final private String fileDescriptorPassingDisabledReason;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private boolean hasAnyBlockPoolRegistered = false;
@@ -309,6 +312,24 @@ public class DataNode extends Configured
     this.getHdfsBlockLocationsEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+    // Determine whether we should try to pass file descriptors to clients.
+    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+              DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+      String reason = DomainSocket.getLoadingFailureReason();
+      if (reason != null) {
+        LOG.warn("File descriptor passing is disabled because " + reason);
+        this.fileDescriptorPassingDisabledReason = reason;
+      } else {
+        LOG.info("File descriptor passing is enabled.");
+        this.fileDescriptorPassingDisabledReason = null;
+      }
+    } else {
+      this.fileDescriptorPassingDisabledReason =
+          "File descriptor passing was not configured.";
+      LOG.debug(this.fileDescriptorPassingDisabledReason);
+    }
+
     try {
       hostName = getHostName(conf);
       LOG.info("Configured hostname is " + hostName);
@@ -537,6 +558,41 @@ public class DataNode extends Configured
     this.dataXceiverServer = new Daemon(threadGroup, 
         new DataXceiverServer(tcpPeerServer, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
+
+    DomainPeerServer domainPeerServer =
+              getDomainPeerServer(conf, streamingAddr.getPort());
+    if (domainPeerServer != null) {
+      this.localDataXceiverServer = new Daemon(threadGroup, 
+          new DataXceiverServer(domainPeerServer, conf, this));
+      LOG.info("Listening on UNIX domain socket: " +
+          domainPeerServer.getBindPath());
+    }
+  }
+
+  static DomainPeerServer getDomainPeerServer(Configuration conf,
+      int port) throws IOException {
+    String domainSocketPath =
+        conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+    if (domainSocketPath == null) {
+      if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+        LOG.warn("Although short-circuit local reads are configured, " +
+            "they are disabled because you didn't configure " +
+            DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+      }
+      return null;
+    }
+    if (DomainSocket.getLoadingFailureReason() != null) {
+      throw new RuntimeException("Although a UNIX domain socket " +
+          "path is configured as " + domainSocketPath + ", we cannot " +
+          "start a localDataXceiverServer because " +
+          DomainSocket.getLoadingFailureReason());
+    }
+    DomainPeerServer domainPeerServer =
+      new DomainPeerServer(domainSocketPath, port);
+    domainPeerServer.setReceiveBufferSize(
+        HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    return domainPeerServer;
   }
   
   // calls specific to BP
@@ -1039,6 +1095,42 @@ public class DataNode extends Configured
     return info;
   }
 
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  static public class ShortCircuitFdsUnsupportedException extends IOException {
+    private static final long serialVersionUID = 1L;
+    public ShortCircuitFdsUnsupportedException(String msg) {
+      super(msg);
+    }
+  }
+
+  @InterfaceAudience.LimitedPrivate("HDFS")
+  static public class ShortCircuitFdsVersionException extends IOException {
+    private static final long serialVersionUID = 1L;
+    public ShortCircuitFdsVersionException(String msg) {
+      super(msg);
+    }
+  }
+
+  FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> token, int maxVersion) 
+          throws ShortCircuitFdsUnsupportedException,
+            ShortCircuitFdsVersionException, IOException {
+    if (fileDescriptorPassingDisabledReason != null) {
+      throw new ShortCircuitFdsUnsupportedException(
+          fileDescriptorPassingDisabledReason);
+    }
+    checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ);
+    int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
+    if (maxVersion < blkVersion) {
+      throw new ShortCircuitFdsVersionException("Your client is too old " +
+        "to read this block!  Its format version is " + 
+        blkVersion + ", but the highest format version you can read is " +
+        maxVersion);
+    }
+    metrics.incrBlocksGetLocalPathInfo();
+    return data.getShortCircuitFdsForRead(blk);
+  }
+
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
       List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
@@ -1113,32 +1205,45 @@ public class DataNode extends Configured
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
-
-      // wait for all data receiver threads to exit
-      if (this.threadGroup != null) {
-        int sleepMs = 2;
-        while (true) {
-          this.threadGroup.interrupt();
-          LOG.info("Waiting for threadgroup to exit, active threads is " +
-                   this.threadGroup.activeCount());
-          if (this.threadGroup.activeCount() == 0) {
-            break;
-          }
-          try {
-            Thread.sleep(sleepMs);
-          } catch (InterruptedException e) {}
-          sleepMs = sleepMs * 3 / 2; // exponential backoff
-          if (sleepMs > 1000) {
-            sleepMs = 1000;
-          }
+    }
+    if (localDataXceiverServer != null) {
+      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
+      this.localDataXceiverServer.interrupt();
+    }
+    // wait for all data receiver threads to exit
+    if (this.threadGroup != null) {
+      int sleepMs = 2;
+      while (true) {
+        this.threadGroup.interrupt();
+        LOG.info("Waiting for threadgroup to exit, active threads is " +
+                 this.threadGroup.activeCount());
+        if (this.threadGroup.activeCount() == 0) {
+          break;
+        }
+        try {
+          Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {}
+        sleepMs = sleepMs * 3 / 2; // exponential backoff
+        if (sleepMs > 1000) {
+          sleepMs = 1000;
         }
       }
-      // wait for dataXceiveServer to terminate
+      this.threadGroup = null;
+    }
+    if (this.dataXceiverServer != null) {
+      // wait for dataXceiverServer to terminate
       try {
         this.dataXceiverServer.join();
       } catch (InterruptedException ie) {
       }
     }
+    if (this.localDataXceiverServer != null) {
+      // wait for localDataXceiverServer to terminate
+      try {
+        this.localDataXceiverServer.join();
+      } catch (InterruptedException ie) {
+      }
+    }
     
     if(blockPoolManager != null) {
       try {
@@ -1523,6 +1628,9 @@ public class DataNode extends Configured
 
     // start dataXceiveServer
     dataXceiverServer.start();
+    if (localDataXceiverServer != null) {
+      localDataXceiverServer.start();
+    }
     ipcServer.start();
     startPlugins(conf);
   }

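The hunks above teach the DataNode to start a second DataXceiverServer bound
to a UNIX domain socket whenever a socket path is configured. As a minimal
sketch (not part of this commit), a test could enable the feature using only
the two configuration keys referenced above; the class name is hypothetical,
and the "%d" in the socket path is expanded per-port, as in the test classes
added later in this commit.

    import java.io.File;

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ShortCircuitConfSketch {
      public static HdfsConfiguration create(File socketDir) {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Client side: ask for short-circuit (file-descriptor-passing) reads.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        // DataNode side: without this key, getDomainPeerServer() above
        // returns null and the localDataXceiverServer is never started.
        conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
            new File(socketDir, "dn.%d.sock").getAbsolutePath());
        return conf;
      }
    }
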
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jan 11 23:52:22 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
 import static org.apache.hadoop.util.Time.now;
@@ -28,6 +29,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
@@ -60,11 +63,14 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -233,6 +239,68 @@ class DataXceiver extends Receiver imple
   }
 
   @Override
+  public void requestShortCircuitFds(final ExtendedBlock blk,
+      final Token<BlockTokenIdentifier> token,
+      int maxVersion) throws IOException {
+    updateCurrentThreadName("Passing file descriptors for block " + blk);
+    BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
+    FileInputStream fis[] = null;
+    try {
+      if (peer.getDomainSocket() == null) {
+        throw new IOException("You cannot pass file descriptors over " +
+            "anything but a UNIX domain socket.");
+      }
+      fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+      bld.setStatus(SUCCESS);
+      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+    } catch (ShortCircuitFdsVersionException e) {
+      bld.setStatus(ERROR_UNSUPPORTED);
+      bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+      bld.setMessage(e.getMessage());
+    } catch (ShortCircuitFdsUnsupportedException e) {
+      bld.setStatus(ERROR_UNSUPPORTED);
+      bld.setMessage(e.getMessage());
+    } catch (InvalidToken e) {
+      bld.setStatus(ERROR_ACCESS_TOKEN);
+      bld.setMessage(e.getMessage());
+    } catch (IOException e) {
+      bld.setStatus(ERROR);
+      bld.setMessage(e.getMessage());
+    }
+    try {
+      bld.build().writeDelimitedTo(socketOut);
+      if (fis != null) {
+        FileDescriptor fds[] = new FileDescriptor[fis.length];
+        for (int i = 0; i < fds.length; i++) {
+          fds[i] = fis[i].getFD();
+        }
+        byte buf[] = new byte[] { (byte)0 };
+        peer.getDomainSocket().
+          sendFileDescriptors(fds, buf, 0, buf.length);
+      }
+    } finally {
+      if (ClientTraceLog.isInfoEnabled()) {
+        DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
+            .getBlockPoolId());
+        BlockSender.ClientTraceLog.info(String.format(
+            "src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " +
+              "success: %b",
+            "127.0.0.1",                   // src IP
+            "127.0.0.1",                   // dst IP
+            "REQUEST_SHORT_CIRCUIT_FDS",   // operation
+            blk.getBlockId(),              // block id
+            dnR.getStorageID(),
+            (fis != null)
+          ));
+      }
+      if (fis != null) {
+        IOUtils.cleanup(LOG, fis);
+      }
+    }
+  }
+
+  @Override
   public void readBlock(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,

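requestShortCircuitFds() above replies with a varint-delimited
BlockOpResponseProto, then passes the two descriptors with a one-byte payload
via DomainSocket#sendFileDescriptors. The real client half lives elsewhere in
this branch; what follows is only a hedged sketch of the receiving end,
assuming DomainSocket#recvFileInputStreams as the counterpart of the send
call used above, with a hypothetical class name.

    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
    import org.apache.hadoop.net.unix.DomainSocket;

    class ShortCircuitFdReceiverSketch {
      /** Returns the {data, meta} streams passed by the DataNode. */
      static FileInputStream[] receive(DomainSocket sock) throws IOException {
        // The server wrote the response with writeDelimitedTo(), so read
        // it back with the matching delimited parse.
        BlockOpResponseProto resp =
            BlockOpResponseProto.parseDelimitedFrom(sock.getInputStream());
        if (resp.getStatus() != Status.SUCCESS) {
          throw new IOException("short-circuit FDs refused: " +
              resp.getMessage());
        }
        FileInputStream[] fis = new FileInputStream[2];
        byte[] buf = new byte[1];  // matches the one-byte payload sent above
        sock.recvFileInputStreams(fis, buf, 0, buf.length);
        return fis;
      }
    }
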
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Fri Jan 11 23:52:22 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -386,4 +387,6 @@ public interface FsDatasetSpi<V extends 
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException;
 
+  FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+      throws IOException;
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Jan 11 23:52:22 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1669,6 +1670,26 @@ class FsDatasetImpl implements FsDataset
   }
   
   @Override // FsDatasetSpi
+  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block) 
+      throws IOException {
+    File datafile = getBlockFile(block);
+    File metafile = FsDatasetUtil.getMetaFile(datafile,
+        block.getGenerationStamp());
+    FileInputStream fis[] = new FileInputStream[2];
+    boolean success = false;
+    try {
+      fis[0] = new FileInputStream(datafile);
+      fis[1] = new FileInputStream(metafile);
+      success = true;
+      return fis;
+    } finally {
+      if (!success) {
+        IOUtils.cleanup(null, fis);
+      }
+    }
+  }
+
+  @Override // FsDatasetSpi
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
       throws IOException {
     // List of VolumeIds, one per volume on the datanode

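getShortCircuitFdsForRead() above uses an open-all-or-clean-up idiom: a
success flag is flipped only after both streams are open, and the finally
block closes whatever was opened on failure (IOUtils.cleanup skips null
entries). A small generalization of that idiom, not part of the commit:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.IOUtils;

    class OpenAllOrNoneSketch {
      /** Opens every file, or closes the partial set and rethrows. */
      static FileInputStream[] openAll(File... files) throws IOException {
        FileInputStream[] fis = new FileInputStream[files.length];
        boolean success = false;
        try {
          for (int i = 0; i < files.length; i++) {
            fis[i] = new FileInputStream(files[i]);
          }
          success = true;
          return fis;
        } finally {
          if (!success) {
            IOUtils.cleanup(null, fis);  // entries still null are skipped
          }
        }
      }
    }
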
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Jan 11 23:52:22 2013
@@ -563,7 +563,7 @@ public class NamenodeFsck {
             conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
             TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
                 getDataEncryptionKey()),
-            chosenNode);
+            chosenNode, null, false);
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto Fri Jan 11 23:52:22 2013
@@ -74,6 +74,8 @@ message DeleteBlockPoolResponseProto {
  * Gets the file information where block and its metadata is stored
  * block - block for which path information is being requested
  * token - block token
+ *
+ * This message is deprecated in favor of file descriptor passing.
  */
 message GetBlockLocalPathInfoRequestProto {
   required ExtendedBlockProto block = 1;
@@ -84,6 +86,8 @@ message GetBlockLocalPathInfoRequestProt
  * block - block for which file path information is being returned
  * localPath - file path where the block data is stored
  * localMetaPath - file path where the block meta data is stored
+ *
+ * This message is deprecated in favor of file descriptor passing.
  */
 message GetBlockLocalPathInfoResponseProto {
   required ExtendedBlockProto block = 1;

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Fri Jan 11 23:52:22 2013
@@ -114,6 +114,16 @@ message OpBlockChecksumProto { 
   required BaseHeaderProto header = 1;
 }
 
+message OpRequestShortCircuitAccessProto { 
+  required BaseHeaderProto header = 1;
+
+  /** In order to get short-circuit access to block data, clients must set this
+   * to the highest version of the block data that they can understand.
+   * Currently 1 is the only version, but more versions may exist in the future
+   * if the on-disk format changes.
+   */
+  required uint32 maxVersion = 2;
+}
 
 message PacketHeaderProto {
   // All fields must be fixed-length!
@@ -132,6 +142,7 @@ enum Status {
   ERROR_EXISTS = 4;
   ERROR_ACCESS_TOKEN = 5;
   CHECKSUM_OK = 6;
+  ERROR_UNSUPPORTED = 7;
 }
 
 message PipelineAckProto {
@@ -164,6 +175,16 @@ message BlockOpResponseProto {
 
   /** explanatory text which may be useful to log on the client side */
   optional string message = 5;
+
+  /** If the server agrees to a client's request for short-circuit access,
+   * it will send a response message with the relevant file descriptors
+   * attached.
+   *
+   * In the body of the message, this version number will be set to the
+   * specific version number of the block data that the client is about to
+   * read.
+   */
+  optional uint32 shortCircuitAccessVersion = 6;
 }
 
 /**

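Taken together, the proto changes above define a simple version negotiation:
the client states the highest block format it understands, and the server
either attaches descriptors (SUCCESS) or answers ERROR_UNSUPPORTED, reporting
its CURRENT_BLOCK_FORMAT_VERSION either way. A minimal sketch of both ends of
that negotiation, building the messages directly (the branch's real client
goes through Sender/Receiver; the class here is illustrative only):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;

    class VersionHandshakeSketch {
      static OpRequestShortCircuitAccessProto buildRequest(String poolId,
          long blockId, long genStamp) {
        ExtendedBlockProto block = ExtendedBlockProto.newBuilder()
            .setPoolId(poolId)
            .setBlockId(blockId)
            .setGenerationStamp(genStamp)
            .build();
        return OpRequestShortCircuitAccessProto.newBuilder()
            .setHeader(BaseHeaderProto.newBuilder().setBlock(block))
            .setMaxVersion(1)  // highest block format this client can read
            .build();
      }

      static void checkResponse(BlockOpResponseProto resp) {
        // On SUCCESS the version echoes the on-disk format about to be read;
        // on ERROR_UNSUPPORTED it reports the version the server would use.
        if (resp.hasShortCircuitAccessVersion()
            && resp.getShortCircuitAccessVersion() > 1) {
          throw new UnsupportedOperationException("block format version "
              + resp.getShortCircuitAccessVersion()
              + " is newer than this client supports");
        }
      }
    }
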
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Fri Jan 11 23:52:22 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 
 /**
  * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
@@ -156,7 +157,7 @@ public class BlockReaderTestUtil {
       testBlock.getBlockToken(), 
       offset, lenToRead,
       true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
-      nodes[0]);
+      nodes[0], null, false);
   }
 
   /**
@@ -168,4 +169,12 @@ public class BlockReaderTestUtil {
     return cluster.getDataNode(ipcport);
   }
 
+  public boolean haveRequiredResources() {
+    if (conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY) != null) {
+      // To use UNIX Domain sockets, we must have the native code loaded.
+      return DomainSocket.getLoadingFailureReason() == null;
+    } else {
+      return true;
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Jan 11 23:52:22 2013
@@ -2189,8 +2189,8 @@ public class MiniDFSCluster {
   /**
   * Get file corresponding to a block
    * @param storageDir storage directory
-   * @param blk block to be corrupted
-   * @return file corresponding to the block
+   * @param blk the block
+   * @return data file corresponding to the block
    */
   public static File getBlockFile(File storageDir, ExtendedBlock blk) {
     return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), 
@@ -2198,6 +2198,19 @@ public class MiniDFSCluster {
   }
 
   /**
+   * Get the latest metadata file corresponding to a block
+   * @param storageDir storage directory
+   * @param blk the block
+   * @return metadata file corresponding to the block
+   */
+  public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) {
+    return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), 
+        blk.getBlockName() + "_" + blk.getGenerationStamp() +
+        Block.METADATA_EXTENSION);
+  }
+
+  /**
    * Shut down a cluster if it is not null
    * @param cluster cluster reference or null
    */
@@ -2224,7 +2237,7 @@ public class MiniDFSCluster {
   }
   
   /**
-   * Get files related to a block for a given datanode
+   * Get the block data file for a block from a given datanode
    * @param dnIndex Index of the datanode to get block files for
    * @param block block for which corresponding files are needed
    */
@@ -2239,6 +2252,24 @@ public class MiniDFSCluster {
     }
     return null;
   }
+
+  /**
+   * Get the block metadata file for a block from a given datanode
+   * 
+   * @param dnIndex Index of the datanode to get block files for
+   * @param block block for which corresponding files are needed
+   */
+  public static File getBlockMetadataFile(int dnIndex, ExtendedBlock block) {
+    // Check for block file in the two storage directories of the datanode
+    for (int i = 0; i <= 1; i++) {
+      File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
+      File blockMetaFile = getBlockMetadataFile(storageDir, block);
+      if (blockMetaFile.exists()) {
+        return blockMetaFile;
+      }
+    }
+    return null;
+  }
   
   /**
    * Throw an exception if the MiniDFSCluster is not started with a single

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Fri Jan 11 23:52:22 2013
@@ -17,90 +17,333 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestBlockReaderLocal {
-  static MiniDFSCluster cluster;
-  static HdfsConfiguration conf;
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+      int off2, int len) {
+    for (int i = 0; i < len; i++) {
+      if (buf1[off1 + i] != buf2[off2 + i]) {
+        Assert.fail("arrays differ at byte " + i + ". " +
+          "The first array has " + (int)buf1[off1 + i] + 
+          ", but the second array has " + (int)buf2[off2 + i]);
+      }
+    }
+  }
 
-  @BeforeClass
-  public static void setupCluster() throws IOException {
-    conf = new HdfsConfiguration();
+  /**
+   * Similar to IOUtils#readFully(). Reads bytes in a loop.
+   *
+   * @param reader           The BlockReaderLocal to read bytes from
+   * @param buf              The ByteBuffer to read into
+   * @param off              The offset in the buffer to read into
+   * @param len              The number of bytes to read.
+   * 
+   * @throws IOException     If it could not read the requested number of bytes
+   */
+  private static void readFully(BlockReaderLocal reader,
+      ByteBuffer buf, int off, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      buf.limit(off + len);
+      buf.position(off);
+      long ret = reader.read(buf);
+      if (ret < 0) {
+        throw new EOFException("Premature EOF from BlockReaderLocal " +
+            "after reading " + (len - amt) + " byte(s).");
+      }
+      amt -= ret;
+      off += ret;
+    }
+  }
+
+  private static interface BlockReaderLocalTest {
+    final int TEST_LENGTH = 12345;
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException;
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException;
+  }
+  
+  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum) throws IOException {
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    FileInputStream dataIn = null, checkIn = null;
+    final Path TEST_PATH = new Path("/a");
+    final long RANDOM_SEED = 4567L;
+    BlockReaderLocal blockReaderLocal = null;
+    FSDataInputStream fsIn = null;
+    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = MiniDFSCluster.getBlockFile(0, block);
+      File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
+
+      DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+      cluster.shutdown();
+      cluster = null;
+      test.setup(dataFile, checksum);
+      dataIn = new FileInputStream(dataFile);
+      checkIn = new FileInputStream(metaFile);
+      blockReaderLocal = new BlockReaderLocal(conf,
+          TEST_PATH.getName(), block, 0, -1,
+          dataIn, checkIn, datanodeID, checksum);
+      dataIn = null;
+      checkIn = null;
+      test.doTest(blockReaderLocal, original);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+      if (dataIn != null) dataIn.close();
+      if (checkIn != null) checkIn.close();
+      if (blockReaderLocal != null) blockReaderLocal.close(null, null);
+    }
+  }
+  
+  private static class TestBlockReaderLocalImmediateClose 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException { }
+  }
+  
+  @Test
+  public void testBlockReaderLocalImmediateClose() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+  }
+  
+  private static class TestBlockReaderSimpleReads 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 512);
+      assertArrayRegionsEqual(original, 0, buf, 0, 512);
+      reader.readFully(buf, 512, 512);
+      assertArrayRegionsEqual(original, 512, buf, 512, 512);
+      reader.readFully(buf, 1024, 513);
+      assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
+      reader.readFully(buf, 1537, 514);
+      assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderSimpleReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+  }
+  
+  private static class TestBlockReaderLocalArrayReads2 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf, 0, 10);
+      reader.readFully(buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf, 10, 100);
+      reader.readFully(buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf, 110, 700);
+      reader.readFully(buf, 810, 1); // from offset 810 to offset 811
+      reader.readFully(buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf, 811, 5);
+      reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716
+      reader.readFully(buf, 1716, 5);
+      assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderLocalArrayReads2() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+        true);
+  }
 
-    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
-    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
         false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
+  }
 
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+  private static class TestBlockReaderLocalByteBufferReads 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      readFully(reader, buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf.array(), 0, 10);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderLocalByteBufferReads()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferReads(), true);
   }
 
-  @AfterClass
-  public static void teardownCluster() {
-    cluster.shutdown();
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferReads(), false);
   }
+  
+  private static class TestBlockReaderLocalReadCorruptStart
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
 
-  /**
-   * Test that, in the case of an error, the position and limit of a ByteBuffer
-   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
-   * of this class might immediately issue a retry on failure, so it's polite.
-   */
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      if (usingChecksums) {
+        try {
+          reader.readFully(buf, 0, 10);
+          Assert.fail("did not detect corruption");
+        } catch (IOException e) {
+          // expected
+        }
+      } else {
+        reader.readFully(buf, 0, 10);
+      }
+    }
+  }
+  
   @Test
-  public void testStablePositionAfterCorruptRead() throws Exception {
-    final short REPL_FACTOR = 1;
-    final long FILE_LENGTH = 512L;
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
-
-    Path path = new Path("/corrupted");
-
-    DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
-    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
-
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
-    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
-    assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
-
-    FSDataInputStream dis = cluster.getFileSystem().open(path);
-    ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
-    boolean sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
+  public void testBlockReaderLocalReadCorruptStart()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+  }
+  
+  private static class TestBlockReaderLocalReadCorrupt
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums) 
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.seek(1539);
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
     }
 
-    assertTrue(sawException);
-    assertEquals(0, buf.position());
-    assertEquals(buf.capacity(), buf.limit());
-
-    dis = cluster.getFileSystem().open(path);
-    buf.position(3);
-    buf.limit(25);
-    sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      try {
+        reader.readFully(buf, 0, 10);
+        assertArrayRegionsEqual(original, 0, buf, 0, 10);
+        reader.readFully(buf, 10, 100);
+        assertArrayRegionsEqual(original, 10, buf, 10, 100);
+        reader.readFully(buf, 110, 700);
+        assertArrayRegionsEqual(original, 110, buf, 110, 700);
+        reader.skip(1); // skip from offset 810 to offset 811
+        reader.readFully(buf, 811, 5);
+        assertArrayRegionsEqual(original, 811, buf, 811, 5);
+        reader.readFully(buf, 816, 900);
+        if (usingChecksums) {
+          // We should detect the corruption when using a checksum file.
+          Assert.fail("did not detect corruption");
+        }
+      } catch (ChecksumException e) {
+        if (!usingChecksums) {
+          Assert.fail("didn't expect to get ChecksumException: not " +
+              "using checksums.");
+        }
+      }
     }
+  }
 
-    assertTrue(sawException);
-    assertEquals(3, buf.position());
-    assertEquals(25, buf.limit());
+  @Test
+  public void testBlockReaderLocalReadCorrupt()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
   }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Jan 11 23:52:22 2013
@@ -61,7 +61,7 @@ public class TestClientBlockVerification
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close(null, null);
   }
 
   /**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification
     // We asked the blockreader for the whole file, and only read
     // half of it, so no CHECKSUM_OK
     verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close(null, null);
   }
 
   /**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close(null);
+    reader.close(null, null);
   }
 
   /**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(Status.CHECKSUM_OK);
-        reader.close(null);
+        reader.close(null, null);
       }
     }
   }

Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Test;
+
+public class TestFileInputStreamCache {
+  static final Log LOG = LogFactory.getLog(TestFileInputStreamCache.class);
+
+  @Test
+  public void testCreateAndDestroy() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(10, 1000);
+    cache.close();
+  }
+  
+  private static class TestFileDescriptorPair {
+    TemporarySocketDirectory dir = new TemporarySocketDirectory();
+    FileInputStream fis[];
+
+    public TestFileDescriptorPair() throws IOException {
+      fis = new FileInputStream[2];
+      for (int i = 0; i < 2; i++) {
+        String name = dir.getDir() + "/file" + i;
+        FileOutputStream fos = new FileOutputStream(name);
+        fos.write(1);
+        fos.close();
+        fis[i] = new FileInputStream(name);
+      }
+    }
+
+    public FileInputStream[] getFileInputStreams() {
+      return fis;
+    }
+
+    public void close() throws IOException {
+      IOUtils.cleanup(LOG, fis);
+      dir.close();
+    }
+
+    public boolean compareWith(FileInputStream other[]) {
+      if ((other == null) || (fis == null)) {
+        return other == fis;
+      }
+      if (fis.length != other.length) return false;
+      for (int i = 0; i < fis.length; i++) {
+        if (fis[i] != other[i]) return false;
+      }
+      return true;
+    }
+  }
+
+  @Test
+  public void testAddAndRetrieve() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(1, 1000000);
+    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8080, 9090, 7070);
+    ExtendedBlock block = new ExtendedBlock("poolid", 123);
+    TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    cache.put(dnId, block, pair.getFileInputStreams());
+    FileInputStream fis[] = cache.get(dnId, block);
+    Assert.assertTrue(pair.compareWith(fis));
+    pair.close();
+    cache.close();
+  }
+
+  @Test
+  public void testExpiry() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(1, 10);
+    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8080, 9090, 7070);
+    ExtendedBlock block = new ExtendedBlock("poolid", 123);
+    TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    cache.put(dnId, block, pair.getFileInputStreams());
+    Thread.sleep(cache.getExpiryTimeMs() * 100);
+    FileInputStream fis[] = cache.get(dnId, block);
+    Assert.assertNull(fis);
+    pair.close();
+    cache.close();
+  }
+
+  @Test
+  public void testEviction() throws Exception {
+    FileInputStreamCache cache = new FileInputStreamCache(1, 10000000);
+    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8080, 9090, 7070);
+    ExtendedBlock block = new ExtendedBlock("poolid", 123);
+    TestFileDescriptorPair pair = new TestFileDescriptorPair();
+    cache.put(dnId, block, pair.getFileInputStreams());
+    DatanodeID dnId2 = new DatanodeID("127.0.0.1", "localhost", 
+        "xyzzy", 8081, 9091, 7071);
+    TestFileDescriptorPair pair2 = new TestFileDescriptorPair();
+    cache.put(dnId2, block, pair2.getFileInputStreams());
+    FileInputStream fis[] = cache.get(dnId, block);
+    Assert.assertNull(fis);
+    FileInputStream fis2[] = cache.get(dnId2, block);
+    Assert.assertTrue(pair2.compareWith(fis2));
+    pair.close();
+    cache.close();
+  }
+}

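The tests above exercise FileInputStreamCache, a client-side cache of passed
descriptors keyed by (DatanodeID, ExtendedBlock) with time-based expiry and
size-based eviction. A hedged usage sketch using only the methods the tests
demonstrate; it sits in org.apache.hadoop.hdfs because the cache appears to
be package-scoped there, and requestFds() is a hypothetical stand-in for the
descriptor transfer shown in DataXceiver above.

    package org.apache.hadoop.hdfs;

    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

    class FdCacheUsageSketch {
      static FileInputStream[] fdsFor(FileInputStreamCache cache,
          DatanodeID dn, ExtendedBlock block) throws IOException {
        FileInputStream[] fis = cache.get(dn, block);  // null on miss or expiry
        if (fis == null) {
          fis = requestFds(dn, block);
          // A reader that finishes with the streams can hand them back with
          // cache.put(dn, block, fis) so later reads of the block reuse them.
        }
        return fis;
      }

      private static FileInputStream[] requestFds(DatanodeID dn,
          ExtendedBlock block) throws IOException {
        throw new UnsupportedOperationException("illustrative stub");
      }
    }
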
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java Fri Jan 11 23:52:22 2013
@@ -17,14 +17,10 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 public class TestParallelRead extends TestParallelReadUtil {
-
   @BeforeClass
   static public void setupCluster() throws Exception {
     setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
@@ -34,26 +30,4 @@ public class TestParallelRead extends Te
   static public void teardownCluster() throws Exception {
     TestParallelReadUtil.teardownCluster();
   }
-
-  /**
-   * Do parallel read several times with different number of files and threads.
-   *
-   * Note that while this is the only "test" in a junit sense, we're actually
-   * dispatching a lot more. Failures in the other methods (and other threads)
-   * need to be manually collected, which is inconvenient.
-   */
-  @Test
-  public void testParallelReadCopying() throws IOException {
-    runTestWorkload(new CopyingReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadByteBuffer() throws IOException {
-    runTestWorkload(new DirectReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadMixed() throws IOException {
-    runTestWorkload(new MixedWorkloadHelper());
-  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java Fri Jan 11 23:52:22 2013
@@ -32,12 +32,18 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
 
 /**
  * Driver class for testing the use of DFSInputStream by multiple concurrent
- * readers, using the different read APIs. See subclasses for the actual test
- * cases.
+ * readers, using the different read APIs.
+ *
+ * This class is marked as @Ignore so that junit doesn't try to execute the
+ * tests in here directly.  They are executed from subclasses.
  */
+@Ignore
 public class TestParallelReadUtil {
 
   static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
@@ -386,4 +392,28 @@ public class TestParallelReadUtil {
     util.shutdown();
   }
 
+  /**
+   * Do parallel read several times with different number of files and threads.
+   *
+   * Note that while this is the only "test" in a junit sense, we're actually
+   * dispatching a lot more. Failures in the other methods (and other threads)
+   * need to be manually collected, which is inconvenient.
+   */
+  @Test
+  public void testParallelReadCopying() throws IOException {
+    Assume.assumeTrue(util.haveRequiredResources());
+    runTestWorkload(new CopyingReadWorkerHelper());
+  }
+
+  @Test
+  public void testParallelReadByteBuffer() throws IOException {
+    Assume.assumeTrue(util.haveRequiredResources());
+    runTestWorkload(new DirectReadWorkerHelper());
+  }
+
+  @Test
+  public void testParallelReadMixed() throws IOException {
+    Assume.assumeTrue(util.haveRequiredResources());
+    runTestWorkload(new MixedWorkloadHelper());
+  }
 }

Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelShortCircuitRead extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    sockDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    DomainSocket.disableBindPathValidation();
+    setupCluster(1, conf);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    sockDir.close();
+    TestParallelReadUtil.teardownCluster();
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    sockDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+    DomainSocket.disableBindPathValidation();
+    setupCluster(1, conf);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    sockDir.close();
+    TestParallelReadUtil.teardownCluster();
+  }
+}

Added: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java?rev=1432335&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java (added)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java Fri Jan 11 23:52:22 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelUnixDomainRead extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    sockDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    DomainSocket.disableBindPathValidation();
+    setupCluster(1, conf);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    sockDir.close();
+    TestParallelReadUtil.teardownCluster();
+  }
+}
\ No newline at end of file

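Taken together, the three new subclasses exercise the same parallel-read workload over three transports: short-circuit reads with checksums, short-circuit reads that skip checksums, and ordinary reads that still go through the DataNode but travel over a Unix domain socket (short-circuit disabled). A rough sketch of the configuration the first variant corresponds to is below; the socket path is an arbitrary example, not a required location:

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ShortCircuitConfSketch {
      /** Rough sketch of the configuration the tests above set up programmatically. */
      public static HdfsConfiguration newShortCircuitConf() {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Domain socket shared by the DataNode and local clients; path is an example.
        conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
            "/var/run/hdfs/dn_socket");
        // Enable short-circuit (direct local) reads.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
        // Keep checksum verification on; the NoChecksum variant sets this to true.
        conf.setBoolean(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
        return conf;
      }
    }

TestParallelUnixDomainRead flips only DFS_CLIENT_READ_SHORTCIRCUIT_KEY to false while keeping the socket path, presumably so that the data transfer itself is carried over the domain socket rather than TCP.
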
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Fri Jan 11 23:52:22 2013
@@ -20,9 +20,11 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertTrue;
 
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,21 +32,22 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -55,8 +58,18 @@ import org.junit.Test;
  * system.
  */
 public class TestShortCircuitLocalRead {
+  private static TemporarySocketDirectory sockDir;
 
-  static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
 
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 5120;
@@ -81,7 +94,9 @@ public class TestShortCircuitLocalRead {
     for (int idx = 0; idx < len; idx++) {
       if (expected[from + idx] != actual[idx]) {
         Assert.fail(message + " byte " + (from + idx) + " differs. expected "
-            + expected[from + idx] + " actual " + actual[idx]);
+            + expected[from + idx] + " actual " + actual[idx] +
+            "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+            "\nactual:   " + StringUtils.byteToHexString(actual, 0, len));
       }
     }
   }
@@ -170,8 +185,9 @@ public class TestShortCircuitLocalRead {
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
+    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestShortCircuitLocalRead.__PORT__.sock").getAbsolutePath());
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -229,23 +245,17 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(false, 10*blockSize+100, 777);
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }
-   
+
   @Test
-  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+  public void testDeprecatedGetBlockLocalPathInfoRpc()
+      throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        "alloweduser1,alloweduser2");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();
     cluster.waitActive();
-    final DataNode dn = cluster.getDataNodes().get(0);
     FileSystem fs = cluster.getFileSystem();
     try {
       DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
-      UserGroupInformation aUgi1 =
-          UserGroupInformation.createRemoteUser("alloweduser1");
-      UserGroupInformation aUgi2 =
-          UserGroupInformation.createRemoteUser("alloweduser2");
       LocatedBlocks lb = cluster.getNameNode().getRpcServer()
           .getBlockLocations("/tmp/x", 0, 16);
       // Create a new block object, because the block inside LocatedBlock at
@@ -253,51 +263,11 @@ public class TestShortCircuitLocalRead {
       ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
       Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
       final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
-      ClientDatanodeProtocol proxy = aUgi1
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
-      
-      // This should succeed
-      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Try with the other allowed user
-      proxy = aUgi2
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
-
-      // This should succeed as well
-      blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Now try with a disallowed user
-      UserGroupInformation bUgi = UserGroupInformation
-          .createRemoteUser("notalloweduser");
-      proxy = bUgi
-          .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
-            @Override
-            public ClientDatanodeProtocol run() throws Exception {
-              return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                  60000, false);
-            }
-          });
+      ClientDatanodeProtocol proxy = 
+          DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
       try {
         proxy.getBlockLocalPathInfo(blk, token);
-        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+        Assert.fail("The call should have failed as this user "
             + " is not allowed to call getBlockLocalPathInfo");
       } catch (IOException ex) {
         Assert.assertTrue(ex.getMessage().contains(
@@ -315,8 +285,6 @@ public class TestShortCircuitLocalRead {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -354,6 +322,86 @@ public class TestShortCircuitLocalRead {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testHandleTruncatedBlockFile() throws IOException {
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    final Path TEST_PATH = new Path("/a");
+    final Path TEST_PATH2 = new Path("/b");
+    final long RANDOM_SEED = 4567L;
+    final long RANDOM_SEED2 = 4568L;
+    FSDataInputStream fsIn = null;
+    final int TEST_LENGTH = 3456;
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_LENGTH, (short)1, RANDOM_SEED);
+      DFSTestUtil.createFile(fs, TEST_PATH2,
+          TEST_LENGTH, (short)1, RANDOM_SEED2);
+      fsIn = cluster.getFileSystem().open(TEST_PATH2);
+      byte original[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = MiniDFSCluster.getBlockFile(0, block);
+      cluster.shutdown();
+      cluster = null;
+      RandomAccessFile raf = null;
+      try {
+        raf = new RandomAccessFile(dataFile, "rw");
+        raf.setLength(0);
+      } finally {
+        if (raf != null) raf.close();
+      }
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      fsIn = fs.open(TEST_PATH);
+      try {
+        byte buf[] = new byte[100];
+        fsIn.seek(2000);
+        fsIn.readFully(buf, 0, buf.length);
+        Assert.fail("shouldn't be able to read from corrupt 0-length " +
+            "block file.");
+      } catch (IOException e) {
+        DFSClient.LOG.error("caught exception ", e);
+      }
+      fsIn.close();
+      fsIn = null;
+
+      // We should still be able to read the other file.
+      // This is important because it indicates that we detected that the 
+      // previous block was corrupt, rather than blaming the problem on
+      // communication.
+      fsIn = fs.open(TEST_PATH2);
+      byte buf[] = new byte[original.length];
+      fsIn.readFully(buf, 0, buf.length);
+      TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+          original.length);
+      fsIn.close();
+      fsIn = null;
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
      
   /**
    * Test to run benchmarks between shortcircuit read vs regular read with

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Fri Jan 11 23:52:22 2013
@@ -148,7 +148,7 @@ public class TestBlockTokenWithDFS {
       blockReader = BlockReaderFactory.newBlockReader(
           conf, file, block, lblock.getBlockToken(), 0, -1,
           true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0]);
+          nodes[0], null, false);
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Fri Jan 11 23:52:22 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implemen
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
   
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)

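SimulatedFSDataset keeps block data in memory, so it cannot hand out file descriptors and the new getShortCircuitFdsForRead() simply throws UnsupportedOperationException. For a disk-backed dataset the method presumably opens the block's data file and its checksum (.meta) file and returns both streams, so that their descriptors can be passed to a local client over the domain socket. The sketch below only illustrates that idea under those assumptions; it is not the real FsDatasetImpl code, and the caller is assumed to have already resolved both files on local disk:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.IOUtils;

    class ShortCircuitFdSketch {
      /**
       * Open the block data file and its checksum file for reading and return
       * both streams (data first, then metadata in this sketch). On any
       * failure, close whatever was already opened before rethrowing.
       */
      static FileInputStream[] openFdsForRead(File blockFile, File metaFile)
          throws IOException {
        FileInputStream blockIn = null;
        FileInputStream metaIn = null;
        try {
          blockIn = new FileInputStream(blockFile);
          metaIn = new FileInputStream(metaFile);
          FileInputStream[] fds = new FileInputStream[] { blockIn, metaIn };
          blockIn = null;  // ownership passes to the caller on success
          metaIn = null;
          return fds;
        } finally {
          IOUtils.cleanup(null, blockIn, metaIn);
        }
      }
    }
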
Modified: hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1432335&r1=1432334&r2=1432335&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Fri Jan 11 23:52:22 2013
@@ -32,6 +32,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -281,11 +282,11 @@ public class TestDataNodeVolumeFailure {
     String file = BlockReaderFactory.getFileName(targetAddr, 
         "test-blockpoolid",
         block.getBlockId());
-    BlockReaderFactory.newBlockReader(conf, file, block,
+    BlockReader blockReader =
+      BlockReaderFactory.newBlockReader(conf, file, block,
         lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode);
-
-    // nothing - if it fails - it will throw and exception
+        TcpPeerServer.peerFromSocket(s), datanode, null, false);
+    blockReader.close(null, null);
   }
   
   /**