You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/18 01:31:14 UTC
[27/34] git commit: HDFS-6990. Add unit test for evict/delete
RAM_DISK block with open handle. (Contributed by Xiaoyu Yao)
HDFS-6990. Add unit test for evict/delete RAM_DISK block with open handle. (Contributed by Xiaoyu Yao)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a59ce506
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a59ce506
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a59ce506
Branch: refs/heads/branch-2.6
Commit: a59ce50627357c5aadccc276b5f58b2efa0cf19e
Parents: e9eebaa
Author: arp <ar...@apache.org>
Authored: Mon Sep 22 18:10:28 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:52 2014 -0700
----------------------------------------------------------------------
.../fsdataset/impl/TestScrLazyPersistFiles.java | 341 +++++++++++++++++++
1 file changed, 341 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a59ce506/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
new file mode 100644
index 0000000..4270f8c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+ import org.apache.commons.io.IOUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.commons.logging.impl.Log4JLogger;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.CreateFlag;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.hdfs.*;
+ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+ import org.apache.hadoop.hdfs.server.datanode.DataNode;
+ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+ import org.apache.hadoop.hdfs.server.namenode.NameNode;
+ import org.apache.hadoop.net.unix.DomainSocket;
+ import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.test.GenericTestUtils;
+ import org.apache.hadoop.util.NativeCodeLoader;
+ import org.apache.log4j.Level;
+ import org.junit.*;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.EnumSet;
+ import java.util.List;
+ import java.util.UUID;
+
+ import static org.apache.hadoop.fs.CreateFlag.CREATE;
+ import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+ import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+ import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+ import static org.hamcrest.CoreMatchers.equalTo;
+ import static org.hamcrest.core.Is.is;
+ import static org.junit.Assert.assertThat;
+
+public class TestScrLazyPersistFiles {
+ /** Logger for this test class. */
+ public static final Log LOG =
+     LogFactory.getLog(TestScrLazyPersistFiles.class);
+
+ static {
+   // Raise the NameNode state-change loggers and the FsDatasetImpl logger to
+   // ALL so the test output captures every message they emit.
+   ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+   ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+   ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ // Replication factor used when creating test files; the tests also pass it
+ // as the DataNode count. Not final because startUpCluster() resets it to 1.
+ private static short REPL_FACTOR = 1;
+ // Block size set on the cluster configuration and used when creating files.
+ private static final int BLOCK_SIZE = 10485760; // 10 MB
+ // Value for DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC.
+ private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+ // Value for DFS_HEARTBEAT_INTERVAL_KEY.
+ private static final long HEARTBEAT_INTERVAL_SEC = 1;
+ // Value for DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY.
+ private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+ // Value for DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC; the tests also sleep for
+ // a multiple of it to give the lazy writer time to run.
+ private static final int LAZY_WRITER_INTERVAL_SEC = 1;
+ // Size of the byte buffers used for file writes and positional reads.
+ private static final int BUFFER_LENGTH = 4096;
+ // Temporary directory that holds the domain socket path; created in init()
+ // and closed in shutdown().
+ private static TemporarySocketDirectory sockDir;
+
+ // Per-test state assigned by startUpCluster(). cluster, fs and client are
+ // cleared again in shutDownCluster().
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem fs;
+ private DFSClient client;
+ private Configuration conf;
+
+ /**
+ * One-time class setup: creates the temporary socket directory used for the
+ * domain socket path and disables DomainSocket bind path validation.
+ */
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ /**
+ * One-time class teardown: closes the temporary socket directory created in
+ * {@link #init()}.
+ */
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
+
+ /**
+  * Skips the test (via JUnit assumptions) unless native code is loaded, the
+  * platform is not Windows, and DomainSocket reports no loading failure.
+  */
+ @Before
+ public void before() {
+   final boolean nativeOnNonWindows =
+       NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS;
+   Assume.assumeThat(nativeOnNonWindows, equalTo(true));
+
+   Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ }
+
+ /**
+  * Per-test teardown: closes the file system handle and shuts down the mini
+  * cluster, if they were created.
+  *
+  * The cluster shutdown runs in a finally block so that a failure while
+  * closing the file system cannot leave the cluster running for later tests.
+  *
+  * @throws IOException if closing the file system or shutting down the
+  *         cluster fails.
+  */
+ @After
+ public void shutDownCluster() throws IOException {
+   try {
+     if (fs != null) {
+       fs.close();
+     }
+   } finally {
+     // client is obtained from fs in startUpCluster(), so drop both together.
+     fs = null;
+     client = null;
+
+     if (cluster != null) {
+       cluster.shutdownDataNodes();
+       cluster.shutdown();
+       cluster = null;
+     }
+   }
+ }
+
+ /**
+  * Read in-memory block with Short Circuit Read.
+  * Note: the test uses faked RAM_DISK from physical disk.
+  *
+  * Writes one lazy-persist block, checks that it is reported on RAM_DISK,
+  * then reads BUFFER_LENGTH bytes and asserts that both the total and the
+  * short-circuit read counters equal BUFFER_LENGTH.
+  *
+  * @throws IOException on cluster or file system failure.
+  * @throws InterruptedException if the sleep is interrupted.
+  */
+ @Test (timeout=300000)
+ public void testRamDiskShortCircuitRead()
+     throws IOException, InterruptedException {
+   startUpCluster(REPL_FACTOR,
+       new StorageType[]{RAM_DISK, DEFAULT},
+       2 * BLOCK_SIZE - 1, true);  // 1 replica + delta, SCR read
+   final String METHOD_NAME = GenericTestUtils.getMethodName();
+   final int SEED = 0xFADED;
+   Path path = new Path("/" + METHOD_NAME + ".dat");
+
+   makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
+   ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+   // Sleep for a short time to allow the lazy writer thread to do its job
+   Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+   //assertThat(verifyReadRandomFile(path, BLOCK_SIZE, SEED), is(true));
+
+   // Open the file exactly once: a second open would replace this reference
+   // and leave the first stream unclosed.
+   FSDataInputStream fis = fs.open(path);
+
+   // Verify SCR read counters
+   try {
+     byte[] buf = new byte[BUFFER_LENGTH];
+     fis.read(0, buf, 0, BUFFER_LENGTH);
+     HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
+     Assert.assertEquals(BUFFER_LENGTH,
+         dfsis.getReadStatistics().getTotalBytesRead());
+     Assert.assertEquals(BUFFER_LENGTH,
+         dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+   } finally {
+     fis.close();
+   }
+ }
+
+ /**
+  * Eviction of lazy persisted blocks with Short Circuit Read handle open.
+  * Note: the test uses faked RAM_DISK from physical disk.
+  *
+  * Keeps a read handle open on the first file while a second file is
+  * written to RAM_DISK, checks the first file is still readable through
+  * that handle, and after closing it expects the first file's replica to be
+  * reported on DEFAULT storage.
+  *
+  * @throws IOException on cluster or file system failure.
+  * @throws InterruptedException if a sleep is interrupted.
+  */
+ @Test (timeout=300000)  // milliseconds; same limit as the other test here
+ public void testRamDiskEvictionWithShortCircuitReadHandle()
+     throws IOException, InterruptedException {
+   startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+       (6 * BLOCK_SIZE -1), true);  // 5 replica + delta, SCR.
+   final String METHOD_NAME = GenericTestUtils.getMethodName();
+   Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+   Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+   final int SEED = 0xFADED;
+
+   makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+   ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+   // Sleep for a short time to allow the lazy writer thread to do its job.
+   // However the block replica should not be evicted from RAM_DISK yet.
+   Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+   // No eviction should happen as the free ratio is below the threshold
+   FSDataInputStream fis = fs.open(path1);
+   try {
+     // Keep an open read handle to path1 while creating path2
+     byte[] buf = new byte[BUFFER_LENGTH];
+     fis.read(0, buf, 0, BUFFER_LENGTH);
+
+     // Create the 2nd file that will trigger RAM_DISK eviction.
+     makeTestFile(path2, BLOCK_SIZE * 2, true);
+     ensureFileReplicasOnStorageType(path2, RAM_DISK);
+
+     // Ensure path1 is still readable from the open SCR handle.
+     // NOTE(review): positional reads are documented not to move the stream
+     // offset, so getPos() is presumably still 0 here and this re-reads the
+     // first BUFFER_LENGTH bytes -- confirm that is intended.
+     fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH);
+     HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
+     Assert.assertEquals(2 * BUFFER_LENGTH,
+         dfsis.getReadStatistics().getTotalBytesRead());
+     Assert.assertEquals(2 * BUFFER_LENGTH,
+         dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+   } finally {
+     IOUtils.closeQuietly(fis);
+   }
+
+   // After the open handle is closed, path1 should be evicted to DISK.
+   triggerBlockReport();
+   ensureFileReplicasOnStorageType(path1, DEFAULT);
+ }
+
+ // ---- Utility functions for all test cases -------------------------------
+
+ /**
+ * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
+ * capped. If ramDiskStorageLimit < 0 then it is ignored.
+ */
+ private void startUpCluster(final int numDataNodes,
+ final StorageType[] storageTypes,
+ final long ramDiskStorageLimit,
+ final boolean useSCR)
+ throws IOException {
+
+ conf = new Configuration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+ LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+ conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ LAZY_WRITER_INTERVAL_SEC);
+
+ if (useSCR)
+ {
+ conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
+ conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+ UUID.randomUUID().toString());
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ }
+
+ REPL_FACTOR = 1; //Reset in case a test has modified the value
+
+ cluster = new MiniDFSCluster
+ .Builder(conf)
+ .numDataNodes(numDataNodes)
+ .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+ .build();
+ fs = cluster.getFileSystem();
+ client = fs.getClient();
+
+ // Artificially cap the storage capacity of the RAM_DISK volume.
+ if (ramDiskStorageLimit >= 0) {
+ List<? extends FsVolumeSpi> volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+ for (FsVolumeSpi volume : volumes) {
+ if (volume.getStorageType() == RAM_DISK) {
+ ((FsTransientVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+ }
+ }
+ }
+
+ LOG.info("Cluster startup complete");
+ }
+
+ /**
+ * Convenience overload that starts the cluster with useSCR set to false;
+ * the other arguments are passed through to the four-argument version
+ * unchanged.
+ */
+ private void startUpCluster(final int numDataNodes,
+ final StorageType[] storageTypes,
+ final long ramDiskStorageLimit)
+ throws IOException {
+ startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
+ }
+
+ private void makeTestFile(Path path, long length, final boolean isLazyPersist)
+ throws IOException {
+
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+ if (isLazyPersist) {
+ createFlags.add(LAZY_PERSIST);
+ }
+
+ FSDataOutputStream fos = null;
+ try {
+ fos =
+ fs.create(path,
+ FsPermission.getFileDefault(),
+ createFlags,
+ BUFFER_LENGTH,
+ REPL_FACTOR,
+ BLOCK_SIZE,
+ null);
+
+ // Allocate a block.
+ byte[] buffer = new byte[BUFFER_LENGTH];
+ for (int bytesWritten = 0; bytesWritten < length; ) {
+ fos.write(buffer, 0, buffer.length);
+ bytesWritten += buffer.length;
+ }
+ if (length > 0) {
+ fos.hsync();
+ }
+ } finally {
+ IOUtils.closeQuietly(fos);
+ }
+ }
+
+ private LocatedBlocks ensureFileReplicasOnStorageType(
+ Path path, StorageType storageType) throws IOException {
+ // Ensure that returned block locations returned are correct!
+ LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+ assertThat(fs.exists(path), is(true));
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+ }
+ return locatedBlocks;
+ }
+
+ /**
+ * Creates a test file by delegating to DFSTestUtil.createFile with this
+ * class's BUFFER_LENGTH, BLOCK_SIZE and REPL_FACTOR, passing through the
+ * given path, length, lazy-persist flag and seed.
+ */
+ private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
+ long seed) throws IOException {
+ DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+ BLOCK_SIZE, REPL_FACTOR, seed, true);
+ }
+
+ private boolean verifyReadRandomFile(
+ Path path, int fileLength, int seed) throws IOException {
+ byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
+ byte expected[] = DFSTestUtil.
+ calculateFileContentsFromSeed(seed, fileLength);
+ return Arrays.equals(contents, expected);
+ }
+
+ private void triggerBlockReport()
+ throws IOException, InterruptedException {
+ // Trigger block report to NN
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+ Thread.sleep(10 * 1000);
+ }
+}