You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/05/18 14:04:24 UTC

[hadoop] branch trunk updated: HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86e6aa8  HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)
86e6aa8 is described below

commit 86e6aa8eec538e142044e2b6415ec1caff5e9cbd
Author: pustota2009 <61...@users.noreply.github.com>
AuthorDate: Mon May 18 17:04:04 2020 +0300

    HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)
    
    Added parameter dfs.client.short.circuit.num improving HDFS-client's massive reading performance by create few instances ShortCircuit caches instead of one. It helps avoid locks and lets CPU do job.
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |  20 ++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   2 +
 .../hdfs/client/impl/BlockReaderFactory.java       |   9 +-
 .../hadoop/hdfs/client/impl/DfsClientConf.java     |  21 ++-
 .../src/main/resources/hdfs-default.xml            |  10 ++
 .../hadoop/fs/TestEnhancedByteBufferAccess.java    |   6 +-
 .../hdfs/client/impl/TestBlockReaderFactory.java   |  14 +-
 .../hdfs/client/impl/TestBlockReaderLocal.java     | 190 +++++++++++++++------
 .../hdfs/shortcircuit/TestShortCircuitCache.java   |  19 ++-
 9 files changed, 215 insertions(+), 76 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index cbd941b..7a03240 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -77,7 +77,7 @@ public class ClientContext {
   /**
    * Caches short-circuit file descriptors, mmap regions.
    */
-  private final ShortCircuitCache shortCircuitCache;
+  private final ShortCircuitCache[] shortCircuitCache;
 
   /**
    * Caches TCP and UNIX domain sockets for reuse.
@@ -132,13 +132,23 @@ public class ClientContext {
    */
   private DeadNodeDetector deadNodeDetector = null;
 
+  /**
+   * ShortCircuitCache array size.
+   */
+  private final int clientShortCircuitNum;
+
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
 
     this.name = name;
     this.confString = scConf.confAsString();
-    this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
+    this.clientShortCircuitNum = conf.getClientShortCircuitNum();
+    this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
+    for (int i = 0; i < this.clientShortCircuitNum; i++) {
+      this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
+    }
+
     this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
         scConf.getSocketCacheExpiry());
     this.keyProviderCache = new KeyProviderCache(
@@ -228,7 +238,11 @@ public class ClientContext {
   }
 
   public ShortCircuitCache getShortCircuitCache() {
-    return shortCircuitCache;
+    return shortCircuitCache[0];
+  }
+
+  public ShortCircuitCache getShortCircuitCache(long idx) {
+    return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
   }
 
   public PeerCache getPeerCache() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index ab3f6f2..e8b5402 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys {
       "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
       60000;
+  String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
+  int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
   String  DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.client.slow.io.warning.threshold.ms";
   long    DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
index a3b611c..028d629 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -476,7 +476,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
               "giving up on BlockReaderLocal.", this, pathInfo);
       return null;
     }
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ShortCircuitCache cache =
+        clientContext.getShortCircuitCache(block.getBlockId());
     ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
         block.getBlockPoolId());
     ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
@@ -527,7 +528,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
       Slot slot = null;
-      ShortCircuitCache cache = clientContext.getShortCircuitCache();
+      ShortCircuitCache cache =
+          clientContext.getShortCircuitCache(block.getBlockId());
       try {
         MutableBoolean usedPeer = new MutableBoolean(false);
         slot = cache.allocShmSlot(datanode, peer, usedPeer,
@@ -582,7 +584,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
    */
   private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
           Slot slot) throws IOException {
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ShortCircuitCache cache =
+        clientContext.getShortCircuitCache(block.getBlockId());
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
     SlotId slotId = slot == null ? null : slot.getSlotId();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 918fef7..e41b608 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -142,6 +142,7 @@ public class DfsClientConf {
   private final long refreshReadBlockLocationsMS;
 
   private final ShortCircuitConf shortCircuitConf;
+  private final int clientShortCircuitNum;
 
   private final long hedgedReadThresholdMillis;
   private final int hedgedReadThreadpoolSize;
@@ -272,8 +273,6 @@ public class DfsClientConf {
         HdfsClientConfigKeys.
             DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
 
-    shortCircuitConf = new ShortCircuitConf(conf);
-
     hedgedReadThresholdMillis = conf.getLong(
         HedgedRead.THRESHOLD_MILLIS_KEY,
         HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@@ -296,6 +295,17 @@ public class DfsClientConf {
     leaseHardLimitPeriod =
         conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
             HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
+
+    shortCircuitConf = new ShortCircuitConf(conf);
+    clientShortCircuitNum = conf.getInt(
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
+    Preconditions.checkArgument(clientShortCircuitNum >= 1,
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
+                    "can't be less then 1.");
+    Preconditions.checkArgument(clientShortCircuitNum <= 5,
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
+                    "can't be more then 5.");
   }
 
   @SuppressWarnings("unchecked")
@@ -601,6 +611,13 @@ public class DfsClientConf {
     return slowIoWarningThresholdMs;
   }
 
+  /*
+   * @return the clientShortCircuitNum
+   */
+  public int getClientShortCircuitNum() {
+    return clientShortCircuitNum;
+  }
+
   /**
    * @return the hedgedReadThresholdMillis
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 33d8163..c6b616a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4179,6 +4179,16 @@
 </property>
 
 <property>
+  <name>dfs.client.short.circuit.num</name>
+  <value>1</value>
+  <description>
+    Number of short-circuit caches. This setting should
+    be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
+    values may speed up massive parallel reading files.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.read.striped.threadpool.size</name>
   <value>18</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
index 90b4f11..19bc711 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
@@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
     fsIn.close();
     fsIn = fs.open(TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache(0);
     cache.accept(new CountingVisitor(0, 5, 5, 0));
     results[0] = fsIn.read(null, BLOCK_SIZE,
         EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@@ -654,12 +654,12 @@ public class TestEnhancedByteBufferAccess {
         BLOCK_SIZE), byteBufferToArray(result2));
     fsIn2.releaseBuffer(result2);
     fsIn2.close();
-    
+
     // check that the replica is anchored 
     final ExtendedBlock firstBlock =
         DFSTestUtil.getFirstBlock(fs, TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache(0);
     waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
     // Uncache the replica
     fs.removeCacheDirective(directiveId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
index 6b04b14..8442449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
@@ -389,7 +389,7 @@ public class TestBlockReaderFactory {
 
         try (FSDataInputStream in = dfs.open(testFile)) {
           Assert.assertEquals(0,
-              dfs.getClient().getClientContext().getShortCircuitCache()
+              dfs.getClient().getClientContext().getShortCircuitCache(0)
                   .getReplicaInfoMapSize());
 
           final byte[] buf = new byte[testFileLen];
@@ -398,12 +398,12 @@ public class TestBlockReaderFactory {
 
           // Set cache size to 0 so the replica marked evictable by unbuffer
           // will be purged immediately.
-          dfs.getClient().getClientContext().getShortCircuitCache()
+          dfs.getClient().getClientContext().getShortCircuitCache(0)
               .setMaxTotalSize(0);
           LOG.info("Unbuffering");
           in.unbuffer();
           Assert.assertEquals(0,
-              dfs.getClient().getClientContext().getShortCircuitCache()
+              dfs.getClient().getClientContext().getShortCircuitCache(0)
                   .getReplicaInfoMapSize());
 
           DFSTestUtil.appendFile(dfs, testFile, "append more data");
@@ -432,7 +432,7 @@ public class TestBlockReaderFactory {
       final int expectedScrRepMapSize) {
     Assert.assertThat(expected, CoreMatchers.is(actual));
     Assert.assertEquals(expectedScrRepMapSize,
-        dfs.getClient().getClientContext().getShortCircuitCache()
+        dfs.getClient().getClientContext().getShortCircuitCache(0)
             .getReplicaInfoMapSize());
   }
 
@@ -467,7 +467,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     final DatanodeInfo datanode = new DatanodeInfoBuilder()
         .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
         .build();
@@ -516,7 +516,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     Assert.assertEquals(null, cache.getDfsClientShmManager());
     cluster.shutdown();
     sockDir.close();
@@ -548,7 +548,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     cache.close();
     Assert.assertTrue(cache.getDfsClientShmManager().
         getDomainSocketWatcher().isClosed());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
index 95fb67a..4c327a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -116,7 +116,7 @@ public class TestBlockReaderLocal {
   }
 
   private static class BlockReaderLocalTest {
-    final static int TEST_LENGTH = 12345;
+    final static int TEST_LENGTH = 1234567;
     final static int BYTES_PER_CHECKSUM = 512;
 
     public void setConfiguration(HdfsConfiguration conf) {
@@ -130,10 +130,14 @@ public class TestBlockReaderLocal {
         throws IOException {
       // default: no-op
     }
-  }
+    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
+            throws IOException {
+      // default: no-op
+    }  }
 
   public void runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum, long readahead) throws IOException {
+      boolean checksum, long readahead, int shortCircuitCachesNum)
+          throws IOException {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
     MiniDFSCluster cluster = null;
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -143,10 +147,13 @@ public class TestBlockReaderLocal {
         BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
     conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
+        shortCircuitCachesNum);
     test.setConfiguration(conf);
     FileInputStream dataIn = null, metaIn = null;
     final Path TEST_PATH = new Path("/a");
     final long RANDOM_SEED = 4567L;
+    final int blockSize = 10 * 1024;
     BlockReaderLocal blockReaderLocal = null;
     FSDataInputStream fsIn = null;
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
@@ -158,8 +165,8 @@ public class TestBlockReaderLocal {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
       fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, TEST_PATH,
-          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      DFSTestUtil.createFile(fs, TEST_PATH, 1024,
+          BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED);
       try {
         DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
       } catch (InterruptedException e) {
@@ -174,47 +181,52 @@ public class TestBlockReaderLocal {
           BlockReaderLocalTest.TEST_LENGTH);
       fsIn.close();
       fsIn = null;
-      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
-      File dataFile = cluster.getBlockFile(0, block);
-      File metaFile = cluster.getBlockMetadataFile(0, block);
-
-      ShortCircuitCache shortCircuitCache =
-          ClientContext.getFromConf(conf).getShortCircuitCache();
+      for (int i = 0; i < shortCircuitCachesNum; i++) {
+        ExtendedBlock block = DFSTestUtil.getAllBlocks(
+                fs, TEST_PATH).get(i).getBlock();
+        File dataFile = cluster.getBlockFile(0, block);
+        File metaFile = cluster.getBlockMetadataFile(0, block);
+
+        ShortCircuitCache shortCircuitCache =
+                ClientContext.getFromConf(conf).getShortCircuitCache(
+                        block.getBlockId());
+        test.setup(dataFile, checksum);
+        FileInputStream[] streams = {
+            new FileInputStream(dataFile),
+            new FileInputStream(metaFile)
+        };
+        dataIn = streams[0];
+        metaIn = streams[1];
+        ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+                block.getBlockPoolId());
+        raf = new RandomAccessFile(
+                new File(sockDir.getDir().getAbsolutePath(),
+                        UUID.randomUUID().toString()), "rw");
+        raf.setLength(8192);
+        FileInputStream shmStream = new FileInputStream(raf.getFD());
+        shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
+        ShortCircuitReplica replica =
+                new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
+                        Time.now(), shm.allocAndRegisterSlot(
+                        ExtendedBlockId.fromExtendedBlock(block)));
+        blockReaderLocal = new BlockReaderLocal.Builder(
+                new DfsClientConf.ShortCircuitConf(conf)).
+                setFilename(TEST_PATH.getName()).
+                setBlock(block).
+                setShortCircuitReplica(replica).
+                setCachingStrategy(new CachingStrategy(false, readahead)).
+                setVerifyChecksum(checksum).
+                build();
+        dataIn = null;
+        metaIn = null;
+        test.doTest(blockReaderLocal, original, i * blockSize);
+        // BlockReaderLocal should not alter the file position.
+        Assert.assertEquals(0, streams[0].getChannel().position());
+        Assert.assertEquals(0, streams[1].getChannel().position());
+      }
       cluster.shutdown();
       cluster = null;
-      test.setup(dataFile, checksum);
-      FileInputStream streams[] = {
-          new FileInputStream(dataFile),
-          new FileInputStream(metaFile)
-      };
-      dataIn = streams[0];
-      metaIn = streams[1];
-      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
-          block.getBlockPoolId());
-      raf = new RandomAccessFile(
-          new File(sockDir.getDir().getAbsolutePath(),
-            UUID.randomUUID().toString()), "rw");
-      raf.setLength(8192);
-      FileInputStream shmStream = new FileInputStream(raf.getFD());
-      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
-      ShortCircuitReplica replica =
-          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
-              Time.now(), shm.allocAndRegisterSlot(
-                  ExtendedBlockId.fromExtendedBlock(block)));
-      blockReaderLocal = new BlockReaderLocal.Builder(
-              new DfsClientConf.ShortCircuitConf(conf)).
-          setFilename(TEST_PATH.getName()).
-          setBlock(block).
-          setShortCircuitReplica(replica).
-          setCachingStrategy(new CachingStrategy(false, readahead)).
-          setVerifyChecksum(checksum).
-          build();
-      dataIn = null;
-      metaIn = null;
-      test.doTest(blockReaderLocal, original);
-      // BlockReaderLocal should not alter the file position.
-      Assert.assertEquals(0, streams[0].getChannel().position());
-      Assert.assertEquals(0, streams[1].getChannel().position());
+
     } finally {
       if (fsIn != null) fsIn.close();
       if (fs != null) fs.close();
@@ -227,6 +239,12 @@ public class TestBlockReaderLocal {
     }
   }
 
+  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum, long readahead) throws IOException {
+    runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum, long readahead, 1);
+  }
+
   private static class TestBlockReaderLocalImmediateClose
       extends BlockReaderLocalTest {
   }
@@ -242,7 +260,7 @@ public class TestBlockReaderLocal {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 512);
       assertArrayRegionsEqual(original, 0, buf, 0, 512);
       reader.readFully(buf, 512, 512);
@@ -291,7 +309,7 @@ public class TestBlockReaderLocal {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 10);
       assertArrayRegionsEqual(original, 0, buf, 0, 10);
       reader.readFully(buf, 10, 100);
@@ -369,7 +387,7 @@ public class TestBlockReaderLocal {
   public void testBlockReaderLocalByteBufferReadsNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        true, 0);
+         true, 0);
   }
 
   @Test
@@ -468,7 +486,7 @@ public class TestBlockReaderLocal {
 
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       if (usingChecksums) {
         try {
           reader.readFully(buf, 0, 10);
@@ -508,7 +526,7 @@ public class TestBlockReaderLocal {
 
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       try {
         reader.readFully(buf, 0, 10);
         assertArrayRegionsEqual(original, 0, buf, 0, 10);
@@ -845,4 +863,78 @@ public class TestBlockReaderLocal {
       }
     }
   }
+
+  private static class TestBlockReaderFiveShortCircutCachesReads
+          extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
+            throws IOException {
+      byte[] buf = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 512);
+      assertArrayRegionsEqual(original, shift, buf, 0, 512);
+      reader.readFully(buf, 512, 512);
+      assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512);
+      reader.readFully(buf, 1024, 513);
+      assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513);
+      reader.readFully(buf, 1537, 514);
+      assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514);
+      // Readahead is always at least the size of one chunk in this test.
+      Assert.assertTrue(reader.getMaxReadaheadLength() >=
+              BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+    }
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
+            5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, 0, 5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            false, 0, 5);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReaderShortCircutCachesOutOfRangeBelow()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReaderShortCircutCachesOutOfRangeAbove()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            555);
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index b2da6a2..53cac2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -431,7 +431,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     cache.getDfsClientShmManager().visit(new Visitor() {
       @Override
       public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@@ -501,7 +501,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     String TEST_FILE = "/test_file";
     final int TEST_FILE_LEN = 8193;
     final int SEED = 0xFADED;
@@ -565,7 +565,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     cache.getDfsClientShmManager().visit(new Visitor() {
       @Override
       public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@@ -877,19 +877,20 @@ public class TestShortCircuitCache {
             return peerCache;
           });
 
-      Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
+      Mockito.when(clientContext.getShortCircuitCache(
+          blk.getBlock().getBlockId())).thenAnswer(
           (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
-            ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
-            Mockito.when(cache.allocShmSlot(
+              ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
+              Mockito.when(cache.allocShmSlot(
                 Mockito.any(DatanodeInfo.class),
                 Mockito.any(DomainPeer.class),
                 Mockito.any(MutableBoolean.class),
                 Mockito.any(ExtendedBlockId.class),
                 Mockito.anyString()))
-                .thenAnswer((Answer<Slot>) call -> null);
+                  .thenAnswer((Answer<Slot>) call -> null);
 
-            return cache;
-          }
+              return cache;
+            }
       );
 
       DatanodeInfo[] nodes = blk.getLocations();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org