You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2019/10/27 17:08:58 UTC

[hadoop] 02/10: HDFS-14393. Refactor FsDatasetCache for SCM cache implementation. Contributed by Rakesh R

This is an automated email from the ASF dual-hosted git repository.

rakeshr pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 712749c1a00eb3e7ed5a3511320c2297abc10e11
Author: Rakesh Radhakrishnan <ra...@apache.org>
AuthorDate: Fri Mar 29 00:18:15 2019 +0530

    HDFS-14393. Refactor FsDatasetCache for SCM cache implementation. Contributed by Rakesh R
    
    (cherry picked from commit f3f51284d57ef2e0c7e968b6eea56eab578f7e93)
---
 .../datanode/fsdataset/impl/FsDatasetCache.java    | 132 ++-----------
 .../fsdataset/impl/MappableBlockLoader.java        |  20 ++
 .../datanode/fsdataset/impl/MemoryCacheStats.java  | 212 +++++++++++++++++++++
 .../fsdataset/impl/MemoryMappableBlockLoader.java  |  24 ++-
 .../datanode/TestFsDatasetCacheRevocation.java     |  31 +--
 .../{ => fsdataset/impl}/TestFsDatasetCache.java   |  10 +-
 6 files changed, 298 insertions(+), 131 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index b7795f6..f2abada 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,104 +130,10 @@ public class FsDatasetCache {
 
   private final long revocationPollingMs;
 
-  /**
-   * The approximate amount of cache space in use.
-   *
-   * This number is an overestimate, counting bytes that will be used only
-   * if pending caching operations succeed.  It does not take into account
-   * pending uncaching operations.
-   *
-   * This overestimate is more useful to the NameNode than an underestimate,
-   * since we don't want the NameNode to assign us more replicas than
-   * we can cache, because of the current batch of operations.
-   */
-  private final UsedBytesCount usedBytesCount;
-
-  public static class PageRounder {
-    private final long osPageSize =
-        NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
-
-    /**
-     * Round up a number to the operating system page size.
-     */
-    public long roundUp(long count) {
-      return (count + osPageSize - 1) & (~(osPageSize - 1));
-    }
-
-    /**
-     * Round down a number to the operating system page size.
-     */
-    public long roundDown(long count) {
-      return count & (~(osPageSize - 1));
-    }
-  }
-
-  private class UsedBytesCount {
-    private final AtomicLong usedBytes = new AtomicLong(0);
-    
-    private final PageRounder rounder = new PageRounder();
-
-    /**
-     * Try to reserve more bytes.
-     *
-     * @param count    The number of bytes to add.  We will round this
-     *                 up to the page size.
-     *
-     * @return         The new number of usedBytes if we succeeded;
-     *                 -1 if we failed.
-     */
-    long reserve(long count) {
-      count = rounder.roundUp(count);
-      while (true) {
-        long cur = usedBytes.get();
-        long next = cur + count;
-        if (next > maxBytes) {
-          return -1;
-        }
-        if (usedBytes.compareAndSet(cur, next)) {
-          return next;
-        }
-      }
-    }
-    
-    /**
-     * Release some bytes that we're using.
-     *
-     * @param count    The number of bytes to release.  We will round this
-     *                 up to the page size.
-     *
-     * @return         The new number of usedBytes.
-     */
-    long release(long count) {
-      count = rounder.roundUp(count);
-      return usedBytes.addAndGet(-count);
-    }
-
-    /**
-     * Release some bytes that we're using rounded down to the page size.
-     *
-     * @param count    The number of bytes to release.  We will round this
-     *                 down to the page size.
-     *
-     * @return         The new number of usedBytes.
-     */
-    long releaseRoundDown(long count) {
-      count = rounder.roundDown(count);
-      return usedBytes.addAndGet(-count);
-    }
-
-    long get() {
-      return usedBytes.get();
-    }
-  }
-
-  /**
-   * The total cache capacity in bytes.
-   */
-  private final long maxBytes;
-
   private final MappableBlockLoader mappableBlockLoader;
 
+  private final MemoryCacheStats memCacheStats;
+
   /**
    * Number of cache commands that could not be completed successfully
    */
@@ -240,12 +145,10 @@ public class FsDatasetCache {
 
   public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
     this.dataset = dataset;
-    this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
     ThreadFactory workerFactory = new ThreadFactoryBuilder()
         .setDaemon(true)
         .setNameFormat("FsDatasetCache-%d-" + dataset.toString())
         .build();
-    this.usedBytesCount = new UsedBytesCount();
     this.uncachingExecutor = new ThreadPoolExecutor(
             0, 1,
             60, TimeUnit.SECONDS,
@@ -270,7 +173,11 @@ public class FsDatasetCache {
               ".  Reconfigure this to " + minRevocationPollingMs);
     }
     this.revocationPollingMs = confRevocationPollingMs;
-    this.mappableBlockLoader = new MemoryMappableBlockLoader();
+
+    this.mappableBlockLoader = new MemoryMappableBlockLoader(this);
+    // Both lazy writer and read cache are sharing this statistics.
+    this.memCacheStats = new MemoryCacheStats(
+        dataset.datanode.getDnConf().getMaxLockedMemory());
   }
 
   /**
@@ -371,7 +278,7 @@ public class FsDatasetCache {
    *                 -1 if we failed.
    */
   long reserve(long count) {
-    return usedBytesCount.reserve(count);
+    return memCacheStats.reserve(count);
   }
 
   /**
@@ -383,7 +290,7 @@ public class FsDatasetCache {
    * @return         The new number of usedBytes.
    */
   long release(long count) {
-    return usedBytesCount.release(count);
+    return memCacheStats.release(count);
   }
 
   /**
@@ -395,7 +302,7 @@ public class FsDatasetCache {
    * @return         The new number of usedBytes.
    */
   long releaseRoundDown(long count) {
-    return usedBytesCount.releaseRoundDown(count);
+    return memCacheStats.releaseRoundDown(count);
   }
 
   /**
@@ -404,14 +311,14 @@ public class FsDatasetCache {
    * @return the OS page size.
    */
   long getOsPageSize() {
-    return usedBytesCount.rounder.osPageSize;
+    return memCacheStats.getPageSize();
   }
 
   /**
    * Round up to the OS page size.
    */
   long roundUpPageSize(long count) {
-    return usedBytesCount.rounder.roundUp(count);
+    return memCacheStats.roundUpPageSize(count);
   }
 
   /**
@@ -437,14 +344,14 @@ public class FsDatasetCache {
       MappableBlock mappableBlock = null;
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
-      long newUsedBytes = reserve(length);
+      long newUsedBytes = mappableBlockLoader.reserve(length);
       boolean reservedBytes = false;
       try {
         if (newUsedBytes < 0) {
           LOG.warn("Failed to cache " + key + ": could not reserve " + length +
               " more bytes in the cache: " +
               DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-              " of " + maxBytes + " exceeded.");
+              " of " + memCacheStats.getCacheCapacity() + " exceeded.");
           return;
         }
         reservedBytes = true;
@@ -497,10 +404,10 @@ public class FsDatasetCache {
         IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
-            release(length);
+            mappableBlockLoader.release(length);
           }
           LOG.debug("Caching of {} was aborted.  We are now caching only {} "
-                  + "bytes in total.", key, usedBytesCount.get());
+                  + "bytes in total.", key, memCacheStats.getCacheUsed());
           IOUtils.closeQuietly(mappableBlock);
           numBlocksFailedToCache.incrementAndGet();
 
@@ -574,7 +481,8 @@ public class FsDatasetCache {
       synchronized (FsDatasetCache.this) {
         mappableBlockMap.remove(key);
       }
-      long newUsedBytes = release(value.mappableBlock.getLength());
+      long newUsedBytes = mappableBlockLoader
+          .release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (revocationTimeMs != 0) {
@@ -593,14 +501,14 @@ public class FsDatasetCache {
    * Get the approximate amount of cache space used.
    */
   public long getCacheUsed() {
-    return usedBytesCount.get();
+    return memCacheStats.getCacheUsed();
   }
 
   /**
    * Get the maximum amount of bytes we can cache.  This is a constant.
    */
   public long getCacheCapacity() {
-    return maxBytes;
+    return memCacheStats.getCacheCapacity();
   }
 
   public long getNumBlocksFailedToCache() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
index a323f78..0f5ec2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java
@@ -58,6 +58,26 @@ public abstract class MappableBlockLoader {
       throws IOException;
 
   /**
+   * Try to reserve some given bytes.
+   *
+   * @param bytesCount
+   *          The number of bytes to add.
+   *
+   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   */
+  abstract long reserve(long bytesCount);
+
+  /**
+   * Release some bytes that we're using.
+   *
+   * @param bytesCount
+   *          The number of bytes to release.
+   *
+   * @return The new number of usedBytes.
+   */
+  abstract long release(long bytesCount);
+
+  /**
    * Reads bytes into a buffer until EOF or the buffer's limit is reached.
    */
   protected int fillBuffer(FileChannel channel, ByteBuffer buf)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java
new file mode 100644
index 0000000..d276c27
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryCacheStats.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.io.nativeio.NativeIO;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Keeps statistics for the memory cache.
+ */
+class MemoryCacheStats {
+
+  /**
+   * The approximate amount of cache space in use.
+   *
+   * This number is an overestimate, counting bytes that will be used only if
+   * pending caching operations succeed. It does not take into account pending
+   * uncaching operations.
+   *
+   * This overestimate is more useful to the NameNode than an underestimate,
+   * since we don't want the NameNode to assign us more replicas than we can
+   * cache, because of the current batch of operations.
+   */
+  private final UsedBytesCount usedBytesCount;
+
+  /**
+   * The total cache capacity in bytes.
+   */
+  private final long maxBytes;
+
+  MemoryCacheStats(long maxBytes) {
+    this.usedBytesCount = new UsedBytesCount();
+    this.maxBytes = maxBytes;
+  }
+
+  /**
+   * Used to count operating system page size.
+   */
+  @VisibleForTesting
+  static class PageRounder {
+    private final long osPageSize = NativeIO.POSIX.getCacheManipulator()
+        .getOperatingSystemPageSize();
+
+    /**
+     * Round up a number to the operating system page size.
+     */
+    public long roundUp(long count) {
+      return (count + osPageSize - 1) & (~(osPageSize - 1));
+    }
+
+    /**
+     * Round down a number to the operating system page size.
+     */
+    public long roundDown(long count) {
+      return count & (~(osPageSize - 1));
+    }
+  }
+
+  /**
+   * Counts used bytes for memory.
+   */
+  private class UsedBytesCount {
+    private final AtomicLong usedBytes = new AtomicLong(0);
+
+    private MemoryCacheStats.PageRounder rounder = new PageRounder();
+
+    /**
+     * Try to reserve more bytes.
+     *
+     * @param count
+     *          The number of bytes to add. We will round this up to the page
+     *          size.
+     *
+     * @return The new number of usedBytes if we succeeded; -1 if we failed.
+     */
+    long reserve(long count) {
+      count = rounder.roundUp(count);
+      while (true) {
+        long cur = usedBytes.get();
+        long next = cur + count;
+        if (next > getCacheCapacity()) {
+          return -1;
+        }
+        if (usedBytes.compareAndSet(cur, next)) {
+          return next;
+        }
+      }
+    }
+
+    /**
+     * Release some bytes that we're using.
+     *
+     * @param count
+     *          The number of bytes to release. We will round this up to the
+     *          page size.
+     *
+     * @return The new number of usedBytes.
+     */
+    long release(long count) {
+      count = rounder.roundUp(count);
+      return usedBytes.addAndGet(-count);
+    }
+
+    /**
+     * Release some bytes that we're using rounded down to the page size.
+     *
+     * @param count
+     *          The number of bytes to release. We will round this down to the
+     *          page size.
+     *
+     * @return The new number of usedBytes.
+     */
+    long releaseRoundDown(long count) {
+      count = rounder.roundDown(count);
+      return usedBytes.addAndGet(-count);
+    }
+
+    long get() {
+      return usedBytes.get();
+    }
+  }
+
+  // Stats related methods for FSDatasetMBean
+
+  /**
+   * Get the approximate amount of cache space used.
+   */
+  public long getCacheUsed() {
+    return usedBytesCount.get();
+  }
+
+  /**
+   * Get the maximum amount of bytes we can cache. This is a constant.
+   */
+  public long getCacheCapacity() {
+    return maxBytes;
+  }
+
+  /**
+   * Try to reserve more bytes.
+   *
+   * @param count
+   *          The number of bytes to add. We will round this up to the page
+   *          size.
+   *
+   * @return The new number of usedBytes if we succeeded; -1 if we failed.
+   */
+  long reserve(long count) {
+    return usedBytesCount.reserve(count);
+  }
+
+  /**
+   * Release some bytes that we're using.
+   *
+   * @param count
+   *          The number of bytes to release. We will round this up to the
+   *          page size.
+   *
+   * @return The new number of usedBytes.
+   */
+  long release(long count) {
+    return usedBytesCount.release(count);
+  }
+
+  /**
+   * Release some bytes that we're using rounded down to the page size.
+   *
+   * @param count
+   *          The number of bytes to release. We will round this down to the
+   *          page size.
+   *
+   * @return The new number of usedBytes.
+   */
+  long releaseRoundDown(long count) {
+    return usedBytesCount.releaseRoundDown(count);
+  }
+
+  /**
+   * Get the OS page size.
+   *
+   * @return the OS page size.
+   */
+  long getPageSize() {
+    return usedBytesCount.rounder.osPageSize;
+  }
+
+  /**
+   * Round up to the OS page size.
+   */
+  long roundUpPageSize(long count) {
+    return usedBytesCount.rounder.roundUp(count);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
index 7a0f7c7..d93193e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java
@@ -42,6 +42,18 @@ import java.nio.channels.FileChannel;
 @InterfaceStability.Unstable
 public class MemoryMappableBlockLoader extends MappableBlockLoader {
 
+  private final FsDatasetCache cacheManager;
+
+  /**
+   * Constructs memory mappable loader.
+   *
+   * @param cacheManager
+   *          FsDatasetCache reference.
+   */
+  MemoryMappableBlockLoader(FsDatasetCache cacheManager) {
+    this.cacheManager = cacheManager;
+  }
+
   /**
    * Load the block.
    *
@@ -90,7 +102,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
   /**
    * Verifies the block's checksum. This is an I/O intensive operation.
    */
-  public void verifyChecksum(long length, FileInputStream metaIn,
+  private void verifyChecksum(long length, FileInputStream metaIn,
                              FileChannel blockChannel, String blockFileName)
       throws IOException {
     // Verify the checksum from the block's meta file
@@ -139,4 +151,14 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader {
       IOUtils.closeQuietly(metaChannel);
     }
   }
+
+  @Override
+  long reserve(long bytesCount) {
+    return cacheManager.reserve(bytesCount);
+  }
+
+  @Override
+  long release(long bytesCount) {
+    return cacheManager.release(bytesCount);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
index 40de320..fd72804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestFsDatasetCache;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
@@ -50,6 +51,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Tests FsDatasetCache behaviors.
+ */
 public class TestFsDatasetCacheRevocation {
   private static final Logger LOG = LoggerFactory.getLogger(
       TestFsDatasetCacheRevocation.class);
@@ -86,7 +90,7 @@ public class TestFsDatasetCacheRevocation {
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-      new File(sockDir.getDir(), "sock").getAbsolutePath());
+        new File(sockDir.getDir(), "sock").getAbsolutePath());
     return conf;
   }
 
@@ -112,19 +116,18 @@ public class TestFsDatasetCacheRevocation {
     DistributedFileSystem dfs = cluster.getFileSystem();
 
     // Create and cache a file.
-    final String TEST_FILE = "/test_file";
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+    final String testFile = "/test_file";
+    DFSTestUtil.createFile(dfs, new Path(testFile),
         BLOCK_SIZE, (short)1, 0xcafe);
     dfs.addCachePool(new CachePoolInfo("pool"));
-    long cacheDirectiveId =
-      dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
-        setPool("pool").setPath(new Path(TEST_FILE)).
-          setReplication((short) 1).build());
+    long cacheDirectiveId = dfs
+        .addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
+            .setPath(new Path(testFile)).setReplication((short) 1).build());
     FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
     DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
 
     // Mmap the file.
-    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    FSDataInputStream in = dfs.open(new Path(testFile));
     ByteBuffer buf =
         in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
 
@@ -143,8 +146,8 @@ public class TestFsDatasetCacheRevocation {
   }
 
   /**
-   * Test that when we have an uncache request, and the client refuses to release
-   * the replica for a long time, we will un-mlock it.
+   * Test that when we have an uncache request, and the client refuses to
+   * release the replica for a long time, we will un-mlock it.
    */
   @Test(timeout=120000)
   public void testRevocation() throws Exception {
@@ -163,19 +166,19 @@ public class TestFsDatasetCacheRevocation {
     DistributedFileSystem dfs = cluster.getFileSystem();
 
     // Create and cache a file.
-    final String TEST_FILE = "/test_file2";
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE),
+    final String testFile = "/test_file2";
+    DFSTestUtil.createFile(dfs, new Path(testFile),
         BLOCK_SIZE, (short)1, 0xcafe);
     dfs.addCachePool(new CachePoolInfo("pool"));
     long cacheDirectiveId =
         dfs.addCacheDirective(new CacheDirectiveInfo.Builder().
-            setPool("pool").setPath(new Path(TEST_FILE)).
+            setPool("pool").setPath(new Path(testFile)).
             setReplication((short) 1).build());
     FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
     DFSTestUtil.verifyExpectedCacheUsage(BLOCK_SIZE, 1, fsd);
 
     // Mmap the file.
-    FSDataInputStream in = dfs.open(new Path(TEST_FILE));
+    FSDataInputStream in = dfs.open(new Path(testFile));
     ByteBuffer buf =
         in.read(null, BLOCK_SIZE, EnumSet.noneOf(ReadOption.class));
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
similarity index 98%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
index 2dbd5b9..9060584 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -60,9 +60,11 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MemoryCacheStats.PageRounder;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -101,7 +103,7 @@ public class TestFsDatasetCache {
   private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
 
   // Most Linux installs allow a default of 64KB locked memory
-  static final long CACHE_CAPACITY = 64 * 1024;
+  public static final long CACHE_CAPACITY = 64 * 1024;
   // mlock always locks the entire page. So we don't need to deal with this
   // rounding, use the OS page size for the block size.
   private static final long PAGE_SIZE =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org