Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2013/04/13 04:14:01 UTC

svn commit: r1467538 [3/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/net/ src/main/java/org/apache/hadoop/hdfs/protocol/datatrans...

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Sat Apr 13 02:13:59 2013
@@ -17,90 +17,333 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestBlockReaderLocal {
-  static MiniDFSCluster cluster;
-  static HdfsConfiguration conf;
+  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+      int off2, int len) {
+    for (int i = 0; i < len; i++) {
+      if (buf1[off1 + i] != buf2[off2 + i]) {
+        Assert.fail("arrays differ at byte " +  i + ". " + 
+          "The first array has " + (int)buf1[off1 + i] + 
+          ", but the second array has " + (int)buf2[off2 + i]);
+      }
+    }
+  }
 
-  @BeforeClass
-  public static void setupCluster() throws IOException {
-    conf = new HdfsConfiguration();
+  /**
+   * Similar to IOUtils#readFully(). Reads bytes in a loop.
+   *
+   * @param reader           The BlockReaderLocal to read bytes from
+   * @param buf              The ByteBuffer to read into
+   * @param off              The offset in the buffer to read into
+   * @param len              The number of bytes to read.
+   * 
+   * @throws IOException     If it could not read the requested number of bytes
+   */
+  private static void readFully(BlockReaderLocal reader,
+      ByteBuffer buf, int off, int len) throws IOException {
+    int amt = len;
+    while (amt > 0) {
+      buf.limit(off + len);
+      buf.position(off);
+      long ret = reader.read(buf);
+      if (ret < 0) {
+        throw new EOFException( "Premature EOF from BlockReaderLocal " +
+            "after reading " + (len - amt) + " byte(s).");
+      }
+      amt -= ret;
+      off += ret;
+    }
+  }
+
+  private static interface BlockReaderLocalTest {
+    final int TEST_LENGTH = 12345;
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException;
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException;
+  }
+  
+  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum) throws IOException {
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    FileInputStream dataIn = null, checkIn = null;
+    final Path TEST_PATH = new Path("/a");
+    final long RANDOM_SEED = 4567L;
+    BlockReaderLocal blockReaderLocal = null;
+    FSDataInputStream fsIn = null;
+    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = MiniDFSCluster.getBlockFile(0, block);
+      File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
+
+      DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+      cluster.shutdown();
+      cluster = null;
+      test.setup(dataFile, checksum);
+      dataIn = new FileInputStream(dataFile);
+      checkIn = new FileInputStream(metaFile);
+      blockReaderLocal = new BlockReaderLocal(conf,
+          TEST_PATH.getName(), block, 0, -1,
+          dataIn, checkIn, datanodeID, checksum);
+      dataIn = null;
+      checkIn = null;
+      test.doTest(blockReaderLocal, original);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+      if (dataIn != null) dataIn.close();
+      if (checkIn != null) checkIn.close();
+      if (blockReaderLocal != null) blockReaderLocal.close(null, null);
+    }
+  }
+  
+  private static class TestBlockReaderLocalImmediateClose 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException { }
+  }
+  
+  @Test
+  public void testBlockReaderLocalImmediateClose() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+  }
+  
+  private static class TestBlockReaderSimpleReads 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 512);
+      assertArrayRegionsEqual(original, 0, buf, 0, 512);
+      reader.readFully(buf, 512, 512);
+      assertArrayRegionsEqual(original, 512, buf, 512, 512);
+      reader.readFully(buf, 1024, 513);
+      assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
+      reader.readFully(buf, 1537, 514);
+      assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderSimpleReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+  }
+  
+  private static class TestBlockReaderLocalArrayReads2 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf, 0, 10);
+      reader.readFully(buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf, 10, 100);
+      reader.readFully(buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf, 110, 700);
+      reader.readFully(buf, 810, 1); // from offset 810 to offset 811
+      reader.readFully(buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf, 811, 5);
+      reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716
+      reader.readFully(buf, 1716, 5);
+      assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderLocalArrayReads2() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+        true);
+  }
 
-    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
-    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
         false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
+  }
 
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+  private static class TestBlockReaderLocalByteBufferReads 
+      implements BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException { }
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      readFully(reader, buf, 0, 10);
+      assertArrayRegionsEqual(original, 0, buf.array(), 0, 10);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+  
+  @Test
+  public void testBlockReaderLocalByteBufferReads()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferReads(), true);
   }
 
-  @AfterClass
-  public static void teardownCluster() {
-    cluster.shutdown();
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferReads(), false);
   }
+  
+  private static class TestBlockReaderLocalReadCorruptStart
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
+    }
 
-  /**
-   * Test that, in the case of an error, the position and limit of a ByteBuffer
-   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
-   * of this class might immediately issue a retry on failure, so it's polite.
-   */
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      if (usingChecksums) {
+        try {
+          reader.readFully(buf, 0, 10);
+          Assert.fail("did not detect corruption");
+        } catch (IOException e) {
+          // expected
+        }
+      } else {
+        reader.readFully(buf, 0, 10);
+      }
+    }
+  }
+  
   @Test
-  public void testStablePositionAfterCorruptRead() throws Exception {
-    final short REPL_FACTOR = 1;
-    final long FILE_LENGTH = 512L;
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
-
-    Path path = new Path("/corrupted");
-
-    DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
-    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
-
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
-    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
-    assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
-
-    FSDataInputStream dis = cluster.getFileSystem().open(path);
-    ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
-    boolean sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
+  public void testBlockReaderLocalReadCorruptStart()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+  }
+  
+  private static class TestBlockReaderLocalReadCorrupt
+      implements BlockReaderLocalTest {
+    boolean usingChecksums = false;
+    @Override
+    public void setup(File blockFile, boolean usingChecksums) 
+        throws IOException {
+      RandomAccessFile bf = null;
+      this.usingChecksums = usingChecksums;
+      try {
+        bf = new RandomAccessFile(blockFile, "rw");
+        bf.seek(1539);
+        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+      } finally {
+        if (bf != null) bf.close();
+      }
     }
 
-    assertTrue(sawException);
-    assertEquals(0, buf.position());
-    assertEquals(buf.capacity(), buf.limit());
-
-    dis = cluster.getFileSystem().open(path);
-    buf.position(3);
-    buf.limit(25);
-    sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
+    public void doTest(BlockReaderLocal reader, byte original[]) 
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      try {
+        reader.readFully(buf, 0, 10);
+        assertArrayRegionsEqual(original, 0, buf, 0, 10);
+        reader.readFully(buf, 10, 100);
+        assertArrayRegionsEqual(original, 10, buf, 10, 100);
+        reader.readFully(buf, 110, 700);
+        assertArrayRegionsEqual(original, 110, buf, 110, 700);
+        reader.skip(1); // skip from offset 810 to offset 811
+        reader.readFully(buf, 811, 5);
+        assertArrayRegionsEqual(original, 811, buf, 811, 5);
+        reader.readFully(buf, 816, 900);
+        if (usingChecksums) {
+          // We should detect the corruption when using a checksum file.
+          Assert.fail("did not detect corruption");
+        }
+      } catch (ChecksumException e) {
+        if (!usingChecksums) {
+          Assert.fail("didn't expect to get ChecksumException: not " +
+              "using checksums.");
+        }
+      }
     }
+  }
 
-    assertTrue(sawException);
-    assertEquals(3, buf.position());
-    assertEquals(25, buf.limit());
+  @Test
+  public void testBlockReaderLocalReadCorrupt()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
   }
 }
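
The rewritten TestBlockReaderLocal above replaces the shared @BeforeClass cluster with a per-test harness: runBlockReaderLocalTest() brings up a one-datanode MiniDFSCluster, writes a 12345-byte file, captures the block and meta files, shuts the cluster down, and then hands a BlockReaderLocal over those files to a BlockReaderLocalTest callback. The following is a minimal sketch (not part of this commit) of how a further case could be plugged into that harness; it assumes the BlockReaderLocalTest interface, runBlockReaderLocalTest() and assertArrayRegionsEqual() exactly as they appear in the patch.

  private static class TestBlockReaderSkipThenRead
      implements BlockReaderLocalTest {
    @Override
    public void setup(File blockFile, boolean usingChecksums)
        throws IOException { } // no tampering with the block file in this case
    @Override
    public void doTest(BlockReaderLocal reader, byte original[])
        throws IOException {
      byte buf[] = new byte[TEST_LENGTH];
      reader.readFully(buf, 0, 512);                      // read the first chunk
      assertArrayRegionsEqual(original, 0, buf, 0, 512);
      // jump ahead; the patch's own tests ignore skip()'s return value,
      // a stricter test would check how far the reader actually skipped
      reader.skip(1024);
      reader.readFully(buf, 1536, 512);                   // resume at offset 1536
      assertArrayRegionsEqual(original, 1536, buf, 1536, 512);
    }
  }

  @Test
  public void testBlockReaderSkipThenRead() throws IOException {
    runBlockReaderLocalTest(new TestBlockReaderSkipThenRead(), true);  // with checksums
    runBlockReaderLocalTest(new TestBlockReaderSkipThenRead(), false); // skipping checksums
  }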

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Sat Apr 13 02:13:59 2013
@@ -61,7 +61,7 @@ public class TestClientBlockVerification
         util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
+    reader.close(null, null);
   }
 
   /**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification
     // We asked the blockreader for the whole file, and only read
     // half of it, so no CHECKSUM_OK
     verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
+    reader.close(null, null);
   }
 
   /**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification
     // And read half the file
     util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).sendReadResult(Status.CHECKSUM_OK);
-    reader.close();
+    reader.close(null, null);
   }
 
   /**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification
             util.getBlockReader(testBlock, startOffset, length));
         util.readAndCheckEOS(reader, length, true);
         verify(reader).sendReadResult(Status.CHECKSUM_OK);
-        reader.close();
+        reader.close(null, null);
       }
     }
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Sat Apr 13 02:13:59 2013
@@ -18,28 +18,20 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.security.token.Token;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
@@ -55,59 +47,31 @@ public class TestConnCache {
 
   static final int BLOCK_SIZE = 4096;
   static final int FILE_SIZE = 3 * BLOCK_SIZE;
-  final static int CACHE_SIZE = 4;
-  final static long CACHE_EXPIRY_MS = 200;
-  static Configuration conf = null;
-  static MiniDFSCluster cluster = null;
-  static FileSystem fs = null;
-  static SocketCache cache;
-
-  static final Path testFile = new Path("/testConnCache.dat");
-  static byte authenticData[] = null;
-
-  static BlockReaderTestUtil util = null;
-
 
   /**
    * A mock Answer to remember the BlockReader used.
    *
    * It verifies that all invocations of DFSInputStream.getBlockReader()
-   * use the same socket.
+   * use the same peer.
    */
   private class MockGetBlockReader implements Answer<RemoteBlockReader2> {
     public RemoteBlockReader2 reader = null;
-    private Socket sock = null;
+    private Peer peer = null;
 
     @Override
     public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
       RemoteBlockReader2 prevReader = reader;
       reader = (RemoteBlockReader2) invocation.callRealMethod();
-      if (sock == null) {
-        sock = reader.dnSock;
+      if (peer == null) {
+        peer = reader.getPeer();
       } else if (prevReader != null) {
-        assertSame("DFSInputStream should use the same socket",
-                   sock, reader.dnSock);
+        Assert.assertSame("DFSInputStream should use the same peer",
+                   peer, reader.getPeer());
       }
       return reader;
     }
   }
 
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    final int REPLICATION_FACTOR = 1;
-
-    /* create a socket cache. There is only one socket cache per jvm */
-    cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS);
-
-    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
-    cluster = util.getCluster();
-    conf = util.getConf();
-    fs = cluster.getFileSystem();
-
-    authenticData = util.writeFile(testFile, FILE_SIZE / 1024);
-  }
-
-
   /**
    * (Optionally) seek to position, read and verify data.
    *
@@ -117,9 +81,10 @@ public class TestConnCache {
                      long pos,
                      byte[] buffer,
                      int offset,
-                     int length)
+                     int length,
+                     byte[] authenticData)
       throws IOException {
-    assertTrue("Test buffer too small", buffer.length >= offset + length);
+    Assert.assertTrue("Test buffer too small", buffer.length >= offset + length);
 
     if (pos >= 0)
       in.seek(pos);
@@ -129,7 +94,7 @@ public class TestConnCache {
 
     while (length > 0) {
       int cnt = in.read(buffer, offset, length);
-      assertTrue("Error in read", cnt > 0);
+      Assert.assertTrue("Error in read", cnt > 0);
       offset += cnt;
       length -= cnt;
     }
@@ -145,115 +110,22 @@ public class TestConnCache {
   }
 
   /**
-   * Test the SocketCache itself.
-   */
-  @Test
-  public void testSocketCache() throws Exception {
-    // Make a client
-    InetSocketAddress nnAddr =
-        new InetSocketAddress("localhost", cluster.getNameNodePort());
-    DFSClient client = new DFSClient(nnAddr, conf);
-
-    // Find out the DN addr
-    LocatedBlock block =
-        client.getNamenode().getBlockLocations(
-            testFile.toString(), 0, FILE_SIZE)
-        .getLocatedBlocks().get(0);
-    DataNode dn = util.getDataNode(block);
-    InetSocketAddress dnAddr = dn.getXferAddress();
-
-
-    // Make some sockets to the DN
-    Socket[] dnSockets = new Socket[CACHE_SIZE];
-    for (int i = 0; i < dnSockets.length; ++i) {
-      dnSockets[i] = client.socketFactory.createSocket(
-          dnAddr.getAddress(), dnAddr.getPort());
-    }
-
-
-    // Insert a socket to the NN
-    Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
-    cache.put(nnSock, null);
-    assertSame("Read the write", nnSock, cache.get(nnAddr).sock);
-    cache.put(nnSock, null);
-
-    // Insert DN socks
-    for (Socket dnSock : dnSockets) {
-      cache.put(dnSock, null);
-    }
-
-    assertEquals("NN socket evicted", null, cache.get(nnAddr));
-    assertTrue("Evicted socket closed", nnSock.isClosed());
- 
-    // Lookup the DN socks
-    for (Socket dnSock : dnSockets) {
-      assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock);
-      dnSock.close();
-    }
-
-    assertEquals("Cache is empty", 0, cache.size());
-  }
-
-
-  /**
-   * Test the SocketCache expiry.
-   * Verify that socket cache entries expire after the set
-   * expiry time.
-   */
-  @Test
-  public void testSocketCacheExpiry() throws Exception {
-    // Make a client
-    InetSocketAddress nnAddr =
-        new InetSocketAddress("localhost", cluster.getNameNodePort());
-    DFSClient client = new DFSClient(nnAddr, conf);
-
-    // Find out the DN addr
-    LocatedBlock block =
-        client.getNamenode().getBlockLocations(
-            testFile.toString(), 0, FILE_SIZE)
-        .getLocatedBlocks().get(0);
-    DataNode dn = util.getDataNode(block);
-    InetSocketAddress dnAddr = dn.getXferAddress();
-
-
-    // Make some sockets to the DN and put in cache
-    Socket[] dnSockets = new Socket[CACHE_SIZE];
-    for (int i = 0; i < dnSockets.length; ++i) {
-      dnSockets[i] = client.socketFactory.createSocket(
-          dnAddr.getAddress(), dnAddr.getPort());
-      cache.put(dnSockets[i], null);
-    }
-
-    // Client side still has the sockets cached
-    assertEquals(CACHE_SIZE, client.socketCache.size());
-
-    //sleep for a second and see if it expired
-    Thread.sleep(CACHE_EXPIRY_MS + 1000);
-    
-    // Client side has no sockets cached
-    assertEquals(0, client.socketCache.size());
-
-    //sleep for another second and see if 
-    //the daemon thread runs fine on empty cache
-    Thread.sleep(CACHE_EXPIRY_MS + 1000);
-  }
-
-
-  /**
    * Read a file served entirely from one DN. Seek around and read from
    * different offsets. And verify that they all use the same socket.
-   *
-   * @throws java.io.IOException
+   * @throws Exception 
    */
   @Test
   @SuppressWarnings("unchecked")
-  public void testReadFromOneDN() throws IOException {
-    LOG.info("Starting testReadFromOneDN()");
+  public void testReadFromOneDN() throws Exception {
+    BlockReaderTestUtil util = new BlockReaderTestUtil(1,
+        new HdfsConfiguration());
+    final Path testFile = new Path("/testConnCache.dat");
+    byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024);
     DFSClient client = new DFSClient(
-        new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
-    DFSInputStream in = spy(client.open(testFile.toString()));
+        new InetSocketAddress("localhost",
+            util.getCluster().getNameNodePort()), util.getConf());
+    DFSInputStream in = Mockito.spy(client.open(testFile.toString()));
     LOG.info("opened " + testFile.toString());
-
     byte[] dataBuf = new byte[BLOCK_SIZE];
 
     MockGetBlockReader answer = new MockGetBlockReader();
@@ -270,18 +142,15 @@ public class TestConnCache {
                            Matchers.anyString());
 
     // Initial read
-    pread(in, 0, dataBuf, 0, dataBuf.length);
+    pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);
     // Read again and verify that the socket is the same
-    pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length);
-    pread(in, 1024, dataBuf, 0, dataBuf.length);
-    pread(in, -1, dataBuf, 0, dataBuf.length);            // No seek; just read
-    pread(in, 64, dataBuf, 0, dataBuf.length / 2);
+    pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length,
+        authenticData);
+    pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData);
+    // No seek; just read
+    pread(in, -1, dataBuf, 0, dataBuf.length, authenticData);
+    pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData);
 
     in.close();
   }
-
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    util.shutdown();
-  }
 }
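
The rewritten TestConnCache drops the static cluster and SocketCache fixtures; what remains is the peer-reuse check: MockGetBlockReader is a Mockito Answer that calls the real DFSInputStream.getBlockReader(), remembers the first Peer it sees, and asserts that every later reader uses the same one. The stubbing that attaches the Answer to the spied DFSInputStream sits in unchanged code outside this hunk; as a rough, self-contained sketch of that Mockito idiom (the spied list and names below are illustrative, not from the patch, and it assumes the same Mockito/JUnit imports as this test):

  @Test
  public void sketchOfSpyPlusAnswer() throws Exception {
    // A spy whose add() still runs for real, while a custom Answer
    // records each argument on the way through.
    final java.util.List<Object> seen = new java.util.ArrayList<Object>();
    java.util.List<Object> spied = Mockito.spy(new java.util.ArrayList<Object>());
    Mockito.doAnswer(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        seen.add(invocation.getArguments()[0]);          // capture the argument
        return (Boolean) invocation.callRealMethod();    // then run the real add()
      }
    }).when(spied).add(Matchers.anyObject());
    spied.add("x");
    Assert.assertSame("x", seen.get(0));
  }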

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Sat Apr 13 02:13:59 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -70,7 +71,7 @@ public class TestDataTransferKeepalive {
       .numDataNodes(1).build();
     fs = cluster.getFileSystem();
     dfsClient = ((DistributedFileSystem)fs).dfs;
-    dfsClient.socketCache.clear();
+    dfsClient.peerCache.clear();
 
     String poolId = cluster.getNamesystem().getBlockPoolId();
     dn = cluster.getDataNodes().get(0);
@@ -93,13 +94,13 @@ public class TestDataTransferKeepalive {
     DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
 
     // Clients that write aren't currently re-used.
-    assertEquals(0, dfsClient.socketCache.size());
+    assertEquals(0, dfsClient.peerCache.size());
     assertXceiverCount(0);
 
     // Reads the file, so we should get a
     // cached socket, and should have an xceiver on the other side.
     DFSTestUtil.readFile(fs, TEST_FILE);
-    assertEquals(1, dfsClient.socketCache.size());
+    assertEquals(1, dfsClient.peerCache.size());
     assertXceiverCount(1);
 
     // Sleep for a bit longer than the keepalive timeout
@@ -110,13 +111,13 @@ public class TestDataTransferKeepalive {
     // The socket is still in the cache, because we don't
     // notice that it's closed until we try to read
     // from it again.
-    assertEquals(1, dfsClient.socketCache.size());
+    assertEquals(1, dfsClient.peerCache.size());
     
     // Take it out of the cache - reading should
     // give an EOF.
-    Socket s = dfsClient.socketCache.get(dnAddr).sock;
-    assertNotNull(s);
-    assertEquals(-1, NetUtils.getInputStream(s).read());
+    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
+    assertNotNull(peer);
+    assertEquals(-1, peer.getInputStream().read());
   }
 
   /**
@@ -175,14 +176,14 @@ public class TestDataTransferKeepalive {
     }
     
     DFSClient client = ((DistributedFileSystem)fs).dfs;
-    assertEquals(5, client.socketCache.size());
+    assertEquals(5, client.peerCache.size());
     
     // Let all the xceivers timeout
     Thread.sleep(1500);
     assertXceiverCount(0);
 
     // Client side still has the sockets cached
-    assertEquals(5, client.socketCache.size());
+    assertEquals(5, client.peerCache.size());
 
     // Reading should not throw an exception.
     DFSTestUtil.readFile(fs, TEST_FILE);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java Sat Apr 13 02:13:59 2013
@@ -17,53 +17,30 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 public class TestParallelRead extends TestParallelReadUtil {
-
   @BeforeClass
   static public void setupCluster() throws Exception {
-    setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
+    // This is a test of the normal (TCP) read path.  For this reason, we turn
+    // off both short-circuit local reads and UNIX domain socket data traffic.
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+                    false);
+    // dfs.domain.socket.path should be ignored because the previous two keys
+    // were set to false.  This is a regression test for HDFS-4473.
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "/will/not/be/created");
+
+    setupCluster(DEFAULT_REPLICATION_FACTOR, conf);
   }
 
   @AfterClass
   static public void teardownCluster() throws Exception {
     TestParallelReadUtil.teardownCluster();
   }
-
-  /**
-   * Do parallel read several times with different number of files and threads.
-   *
-   * Note that while this is the only "test" in a junit sense, we're actually
-   * dispatching a lot more. Failures in the other methods (and other threads)
-   * need to be manually collected, which is inconvenient.
-   */
-  @Test
-  public void testParallelReadCopying() throws IOException {
-    runTestWorkload(new CopyingReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadByteBuffer() throws IOException {
-    runTestWorkload(new DirectReadWorkerHelper());
-  }
-
-  @Test
-  public void testParallelReadMixed() throws IOException {
-    runTestWorkload(new MixedWorkloadHelper());
-  }
-  
-  @Test
-  public void testParallelNoChecksums() throws IOException {
-    verifyChecksums = false;
-    runTestWorkload(new MixedWorkloadHelper());
-  }
-
 }
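
The comment in setupCluster() states the intent: this subclass pins the cluster to the plain TCP read path and, as a regression test for HDFS-4473, checks that dfs.domain.socket.path is ignored once short-circuit reads and domain-socket traffic are both off. As a hedged illustration only (the class name below is invented, not part of this commit, and it assumes the same imports as TestParallelRead), a sibling subclass exercising the short-circuit path would flip the same keys:

  public class TestParallelShortCircuitReadSketch extends TestParallelReadUtil {
    @BeforeClass
    static public void setupCluster() throws Exception {
      HdfsConfiguration conf = new HdfsConfiguration();
      // enable short-circuit local reads instead of disabling them
      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
      // a real test would point this at a usable domain socket path; _PORT is
      // the placeholder convention used elsewhere in this patch
      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
          "/tmp/TestParallelShortCircuitReadSketch._PORT.sock");
      setupCluster(DEFAULT_REPLICATION_FACTOR, conf);
    }

    @AfterClass
    static public void teardownCluster() throws Exception {
      TestParallelReadUtil.teardownCluster();
    }
  }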

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java Sat Apr 13 02:13:59 2013
@@ -32,12 +32,18 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
 
 /**
  * Driver class for testing the use of DFSInputStream by multiple concurrent
- * readers, using the different read APIs. See subclasses for the actual test
- * cases.
+ * readers, using the different read APIs.
+ *
+ * This class is marked as @Ignore so that junit doesn't try to execute the
+ * tests in here directly.  They are executed from subclasses.
  */
+@Ignore
 public class TestParallelReadUtil {
 
   static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
@@ -388,4 +394,31 @@ public class TestParallelReadUtil {
     util.shutdown();
   }
 
+  /**
+   * Do parallel read several times with different number of files and threads.
+   *
+   * Note that while this is the only "test" in a junit sense, we're actually
+   * dispatching a lot more. Failures in the other methods (and other threads)
+   * need to be manually collected, which is inconvenient.
+   */
+  @Test
+  public void testParallelReadCopying() throws IOException {
+    runTestWorkload(new CopyingReadWorkerHelper());
+  }
+
+  @Test
+  public void testParallelReadByteBuffer() throws IOException {
+    runTestWorkload(new DirectReadWorkerHelper());
+  }
+
+  @Test
+  public void testParallelReadMixed() throws IOException {
+    runTestWorkload(new MixedWorkloadHelper());
+  }
+  
+  @Test
+  public void testParallelNoChecksums() throws IOException {
+    verifyChecksums = false;
+    runTestWorkload(new MixedWorkloadHelper());
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Sat Apr 13 02:13:59 2013
@@ -21,10 +21,13 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.assertFalse;
 
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,7 +35,6 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -40,15 +42,21 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import static org.hamcrest.CoreMatchers.*;
 
 /**
  * Test for short circuit read functionality using {@link BlockReaderLocal}.
@@ -58,9 +66,24 @@ import org.junit.Test;
  * system.
  */
 public class TestShortCircuitLocalRead {
+  private static TemporarySocketDirectory sockDir;
 
-  static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
 
+  @Before
+  public void before() {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+  }
+  
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 5120;
   boolean simulatedStorage = false;
@@ -84,7 +107,9 @@ public class TestShortCircuitLocalRead {
     for (int idx = 0; idx < len; idx++) {
       if (expected[from + idx] != actual[idx]) {
         Assert.fail(message + " byte " + (from + idx) + " differs. expected "
-            + expected[from + idx] + " actual " + actual[idx]);
+            + expected[from + idx] + " actual " + actual[idx] +
+            "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+            "\nactual:   " + StringUtils.byteToHexString(actual, 0, len));
       }
     }
   }
@@ -96,11 +121,13 @@ public class TestShortCircuitLocalRead {
   /** Check file content, reading as user {@code readingUser} */
   static void checkFileContent(URI uri, Path name, byte[] expected,
       int readOffset, String readingUser, Configuration conf,
-      boolean shortCircuitFails)
+      boolean legacyShortCircuitFails)
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
-    assertTrue(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+    }
     
     FSDataInputStream stm = fs.open(name);
     byte[] actual = new byte[expected.length-readOffset];
@@ -127,9 +154,8 @@ public class TestShortCircuitLocalRead {
     }
     checkData(actual, readOffset, expected, "Read 3");
     
-    if (shortCircuitFails) {
-      // short circuit should be disabled due to failure
-      assertFalse(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
     }
     stm.close();
   }
@@ -145,11 +171,13 @@ public class TestShortCircuitLocalRead {
   /** Check the file content, reading as user {@code readingUser} */
   static void checkFileContentDirect(URI uri, Path name, byte[] expected,
       int readOffset, String readingUser, Configuration conf,
-      boolean shortCircuitFails)
+      boolean legacyShortCircuitFails)
       throws IOException, InterruptedException {
     // Ensure short circuit is enabled
     DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
-    assertTrue(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+    }
     
     HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
 
@@ -180,33 +208,45 @@ public class TestShortCircuitLocalRead {
       nread += nbytes;
     }
     checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
-    if (shortCircuitFails) {
-      // short circuit should be disabled due to failure
-      assertFalse(fs.getClient().getShortCircuitLocalReads());
+    if (legacyShortCircuitFails) {
+      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
     }
     stm.close();
   }
 
+  public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
+      int readOffset, String shortCircuitUser, String readingUser,
+      boolean legacyShortCircuitFails) throws IOException, InterruptedException {
+    doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+        shortCircuitUser, readingUser, legacyShortCircuitFails);
+  }
+
   public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
       int readOffset) throws IOException, InterruptedException {
     String shortCircuitUser = getCurrentUser();
-    doTestShortCircuitRead(ignoreChecksum, size, readOffset, shortCircuitUser,
-        shortCircuitUser, false);
+    doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
+        null, getCurrentUser(), false);
   }
   
   /**
    * Test that file data can be read by reading the block file
    * directly from the local store.
    */
-  public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
+  public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
       int readOffset, String shortCircuitUser, String readingUser,
-      boolean shortCircuitFails) throws IOException, InterruptedException {
+      boolean legacyShortCircuitFails) throws IOException, InterruptedException {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        shortCircuitUser);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
+    if (shortCircuitUser != null) {
+      conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+          shortCircuitUser);
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+    }
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -228,9 +268,9 @@ public class TestShortCircuitLocalRead {
       
       URI uri = cluster.getURI();
       checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
-          shortCircuitFails);
+          legacyShortCircuitFails);
       checkFileContentDirect(uri, file1, fileData, readOffset, readingUser,
-          conf, shortCircuitFails);
+          conf, legacyShortCircuitFails);
     } finally {
       fs.close();
       cluster.shutdown();
@@ -255,6 +295,12 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(true, 13, 5);
   }
   
+  @Test(timeout=10000)
+  public void testLocalReadLegacy() throws Exception {
+    doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(),
+        getCurrentUser(), false);
+  }
+
   /**
    * Try a short circuit from a reader that is not allowed to
    * use short circuit. The test ensures reader falls back to non
@@ -262,7 +308,7 @@ public class TestShortCircuitLocalRead {
    */
   @Test(timeout=10000)
   public void testLocalReadFallback() throws Exception {
-    doTestShortCircuitRead(true, 13, 0, getCurrentUser(), "notallowed", true);
+    doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
   }
   
   @Test(timeout=10000)
@@ -276,7 +322,7 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(false, 10*blockSize+100, 777);
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }
-   
+
   private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
       final DatanodeID dnInfo, final Configuration conf) throws IOException,
       InterruptedException {
@@ -301,21 +347,15 @@ public class TestShortCircuitLocalRead {
   }
   
   @Test(timeout=10000)
-  public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+  public void testDeprecatedGetBlockLocalPathInfoRpc()
+      throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        "alloweduser1,alloweduser2");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();
     cluster.waitActive();
-    final DataNode dn = cluster.getDataNodes().get(0);
     FileSystem fs = cluster.getFileSystem();
     try {
       DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
-      UserGroupInformation aUgi1 =
-          UserGroupInformation.createRemoteUser("alloweduser1");
-      UserGroupInformation aUgi2 =
-          UserGroupInformation.createRemoteUser("alloweduser2");
       LocatedBlocks lb = cluster.getNameNode().getRpcServer()
           .getBlockLocations("/tmp/x", 0, 16);
       // Create a new block object, because the block inside LocatedBlock at
@@ -323,29 +363,11 @@ public class TestShortCircuitLocalRead {
       ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
       Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
       final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
-      ClientDatanodeProtocol proxy = getProxy(aUgi1, dnInfo, conf);
-      // This should succeed
-      BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Try with the other allowed user
-      proxy = getProxy(aUgi2, dnInfo, conf);
-
-      // This should succeed as well
-      blpi = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(
-          DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
-          blpi.getBlockPath());
-
-      // Now try with a disallowed user
-      UserGroupInformation bUgi = UserGroupInformation
-          .createRemoteUser("notalloweduser");
-      proxy = getProxy(bUgi, dnInfo, conf);
+      ClientDatanodeProtocol proxy = 
+          DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
       try {
         proxy.getBlockLocalPathInfo(blk, token);
-        Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+        Assert.fail("The call should have failed as this user "
             + " is not allowed to call getBlockLocalPathInfo");
       } catch (IOException ex) {
         Assert.assertTrue(ex.getMessage().contains(
@@ -363,8 +385,9 @@ public class TestShortCircuitLocalRead {
     Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        getCurrentUser());
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "/tmp/testSkipWithVerifyChecksum._PORT");
+    DomainSocket.disableBindPathValidation();
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
@@ -402,6 +425,88 @@ public class TestShortCircuitLocalRead {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testHandleTruncatedBlockFile() throws IOException {
+    MiniDFSCluster cluster = null;
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "/tmp/testHandleTruncatedBlockFile._PORT");
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    final Path TEST_PATH = new Path("/a");
+    final Path TEST_PATH2 = new Path("/b");
+    final long RANDOM_SEED = 4567L;
+    final long RANDOM_SEED2 = 4568L;
+    FSDataInputStream fsIn = null;
+    final int TEST_LENGTH = 3456;
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_LENGTH, (short)1, RANDOM_SEED);
+      DFSTestUtil.createFile(fs, TEST_PATH2,
+          TEST_LENGTH, (short)1, RANDOM_SEED2);
+      fsIn = cluster.getFileSystem().open(TEST_PATH2);
+      byte original[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+      fsIn.close();
+      fsIn = null;
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = MiniDFSCluster.getBlockFile(0, block);
+      cluster.shutdown();
+      cluster = null;
+      RandomAccessFile raf = null;
+      try {
+        raf = new RandomAccessFile(dataFile, "rw");
+        raf.setLength(0);
+      } finally {
+        if (raf != null) raf.close();
+      }
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      fsIn = fs.open(TEST_PATH);
+      try {
+        byte buf[] = new byte[100];
+        fsIn.seek(2000);
+        fsIn.readFully(buf, 0, buf.length);
+        Assert.fail("shouldn't be able to read from corrupt 0-length " +
+            "block file.");
+      } catch (IOException e) {
+        DFSClient.LOG.error("caught exception ", e);
+      }
+      fsIn.close();
+      fsIn = null;
+
+      // We should still be able to read the other file.
+      // This is important because it indicates that we detected that the 
+      // previous block was corrupt, rather than blaming the problem on
+      // communication.
+      fsIn = fs.open(TEST_PATH2);
+      byte buf[] = new byte[original.length];
+      fsIn.readFully(buf, 0, buf.length);
+      TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+          original.length);
+      fsIn.close();
+      fsIn = null;
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
      
   /**
    * Test to run benchmarks between short circuit read vs regular read with
@@ -424,6 +529,8 @@ public class TestShortCircuitLocalRead {
     // Setup create a file
     final Configuration conf = new Configuration();
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "/tmp/TestShortCircuitLocalRead._PORT");
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         checksum);
     
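Taken together, the TestShortCircuitLocalRead changes move from the user-whitelist (legacy) short-circuit configuration to UNIX domain sockets: a TemporarySocketDirectory plus DomainSocket.disableBindPathValidation() in @BeforeClass, a dfs.domain.socket.path containing a _PORT placeholder per test, and the old path kept only behind doTestShortCircuitReadLegacy(). A condensed configuration sketch, assembled from the keys this patch sets (values are illustrative, and on a real cluster the datanode must also create the socket at that path):

  Configuration conf = new Configuration();
  // turn on the domain-socket based short-circuit read path
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
  // keep checksum verification on for short-circuit reads
  conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
  // _PORT is the placeholder convention used by the tests above
  conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
      "/var/run/hdfs/dn._PORT.sock");
  // only the legacy tests opt back into the old BlockLocalPathInfo reader:
  // conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);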

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Sat Apr 13 02:13:59 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -145,8 +146,9 @@ public class TestBlockTokenWithDFS {
       String file = BlockReaderFactory.getFileName(targetAddr, 
           "test-blockpoolid", block.getBlockId());
       blockReader = BlockReaderFactory.newBlockReader(
-          conf, s, file, block, 
-          lblock.getBlockToken(), 0, -1, null);
+          conf, file, block, lblock.getBlockToken(), 0, -1,
+          true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
+          nodes[0], null, false);
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Apr 13 02:13:59 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implemen
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+      throws IOException {
+    throw new UnsupportedOperationException();
+  }
   
   @Override
   public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
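
SimulatedFSDataset keeps no real files on disk, so the new getShortCircuitFdsForRead() hook simply refuses. As a hedged sketch only (the lookup helpers below are hypothetical, not from this patch), a file-backed dataset would presumably hand back streams for the block file and its meta file, mirroring the (dataIn, checkIn) pair that BlockReaderLocal is constructed with in TestBlockReaderLocal above:

  @Override
  public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
      throws IOException {
    // locateBlockFile()/locateMetaFile() are hypothetical lookup helpers
    File dataFile = locateBlockFile(block);
    File metaFile = locateMetaFile(block);
    return new FileInputStream[] {
        new FileInputStream(dataFile),    // block data
        new FileInputStream(metaFile) };  // checksums
  }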

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1467538&r1=1467537&r2=1467538&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Sat Apr 13 02:13:59 2013
@@ -32,11 +32,13 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -280,10 +282,11 @@ public class TestDataNodeVolumeFailure {
     String file = BlockReaderFactory.getFileName(targetAddr, 
         "test-blockpoolid",
         block.getBlockId());
-    BlockReaderFactory.newBlockReader(conf, s, file, block, lblock
-        .getBlockToken(), 0, -1, null);
-
-    // nothing - if it fails - it will throw and exception
+    BlockReader blockReader =
+      BlockReaderFactory.newBlockReader(conf, file, block,
+        lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
+        TcpPeerServer.peerFromSocket(s), datanode, null, false);
+    blockReader.close(null, null);
   }
   
   /**