You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2020/05/18 16:37:15 UTC

[hadoop] branch trunk updated (50caba1 -> 2abcf77)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 50caba1  HDFS-15207. VolumeScanner skip to scan blocks accessed during recent scan period. Contributed by Yang Yun.
     new 4525292  Revert "HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)"
     new 2abcf77  HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: Revert "HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)"

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 4525292d41482330a86f1cc3935e072f9f67c308
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon May 18 09:22:05 2020 -0700

    Revert "HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)"
    
    This reverts commit 86e6aa8eec538e142044e2b6415ec1caff5e9cbd.
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |  20 +--
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   2 -
 .../hdfs/client/impl/BlockReaderFactory.java       |   9 +-
 .../hadoop/hdfs/client/impl/DfsClientConf.java     |  21 +--
 .../src/main/resources/hdfs-default.xml            |  10 --
 .../hadoop/fs/TestEnhancedByteBufferAccess.java    |   6 +-
 .../hdfs/client/impl/TestBlockReaderFactory.java   |  14 +-
 .../hdfs/client/impl/TestBlockReaderLocal.java     | 190 ++++++---------------
 .../hdfs/shortcircuit/TestShortCircuitCache.java   |  19 +--
 9 files changed, 76 insertions(+), 215 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index 7a03240..cbd941b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -77,7 +77,7 @@ public class ClientContext {
   /**
    * Caches short-circuit file descriptors, mmap regions.
    */
-  private final ShortCircuitCache[] shortCircuitCache;
+  private final ShortCircuitCache shortCircuitCache;
 
   /**
    * Caches TCP and UNIX domain sockets for reuse.
@@ -132,23 +132,13 @@ public class ClientContext {
    */
   private DeadNodeDetector deadNodeDetector = null;
 
-  /**
-   * ShortCircuitCache array size.
-   */
-  private final int clientShortCircuitNum;
-
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
 
     this.name = name;
     this.confString = scConf.confAsString();
-    this.clientShortCircuitNum = conf.getClientShortCircuitNum();
-    this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
-    for (int i = 0; i < this.clientShortCircuitNum; i++) {
-      this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
-    }
-
+    this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
     this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
         scConf.getSocketCacheExpiry());
     this.keyProviderCache = new KeyProviderCache(
@@ -238,11 +228,7 @@ public class ClientContext {
   }
 
   public ShortCircuitCache getShortCircuitCache() {
-    return shortCircuitCache[0];
-  }
-
-  public ShortCircuitCache getShortCircuitCache(long idx) {
-    return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
+    return shortCircuitCache;
   }
 
   public PeerCache getPeerCache() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e8b5402..ab3f6f2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -144,8 +144,6 @@ public interface HdfsClientConfigKeys {
       "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
       60000;
-  String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
-  int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
   String  DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.client.slow.io.warning.threshold.ms";
   long    DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
index 028d629..a3b611c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -476,8 +476,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
               "giving up on BlockReaderLocal.", this, pathInfo);
       return null;
     }
-    ShortCircuitCache cache =
-        clientContext.getShortCircuitCache(block.getBlockId());
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
     ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
         block.getBlockPoolId());
     ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
@@ -528,8 +527,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
       Slot slot = null;
-      ShortCircuitCache cache =
-          clientContext.getShortCircuitCache(block.getBlockId());
+      ShortCircuitCache cache = clientContext.getShortCircuitCache();
       try {
         MutableBoolean usedPeer = new MutableBoolean(false);
         slot = cache.allocShmSlot(datanode, peer, usedPeer,
@@ -584,8 +582,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
    */
   private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
           Slot slot) throws IOException {
-    ShortCircuitCache cache =
-        clientContext.getShortCircuitCache(block.getBlockId());
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
     SlotId slotId = slot == null ? null : slot.getSlotId();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index e41b608..918fef7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -142,7 +142,6 @@ public class DfsClientConf {
   private final long refreshReadBlockLocationsMS;
 
   private final ShortCircuitConf shortCircuitConf;
-  private final int clientShortCircuitNum;
 
   private final long hedgedReadThresholdMillis;
   private final int hedgedReadThreadpoolSize;
@@ -273,6 +272,8 @@ public class DfsClientConf {
         HdfsClientConfigKeys.
             DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
 
+    shortCircuitConf = new ShortCircuitConf(conf);
+
     hedgedReadThresholdMillis = conf.getLong(
         HedgedRead.THRESHOLD_MILLIS_KEY,
         HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@@ -295,17 +296,6 @@ public class DfsClientConf {
     leaseHardLimitPeriod =
         conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
             HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
-
-    shortCircuitConf = new ShortCircuitConf(conf);
-    clientShortCircuitNum = conf.getInt(
-            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
-            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
-    Preconditions.checkArgument(clientShortCircuitNum >= 1,
-            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
-                    "can't be less then 1.");
-    Preconditions.checkArgument(clientShortCircuitNum <= 5,
-            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
-                    "can't be more then 5.");
   }
 
   @SuppressWarnings("unchecked")
@@ -611,13 +601,6 @@ public class DfsClientConf {
     return slowIoWarningThresholdMs;
   }
 
-  /*
-   * @return the clientShortCircuitNum
-   */
-  public int getClientShortCircuitNum() {
-    return clientShortCircuitNum;
-  }
-
   /**
    * @return the hedgedReadThresholdMillis
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 89b2a17..8b1571a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4189,16 +4189,6 @@
 </property>
 
 <property>
-  <name>dfs.client.short.circuit.num</name>
-  <value>1</value>
-  <description>
-    Number of short-circuit caches. This setting should
-    be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
-    values may speed up massive parallel reading files.
-  </description>
-</property>
-
-<property>
   <name>dfs.client.read.striped.threadpool.size</name>
   <value>18</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
index 19bc711..90b4f11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
@@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
     fsIn.close();
     fsIn = fs.open(TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache(0);
+        CONTEXT, conf).getShortCircuitCache();
     cache.accept(new CountingVisitor(0, 5, 5, 0));
     results[0] = fsIn.read(null, BLOCK_SIZE,
         EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@@ -654,12 +654,12 @@ public class TestEnhancedByteBufferAccess {
         BLOCK_SIZE), byteBufferToArray(result2));
     fsIn2.releaseBuffer(result2);
     fsIn2.close();
-
+    
     // check that the replica is anchored 
     final ExtendedBlock firstBlock =
         DFSTestUtil.getFirstBlock(fs, TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache(0);
+        CONTEXT, conf).getShortCircuitCache();
     waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
     // Uncache the replica
     fs.removeCacheDirective(directiveId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
index 8442449..6b04b14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
@@ -389,7 +389,7 @@ public class TestBlockReaderFactory {
 
         try (FSDataInputStream in = dfs.open(testFile)) {
           Assert.assertEquals(0,
-              dfs.getClient().getClientContext().getShortCircuitCache(0)
+              dfs.getClient().getClientContext().getShortCircuitCache()
                   .getReplicaInfoMapSize());
 
           final byte[] buf = new byte[testFileLen];
@@ -398,12 +398,12 @@ public class TestBlockReaderFactory {
 
           // Set cache size to 0 so the replica marked evictable by unbuffer
           // will be purged immediately.
-          dfs.getClient().getClientContext().getShortCircuitCache(0)
+          dfs.getClient().getClientContext().getShortCircuitCache()
               .setMaxTotalSize(0);
           LOG.info("Unbuffering");
           in.unbuffer();
           Assert.assertEquals(0,
-              dfs.getClient().getClientContext().getShortCircuitCache(0)
+              dfs.getClient().getClientContext().getShortCircuitCache()
                   .getReplicaInfoMapSize());
 
           DFSTestUtil.appendFile(dfs, testFile, "append more data");
@@ -432,7 +432,7 @@ public class TestBlockReaderFactory {
       final int expectedScrRepMapSize) {
     Assert.assertThat(expected, CoreMatchers.is(actual));
     Assert.assertEquals(expectedScrRepMapSize,
-        dfs.getClient().getClientContext().getShortCircuitCache(0)
+        dfs.getClient().getClientContext().getShortCircuitCache()
             .getReplicaInfoMapSize());
   }
 
@@ -467,7 +467,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache(0);
+        fs.getClient().getClientContext().getShortCircuitCache();
     final DatanodeInfo datanode = new DatanodeInfoBuilder()
         .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
         .build();
@@ -516,7 +516,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache(0);
+        fs.getClient().getClientContext().getShortCircuitCache();
     Assert.assertEquals(null, cache.getDfsClientShmManager());
     cluster.shutdown();
     sockDir.close();
@@ -548,7 +548,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache(0);
+        fs.getClient().getClientContext().getShortCircuitCache();
     cache.close();
     Assert.assertTrue(cache.getDfsClientShmManager().
         getDomainSocketWatcher().isClosed());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
index 4c327a0..95fb67a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -116,7 +116,7 @@ public class TestBlockReaderLocal {
   }
 
   private static class BlockReaderLocalTest {
-    final static int TEST_LENGTH = 1234567;
+    final static int TEST_LENGTH = 12345;
     final static int BYTES_PER_CHECKSUM = 512;
 
     public void setConfiguration(HdfsConfiguration conf) {
@@ -130,14 +130,10 @@ public class TestBlockReaderLocal {
         throws IOException {
       // default: no-op
     }
-    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
-            throws IOException {
-      // default: no-op
-    }  }
+  }
 
   public void runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum, long readahead, int shortCircuitCachesNum)
-          throws IOException {
+      boolean checksum, long readahead) throws IOException {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
     MiniDFSCluster cluster = null;
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -147,13 +143,10 @@ public class TestBlockReaderLocal {
         BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
     conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
-    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
-        shortCircuitCachesNum);
     test.setConfiguration(conf);
     FileInputStream dataIn = null, metaIn = null;
     final Path TEST_PATH = new Path("/a");
     final long RANDOM_SEED = 4567L;
-    final int blockSize = 10 * 1024;
     BlockReaderLocal blockReaderLocal = null;
     FSDataInputStream fsIn = null;
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
@@ -165,8 +158,8 @@ public class TestBlockReaderLocal {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
       fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, TEST_PATH, 1024,
-          BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED);
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
       try {
         DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
       } catch (InterruptedException e) {
@@ -181,52 +174,47 @@ public class TestBlockReaderLocal {
           BlockReaderLocalTest.TEST_LENGTH);
       fsIn.close();
       fsIn = null;
-      for (int i = 0; i < shortCircuitCachesNum; i++) {
-        ExtendedBlock block = DFSTestUtil.getAllBlocks(
-                fs, TEST_PATH).get(i).getBlock();
-        File dataFile = cluster.getBlockFile(0, block);
-        File metaFile = cluster.getBlockMetadataFile(0, block);
-
-        ShortCircuitCache shortCircuitCache =
-                ClientContext.getFromConf(conf).getShortCircuitCache(
-                        block.getBlockId());
-        test.setup(dataFile, checksum);
-        FileInputStream[] streams = {
-            new FileInputStream(dataFile),
-            new FileInputStream(metaFile)
-        };
-        dataIn = streams[0];
-        metaIn = streams[1];
-        ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
-                block.getBlockPoolId());
-        raf = new RandomAccessFile(
-                new File(sockDir.getDir().getAbsolutePath(),
-                        UUID.randomUUID().toString()), "rw");
-        raf.setLength(8192);
-        FileInputStream shmStream = new FileInputStream(raf.getFD());
-        shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
-        ShortCircuitReplica replica =
-                new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
-                        Time.now(), shm.allocAndRegisterSlot(
-                        ExtendedBlockId.fromExtendedBlock(block)));
-        blockReaderLocal = new BlockReaderLocal.Builder(
-                new DfsClientConf.ShortCircuitConf(conf)).
-                setFilename(TEST_PATH.getName()).
-                setBlock(block).
-                setShortCircuitReplica(replica).
-                setCachingStrategy(new CachingStrategy(false, readahead)).
-                setVerifyChecksum(checksum).
-                build();
-        dataIn = null;
-        metaIn = null;
-        test.doTest(blockReaderLocal, original, i * blockSize);
-        // BlockReaderLocal should not alter the file position.
-        Assert.assertEquals(0, streams[0].getChannel().position());
-        Assert.assertEquals(0, streams[1].getChannel().position());
-      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      File dataFile = cluster.getBlockFile(0, block);
+      File metaFile = cluster.getBlockMetadataFile(0, block);
+
+      ShortCircuitCache shortCircuitCache =
+          ClientContext.getFromConf(conf).getShortCircuitCache();
       cluster.shutdown();
       cluster = null;
-
+      test.setup(dataFile, checksum);
+      FileInputStream streams[] = {
+          new FileInputStream(dataFile),
+          new FileInputStream(metaFile)
+      };
+      dataIn = streams[0];
+      metaIn = streams[1];
+      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+          block.getBlockPoolId());
+      raf = new RandomAccessFile(
+          new File(sockDir.getDir().getAbsolutePath(),
+            UUID.randomUUID().toString()), "rw");
+      raf.setLength(8192);
+      FileInputStream shmStream = new FileInputStream(raf.getFD());
+      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
+      ShortCircuitReplica replica =
+          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
+              Time.now(), shm.allocAndRegisterSlot(
+                  ExtendedBlockId.fromExtendedBlock(block)));
+      blockReaderLocal = new BlockReaderLocal.Builder(
+              new DfsClientConf.ShortCircuitConf(conf)).
+          setFilename(TEST_PATH.getName()).
+          setBlock(block).
+          setShortCircuitReplica(replica).
+          setCachingStrategy(new CachingStrategy(false, readahead)).
+          setVerifyChecksum(checksum).
+          build();
+      dataIn = null;
+      metaIn = null;
+      test.doTest(blockReaderLocal, original);
+      // BlockReaderLocal should not alter the file position.
+      Assert.assertEquals(0, streams[0].getChannel().position());
+      Assert.assertEquals(0, streams[1].getChannel().position());
     } finally {
       if (fsIn != null) fsIn.close();
       if (fs != null) fs.close();
@@ -239,12 +227,6 @@ public class TestBlockReaderLocal {
     }
   }
 
-  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum, long readahead) throws IOException {
-    runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum, long readahead, 1);
-  }
-
   private static class TestBlockReaderLocalImmediateClose
       extends BlockReaderLocalTest {
   }
@@ -260,7 +242,7 @@ public class TestBlockReaderLocal {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte[] buf = new byte[TEST_LENGTH];
+      byte buf[] = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 512);
       assertArrayRegionsEqual(original, 0, buf, 0, 512);
       reader.readFully(buf, 512, 512);
@@ -309,7 +291,7 @@ public class TestBlockReaderLocal {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte[] buf = new byte[TEST_LENGTH];
+      byte buf[] = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 10);
       assertArrayRegionsEqual(original, 0, buf, 0, 10);
       reader.readFully(buf, 10, 100);
@@ -387,7 +369,7 @@ public class TestBlockReaderLocal {
   public void testBlockReaderLocalByteBufferReadsNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-         true, 0);
+        true, 0);
   }
 
   @Test
@@ -486,7 +468,7 @@ public class TestBlockReaderLocal {
 
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte[] buf = new byte[TEST_LENGTH];
+      byte buf[] = new byte[TEST_LENGTH];
       if (usingChecksums) {
         try {
           reader.readFully(buf, 0, 10);
@@ -526,7 +508,7 @@ public class TestBlockReaderLocal {
 
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte[] buf = new byte[TEST_LENGTH];
+      byte buf[] = new byte[TEST_LENGTH];
       try {
         reader.readFully(buf, 0, 10);
         assertArrayRegionsEqual(original, 0, buf, 0, 10);
@@ -863,78 +845,4 @@ public class TestBlockReaderLocal {
       }
     }
   }
-
-  private static class TestBlockReaderFiveShortCircutCachesReads
-          extends BlockReaderLocalTest {
-    @Override
-    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
-            throws IOException {
-      byte[] buf = new byte[TEST_LENGTH];
-      reader.readFully(buf, 0, 512);
-      assertArrayRegionsEqual(original, shift, buf, 0, 512);
-      reader.readFully(buf, 512, 512);
-      assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512);
-      reader.readFully(buf, 1024, 513);
-      assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513);
-      reader.readFully(buf, 1537, 514);
-      assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514);
-      // Readahead is always at least the size of one chunk in this test.
-      Assert.assertTrue(reader.getMaxReadaheadLength() >=
-              BlockReaderLocalTest.BYTES_PER_CHECKSUM);
-    }
-  }
-
-  @Test
-  public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
-            5);
-  }
-
-  @Test
-  public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
-          throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-            true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
-            5);
-  }
-
-  @Test
-  public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
-          throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-            false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
-            5);
-  }
-
-  @Test
-  public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
-          throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-            true, 0, 5);
-  }
-
-  @Test
-  public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
-          throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-            false, 0, 5);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBlockReaderShortCircutCachesOutOfRangeBelow()
-          throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
-            0);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBlockReaderShortCircutCachesOutOfRangeAbove()
-          throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
-            555);
-  }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 53cac2a..b2da6a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -431,7 +431,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache(0);
+        fs.getClient().getClientContext().getShortCircuitCache();
     cache.getDfsClientShmManager().visit(new Visitor() {
       @Override
       public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@@ -501,7 +501,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache(0);
+        fs.getClient().getClientContext().getShortCircuitCache();
     String TEST_FILE = "/test_file";
     final int TEST_FILE_LEN = 8193;
     final int SEED = 0xFADED;
@@ -565,7 +565,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache(0);
+        fs.getClient().getClientContext().getShortCircuitCache();
     cache.getDfsClientShmManager().visit(new Visitor() {
       @Override
       public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@@ -877,20 +877,19 @@ public class TestShortCircuitCache {
             return peerCache;
           });
 
-      Mockito.when(clientContext.getShortCircuitCache(
-          blk.getBlock().getBlockId())).thenAnswer(
+      Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
           (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
-              ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
-              Mockito.when(cache.allocShmSlot(
+            ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
+            Mockito.when(cache.allocShmSlot(
                 Mockito.any(DatanodeInfo.class),
                 Mockito.any(DomainPeer.class),
                 Mockito.any(MutableBoolean.class),
                 Mockito.any(ExtendedBlockId.class),
                 Mockito.anyString()))
-                  .thenAnswer((Answer<Slot>) call -> null);
+                .thenAnswer((Answer<Slot>) call -> null);
 
-              return cache;
-            }
+            return cache;
+          }
       );
 
       DatanodeInfo[] nodes = blk.getLocations();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)

Posted by we...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 2abcf7762ae74b936e1cedb60d5d2b4cc4ee86ea
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon May 18 09:22:15 2020 -0700

    HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)
---
 .../java/org/apache/hadoop/hdfs/ClientContext.java |  20 ++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   2 +
 .../hdfs/client/impl/BlockReaderFactory.java       |   9 +-
 .../hadoop/hdfs/client/impl/DfsClientConf.java     |  21 ++-
 .../src/main/resources/hdfs-default.xml            |  10 ++
 .../hadoop/fs/TestEnhancedByteBufferAccess.java    |   6 +-
 .../hdfs/client/impl/TestBlockReaderFactory.java   |  14 +-
 .../hdfs/client/impl/TestBlockReaderLocal.java     | 189 +++++++++++++++------
 .../hdfs/shortcircuit/TestShortCircuitCache.java   |  19 ++-
 9 files changed, 214 insertions(+), 76 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index cbd941b..7a03240 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -77,7 +77,7 @@ public class ClientContext {
   /**
    * Caches short-circuit file descriptors, mmap regions.
    */
-  private final ShortCircuitCache shortCircuitCache;
+  private final ShortCircuitCache[] shortCircuitCache;
 
   /**
    * Caches TCP and UNIX domain sockets for reuse.
@@ -132,13 +132,23 @@ public class ClientContext {
    */
   private DeadNodeDetector deadNodeDetector = null;
 
+  /**
+   * ShortCircuitCache array size.
+   */
+  private final int clientShortCircuitNum;
+
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
 
     this.name = name;
     this.confString = scConf.confAsString();
-    this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
+    this.clientShortCircuitNum = conf.getClientShortCircuitNum();
+    this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
+    for (int i = 0; i < this.clientShortCircuitNum; i++) {
+      this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
+    }
+
     this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
         scConf.getSocketCacheExpiry());
     this.keyProviderCache = new KeyProviderCache(
@@ -228,7 +238,11 @@ public class ClientContext {
   }
 
   public ShortCircuitCache getShortCircuitCache() {
-    return shortCircuitCache;
+    return shortCircuitCache[0];
+  }
+
+  public ShortCircuitCache getShortCircuitCache(long idx) {
+    return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
   }
 
   public PeerCache getPeerCache() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index ab3f6f2..e8b5402 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys {
       "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
       60000;
+  String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
+  int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
   String  DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.client.slow.io.warning.threshold.ms";
   long    DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
index a3b611c..028d629 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -476,7 +476,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
               "giving up on BlockReaderLocal.", this, pathInfo);
       return null;
     }
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ShortCircuitCache cache =
+        clientContext.getShortCircuitCache(block.getBlockId());
     ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
         block.getBlockPoolId());
     ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
@@ -527,7 +528,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
       Slot slot = null;
-      ShortCircuitCache cache = clientContext.getShortCircuitCache();
+      ShortCircuitCache cache =
+          clientContext.getShortCircuitCache(block.getBlockId());
       try {
         MutableBoolean usedPeer = new MutableBoolean(false);
         slot = cache.allocShmSlot(datanode, peer, usedPeer,
@@ -582,7 +584,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
    */
   private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
           Slot slot) throws IOException {
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ShortCircuitCache cache =
+        clientContext.getShortCircuitCache(block.getBlockId());
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
     SlotId slotId = slot == null ? null : slot.getSlotId();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 918fef7..e41b608 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -142,6 +142,7 @@ public class DfsClientConf {
   private final long refreshReadBlockLocationsMS;
 
   private final ShortCircuitConf shortCircuitConf;
+  private final int clientShortCircuitNum;
 
   private final long hedgedReadThresholdMillis;
   private final int hedgedReadThreadpoolSize;
@@ -272,8 +273,6 @@ public class DfsClientConf {
         HdfsClientConfigKeys.
             DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
 
-    shortCircuitConf = new ShortCircuitConf(conf);
-
     hedgedReadThresholdMillis = conf.getLong(
         HedgedRead.THRESHOLD_MILLIS_KEY,
         HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@@ -296,6 +295,17 @@ public class DfsClientConf {
     leaseHardLimitPeriod =
         conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
             HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
+
+    shortCircuitConf = new ShortCircuitConf(conf);
+    clientShortCircuitNum = conf.getInt(
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
+    Preconditions.checkArgument(clientShortCircuitNum >= 1,
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
+                    "can't be less then 1.");
+    Preconditions.checkArgument(clientShortCircuitNum <= 5,
+            HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
+                    "can't be more then 5.");
   }
 
   @SuppressWarnings("unchecked")
@@ -601,6 +611,13 @@ public class DfsClientConf {
     return slowIoWarningThresholdMs;
   }
 
+  /**
+   * @return the clientShortCircuitNum
+   */
+  public int getClientShortCircuitNum() {
+    return clientShortCircuitNum;
+  }
+
   /**
    * @return the hedgedReadThresholdMillis
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 8b1571a..89b2a17 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4189,6 +4189,16 @@
 </property>
 
 <property>
+  <name>dfs.client.short.circuit.num</name>
+  <value>1</value>
+  <description>
+    Number of short-circuit caches. This setting should
+    be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
+    values may speed up massively parallel file reads.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.read.striped.threadpool.size</name>
   <value>18</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
index 90b4f11..19bc711 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
@@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
     fsIn.close();
     fsIn = fs.open(TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache(0);
     cache.accept(new CountingVisitor(0, 5, 5, 0));
     results[0] = fsIn.read(null, BLOCK_SIZE,
         EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@@ -654,12 +654,12 @@ public class TestEnhancedByteBufferAccess {
         BLOCK_SIZE), byteBufferToArray(result2));
     fsIn2.releaseBuffer(result2);
     fsIn2.close();
-    
+
     // check that the replica is anchored 
     final ExtendedBlock firstBlock =
         DFSTestUtil.getFirstBlock(fs, TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache(0);
     waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
     // Uncache the replica
     fs.removeCacheDirective(directiveId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
index 6b04b14..8442449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
@@ -389,7 +389,7 @@ public class TestBlockReaderFactory {
 
         try (FSDataInputStream in = dfs.open(testFile)) {
           Assert.assertEquals(0,
-              dfs.getClient().getClientContext().getShortCircuitCache()
+              dfs.getClient().getClientContext().getShortCircuitCache(0)
                   .getReplicaInfoMapSize());
 
           final byte[] buf = new byte[testFileLen];
@@ -398,12 +398,12 @@ public class TestBlockReaderFactory {
 
           // Set cache size to 0 so the replica marked evictable by unbuffer
           // will be purged immediately.
-          dfs.getClient().getClientContext().getShortCircuitCache()
+          dfs.getClient().getClientContext().getShortCircuitCache(0)
               .setMaxTotalSize(0);
           LOG.info("Unbuffering");
           in.unbuffer();
           Assert.assertEquals(0,
-              dfs.getClient().getClientContext().getShortCircuitCache()
+              dfs.getClient().getClientContext().getShortCircuitCache(0)
                   .getReplicaInfoMapSize());
 
           DFSTestUtil.appendFile(dfs, testFile, "append more data");
@@ -432,7 +432,7 @@ public class TestBlockReaderFactory {
       final int expectedScrRepMapSize) {
     Assert.assertThat(expected, CoreMatchers.is(actual));
     Assert.assertEquals(expectedScrRepMapSize,
-        dfs.getClient().getClientContext().getShortCircuitCache()
+        dfs.getClient().getClientContext().getShortCircuitCache(0)
             .getReplicaInfoMapSize());
   }
 
@@ -467,7 +467,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     final DatanodeInfo datanode = new DatanodeInfoBuilder()
         .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
         .build();
@@ -516,7 +516,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     Assert.assertEquals(null, cache.getDfsClientShmManager());
     cluster.shutdown();
     sockDir.close();
@@ -548,7 +548,7 @@ public class TestBlockReaderFactory {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     cache.close();
     Assert.assertTrue(cache.getDfsClientShmManager().
         getDomainSocketWatcher().isClosed());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
index 95fb67a..534243d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -116,7 +116,7 @@ public class TestBlockReaderLocal {
   }
 
   private static class BlockReaderLocalTest {
-    final static int TEST_LENGTH = 12345;
+    final static int TEST_LENGTH = 1234567;
     final static int BYTES_PER_CHECKSUM = 512;
 
     public void setConfiguration(HdfsConfiguration conf) {
@@ -130,10 +130,14 @@ public class TestBlockReaderLocal {
         throws IOException {
       // default: no-op
     }
-  }
+    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
+            throws IOException {
+      // default: no-op
+    }  }
 
   public void runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum, long readahead) throws IOException {
+      boolean checksum, long readahead, int shortCircuitCachesNum)
+          throws IOException {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
     MiniDFSCluster cluster = null;
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -143,10 +147,13 @@ public class TestBlockReaderLocal {
         BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
     conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
+        shortCircuitCachesNum);
     test.setConfiguration(conf);
     FileInputStream dataIn = null, metaIn = null;
     final Path TEST_PATH = new Path("/a");
     final long RANDOM_SEED = 4567L;
+    final int blockSize = 10 * 1024;
     BlockReaderLocal blockReaderLocal = null;
     FSDataInputStream fsIn = null;
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
@@ -158,8 +165,8 @@ public class TestBlockReaderLocal {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
       fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, TEST_PATH,
-          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+      DFSTestUtil.createFile(fs, TEST_PATH, 1024,
+          BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED);
       try {
         DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
       } catch (InterruptedException e) {
@@ -174,47 +181,52 @@ public class TestBlockReaderLocal {
           BlockReaderLocalTest.TEST_LENGTH);
       fsIn.close();
       fsIn = null;
-      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
-      File dataFile = cluster.getBlockFile(0, block);
-      File metaFile = cluster.getBlockMetadataFile(0, block);
-
-      ShortCircuitCache shortCircuitCache =
-          ClientContext.getFromConf(conf).getShortCircuitCache();
+      for (int i = 0; i < shortCircuitCachesNum; i++) {
+        ExtendedBlock block = DFSTestUtil.getAllBlocks(
+                fs, TEST_PATH).get(i).getBlock();
+        File dataFile = cluster.getBlockFile(0, block);
+        File metaFile = cluster.getBlockMetadataFile(0, block);
+
+        ShortCircuitCache shortCircuitCache =
+                ClientContext.getFromConf(conf).getShortCircuitCache(
+                        block.getBlockId());
+        test.setup(dataFile, checksum);
+        FileInputStream[] streams = {
+            new FileInputStream(dataFile),
+            new FileInputStream(metaFile)
+        };
+        dataIn = streams[0];
+        metaIn = streams[1];
+        ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
+                block.getBlockPoolId());
+        raf = new RandomAccessFile(
+                new File(sockDir.getDir().getAbsolutePath(),
+                        UUID.randomUUID().toString()), "rw");
+        raf.setLength(8192);
+        FileInputStream shmStream = new FileInputStream(raf.getFD());
+        shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
+        ShortCircuitReplica replica =
+                new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
+                        Time.now(), shm.allocAndRegisterSlot(
+                        ExtendedBlockId.fromExtendedBlock(block)));
+        blockReaderLocal = new BlockReaderLocal.Builder(
+                new DfsClientConf.ShortCircuitConf(conf)).
+                setFilename(TEST_PATH.getName()).
+                setBlock(block).
+                setShortCircuitReplica(replica).
+                setCachingStrategy(new CachingStrategy(false, readahead)).
+                setVerifyChecksum(checksum).
+                build();
+        dataIn = null;
+        metaIn = null;
+        test.doTest(blockReaderLocal, original, i * blockSize);
+        // BlockReaderLocal should not alter the file position.
+        Assert.assertEquals(0, streams[0].getChannel().position());
+        Assert.assertEquals(0, streams[1].getChannel().position());
+      }
       cluster.shutdown();
       cluster = null;
-      test.setup(dataFile, checksum);
-      FileInputStream streams[] = {
-          new FileInputStream(dataFile),
-          new FileInputStream(metaFile)
-      };
-      dataIn = streams[0];
-      metaIn = streams[1];
-      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
-          block.getBlockPoolId());
-      raf = new RandomAccessFile(
-          new File(sockDir.getDir().getAbsolutePath(),
-            UUID.randomUUID().toString()), "rw");
-      raf.setLength(8192);
-      FileInputStream shmStream = new FileInputStream(raf.getFD());
-      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
-      ShortCircuitReplica replica =
-          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
-              Time.now(), shm.allocAndRegisterSlot(
-                  ExtendedBlockId.fromExtendedBlock(block)));
-      blockReaderLocal = new BlockReaderLocal.Builder(
-              new DfsClientConf.ShortCircuitConf(conf)).
-          setFilename(TEST_PATH.getName()).
-          setBlock(block).
-          setShortCircuitReplica(replica).
-          setCachingStrategy(new CachingStrategy(false, readahead)).
-          setVerifyChecksum(checksum).
-          build();
-      dataIn = null;
-      metaIn = null;
-      test.doTest(blockReaderLocal, original);
-      // BlockReaderLocal should not alter the file position.
-      Assert.assertEquals(0, streams[0].getChannel().position());
-      Assert.assertEquals(0, streams[1].getChannel().position());
+
     } finally {
       if (fsIn != null) fsIn.close();
       if (fs != null) fs.close();
@@ -227,6 +239,11 @@ public class TestBlockReaderLocal {
     }
   }
 
+  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+      boolean checksum, long readahead) throws IOException {
+    runBlockReaderLocalTest(test, checksum, readahead, 1);
+  }
+
   private static class TestBlockReaderLocalImmediateClose
       extends BlockReaderLocalTest {
   }
@@ -242,7 +259,7 @@ public class TestBlockReaderLocal {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 512);
       assertArrayRegionsEqual(original, 0, buf, 0, 512);
       reader.readFully(buf, 512, 512);
@@ -291,7 +308,7 @@ public class TestBlockReaderLocal {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 10);
       assertArrayRegionsEqual(original, 0, buf, 0, 10);
       reader.readFully(buf, 10, 100);
@@ -369,7 +386,7 @@ public class TestBlockReaderLocal {
   public void testBlockReaderLocalByteBufferReadsNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        true, 0);
+         true, 0);
   }
 
   @Test
@@ -468,7 +485,7 @@ public class TestBlockReaderLocal {
 
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       if (usingChecksums) {
         try {
           reader.readFully(buf, 0, 10);
@@ -508,7 +525,7 @@ public class TestBlockReaderLocal {
 
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       try {
         reader.readFully(buf, 0, 10);
         assertArrayRegionsEqual(original, 0, buf, 0, 10);
@@ -845,4 +862,78 @@ public class TestBlockReaderLocal {
       }
     }
   }
+
+  private static class TestBlockReaderFiveShortCircutCachesReads
+          extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
+            throws IOException {
+      byte[] buf = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 512);
+      assertArrayRegionsEqual(original, shift, buf, 0, 512);
+      reader.readFully(buf, 512, 512);
+      assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512);
+      reader.readFully(buf, 1024, 513);
+      assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513);
+      reader.readFully(buf, 1537, 514);
+      assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514);
+      // Readahead is always at least the size of one chunk in this test.
+      Assert.assertTrue(reader.getMaxReadaheadLength() >=
+              BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+    }
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
+            5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, 0, 5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            false, 0, 5);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReaderShortCircutCachesOutOfRangeBelow()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReaderShortCircutCachesOutOfRangeAbove()
+          throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+            true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+            555);
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index b2da6a2..53cac2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -431,7 +431,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     cache.getDfsClientShmManager().visit(new Visitor() {
       @Override
       public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@@ -501,7 +501,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     String TEST_FILE = "/test_file";
     final int TEST_FILE_LEN = 8193;
     final int SEED = 0xFADED;
@@ -565,7 +565,7 @@ public class TestShortCircuitCache {
     cluster.waitActive();
     DistributedFileSystem fs = cluster.getFileSystem();
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     cache.getDfsClientShmManager().visit(new Visitor() {
       @Override
       public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@@ -877,19 +877,20 @@ public class TestShortCircuitCache {
             return peerCache;
           });
 
-      Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
+      Mockito.when(clientContext.getShortCircuitCache(
+          blk.getBlock().getBlockId())).thenAnswer(
           (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
-            ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
-            Mockito.when(cache.allocShmSlot(
+              ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
+              Mockito.when(cache.allocShmSlot(
                 Mockito.any(DatanodeInfo.class),
                 Mockito.any(DomainPeer.class),
                 Mockito.any(MutableBoolean.class),
                 Mockito.any(ExtendedBlockId.class),
                 Mockito.anyString()))
-                .thenAnswer((Answer<Slot>) call -> null);
+                  .thenAnswer((Answer<Slot>) call -> null);
 
-            return cache;
-          }
+              return cache;
+            }
       );
 
       DatanodeInfo[] nodes = blk.getLocations();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org