You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2014/08/28 06:47:55 UTC

[10/11] git commit: HDFS-6927. Initial unit tests for Lazy Persist files. (Arpit Agarwal)

HDFS-6927. Initial unit tests for Lazy Persist files. (Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f64c4aa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f64c4aa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f64c4aa

Branch: refs/heads/HDFS-6581
Commit: 3f64c4aaf00d92659ae992bfe7fe8403b4013ae6
Parents: eb448e1
Author: arp <ar...@apache.org>
Authored: Wed Aug 27 15:38:11 2014 -0700
Committer: arp7 <aa...@hortonworks.com>
Committed: Wed Aug 27 21:47:19 2014 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-6581.txt           |   2 +
 .../fsdataset/impl/TestLazyPersistFiles.java    | 396 +++++++++++++++++++
 2 files changed, 398 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f64c4aa/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index 335c55d..f40d2fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -16,3 +16,5 @@
     HDFS-6926. DN support for saving replicas to persistent storage and
     evicting in-memory replicas. (Arpit Agarwal)
 
+    HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f64c4aa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
new file mode 100644
index 0000000..6e7c091
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestLazyPersistFiles {
+  public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
+
+  static {
+    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+  }
+
+  private static short REPL_FACTOR = 1;
+  private static final long BLOCK_SIZE = 10485760;   // 10 MB
+  private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final int LAZY_WRITER_INTERVAL_SEC = 3;
+  private static final int BUFFER_LENGTH = 4096;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private DFSClient client;
+  private Configuration conf;
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+      client = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testFlagNotSetByDefault() throws IOException {
+    startUpCluster(REPL_FACTOR, null, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, 0, false);
+    // Stat the file and check that the lazyPersist flag is returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.isLazyPersist(), is(false));
+  }
+
+  @Test (timeout=300000)
+  public void testFlagPropagation() throws IOException {
+    startUpCluster(REPL_FACTOR, null, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, 0, true);
+    // Stat the file and check that the lazyPersist flag is returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.isLazyPersist(), is(true));
+  }
+
+  @Test (timeout=300000)
+  public void testFlagPersistenceInEditLog() throws IOException {
+    startUpCluster(REPL_FACTOR, null, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, 0, true);
+    cluster.restartNameNode(true);
+
+    // Stat the file and check that the lazyPersist flag is returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.isLazyPersist(), is(true));
+  }
+
+  @Test (timeout=300000)
+  public void testFlagPersistenceInFsImage() throws IOException {
+    startUpCluster(REPL_FACTOR, null, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    FSDataOutputStream fos = null;
+
+    makeTestFile(path, 0, true);
+    // checkpoint
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    cluster.restartNameNode(true);
+
+    // Stat the file and check that the lazyPersist flag is returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.isLazyPersist(), is(true));
+  }
+
+  @Test (timeout=300000)
+  public void testPlacementOnRamDisk() throws IOException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK}, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, RAM_DISK);
+  }
+
+  @Test (timeout=300000)
+  public void testFallbackToDisk() throws IOException {
+    startUpCluster(REPL_FACTOR, null, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, DEFAULT);
+  }
+
+  /**
+   * If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
+   * specified, then block placement should fail.
+   *
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testRamDiskNotChosenByDefault() throws IOException {
+    startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, RAM_DISK}, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    try {
+      makeTestFile(path, BLOCK_SIZE, false);
+      fail("Block placement to RAM_DISK should have failed without lazyPersist flag");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testAppendIsDenied() throws IOException {
+    startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+
+    try {
+      client.append(path.toString(), BUFFER_LENGTH, null, null).close();
+      fail("Append to LazyPersist file did not fail as expected");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testLazyPersistBlocksAreSaved()
+      throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    // Create a test file
+    makeTestFile(path, BLOCK_SIZE * 10, true);
+    LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // Make sure that there is a saved copy of the replica on persistent
+    // storage.
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<? extends FsVolumeSpi> volumes =
+        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    final Set<Long> persistedBlockIds = new HashSet<Long>();
+
+    // Make sure at least one non-transient volume has a saved copy of
+    // the replica.
+    for (FsVolumeSpi v : volumes) {
+      if (v.isTransientStorage()) {
+        continue;
+      }
+
+      FsVolumeImpl volume = (FsVolumeImpl) v;
+      File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
+
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        File persistedBlockFile = new File(lazyPersistDir, "blk_" + lb.getBlock().getBlockId());
+        if (persistedBlockFile.exists()) {
+          // Found a persisted copy for this block!
+          boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
+          assertThat(added, is(true));
+        }
+      }
+    }
+
+    // We should have found a persisted copy for each located block.
+    assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
+  }
+
+
+  @Test (timeout=300000)
+  public void testRamDiskEviction()
+      throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR,
+        new StorageType[] {RAM_DISK, DEFAULT },
+        (2 * BLOCK_SIZE - 1));     // 1 replica + delta.
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    // However the block replica should not be evicted from RAM_DISK yet.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Create another file with a replica on RAM_DISK.
+    makeTestFile(path2, BLOCK_SIZE, true);
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    Thread.sleep(10 * 1000);
+
+    // Make sure that the second file's block replica is on RAM_DISK, whereas
+    // the original file's block replica is now on disk.
+    ensureFileReplicasOnStorageType(path2, RAM_DISK);
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+  }
+
+  /**
+   * TODO: Stub test, to be completed.
+   * Verify that checksum computation is skipped for files written to memory.
+   */
+  @Test (timeout=300000)
+  public void testChecksumIsSkipped()
+      throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR,
+                   new StorageType[] {RAM_DISK, DEFAULT }, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Verify checksum was not computed.
+
+  }
+
+  // ---- Utility functions for all test cases -------------------------------
+
+  /**
+   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
+   * capped. If tmpfsStorageLimit < 0 then it is ignored.
+   */
+  private void startUpCluster(final int numDataNodes,
+                              final StorageType[] storageTypes,
+                              final long ramDiskStorageLimit)
+      throws IOException {
+
+    conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+                LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+                HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+                LAZY_WRITER_INTERVAL_SEC);
+
+    REPL_FACTOR = 1; //Reset if case a test has modified the value
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(numDataNodes)
+        .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+
+    // Artifically cap the storage capacity of the tmpfs volume.
+    if (ramDiskStorageLimit >= 0) {
+      List<? extends FsVolumeSpi> volumes =
+          cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageType() == RAM_DISK) {
+          ((FsTransientVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+        }
+      }
+    }
+
+    LOG.info("Cluster startup complete");
+  }
+
+  private void makeTestFile(Path path, long length, final boolean isLazyPersist)
+      throws IOException {
+
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+
+
+    FSDataOutputStream fos = null;
+
+    try {
+      fos =
+          fs.create(path,
+              FsPermission.getFileDefault(),
+              createFlags,
+              BUFFER_LENGTH,
+              REPL_FACTOR,
+              BLOCK_SIZE,
+              null);
+
+      // Allocate a block.
+      byte[] buffer = new byte[BUFFER_LENGTH];
+      for (int bytesWritten = 0; bytesWritten < length; ) {
+        fos.write(buffer, 0, buffer.length);
+        bytesWritten += buffer.length;
+      }
+      if (length > 0) {
+        fos.hsync();
+      }
+    } finally {
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
+  private LocatedBlocks ensureFileReplicasOnStorageType(
+      Path path, StorageType storageType) throws IOException {
+    // Ensure that returned block locations returned are correct!
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+        client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+    }
+
+    return locatedBlocks;
+  }
+}