Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2014/10/27 20:59:19 UTC

[1/3] HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 6724c2f7e -> e8d77593f
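
For context on the change itself: the idea behind HDFS-6934 is to skip checksum computation while a replica is being written to transient storage (RAM_DISK) and to generate the checksum later, when the lazy writer persists the replica to durable storage. The Java sketch below is illustrative only; the class and method names are invented and do not appear in the patch, and this part of the mail shows only the test changes.

    import java.io.ByteArrayOutputStream;
    import java.util.zip.CRC32;

    // Illustrative sketch with invented names, not the actual HDFS code: checksum
    // work is deferred for RAM_DISK replicas and done once at lazy-persist time.
    class LazyChecksumReplicaSketch {
      private final ByteArrayOutputStream ramCopy = new ByteArrayOutputStream();
      private final CRC32 inlineCrc = new CRC32();
      private final boolean isTransient;

      LazyChecksumReplicaSketch(boolean isTransient) {
        this.isTransient = isTransient;
      }

      // Hot path: no checksum computation when the target volume is RAM_DISK.
      void write(byte[] data, int off, int len) {
        ramCopy.write(data, off, len);
        if (!isTransient) {
          inlineCrc.update(data, off, len);  // durable volumes checksum inline
        }
      }

      // Lazy-persist step: compute the checksum only now, off the write hot path.
      long checksumForPersistedCopy() {
        byte[] bytes = ramCopy.toByteArray();
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();  // in HDFS this value would go into the .meta file
      }
    }

This is also why the new short-circuit read tests below compare metaFile.length() against BlockMetadataHeader.getHeaderSize(): while a replica sits on RAM_DISK its .meta file holds only the header, and only after lazy persistence does it gain checksum data.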


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
new file mode 100644
index 0000000..c762849
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public abstract class LazyPersistTestCase {
+
+  static {
+    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
+  protected static final int BUFFER_LENGTH = 4096;
+  protected static final int EVICTION_LOW_WATERMARK = 1;
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+  protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
+  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
+  protected static final short REPL_FACTOR = 1;
+
+  protected MiniDFSCluster cluster;
+  protected DistributedFileSystem fs;
+  protected DFSClient client;
+  protected JMXGet jmx;
+  protected TemporarySocketDirectory sockDir;
+
+  @After
+  public void shutDownCluster() throws Exception {
+
+    // Dump all RamDisk JMX metrics before shutting down the cluster
+    printRamDiskJMXMetrics();
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+      client = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    if (jmx != null) {
+      jmx = null;
+    }
+
+    IOUtils.closeQuietly(sockDir);
+    sockDir = null;
+  }
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  protected final LocatedBlocks ensureFileReplicasOnStorageType(
+      Path path, StorageType storageType) throws IOException {
+    // Ensure that the returned block locations are correct!
+    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    assertThat(fs.exists(path), is(true));
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+        client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+    }
+    return locatedBlocks;
+  }
+
+  protected final void makeRandomTestFile(Path path, long length,
+      boolean isLazyPersist, long seed) throws IOException {
+    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+      BLOCK_SIZE, REPL_FACTOR, seed, true);
+  }
+
+  protected final void makeTestFile(Path path, long length,
+      boolean isLazyPersist) throws IOException {
+
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+
+    FSDataOutputStream fos = null;
+    try {
+      fos =
+          fs.create(path,
+              FsPermission.getFileDefault(),
+              createFlags,
+              BUFFER_LENGTH,
+              REPL_FACTOR,
+              BLOCK_SIZE,
+              null);
+
+      // Allocate a block.
+      byte[] buffer = new byte[BUFFER_LENGTH];
+      for (int bytesWritten = 0; bytesWritten < length; ) {
+        fos.write(buffer, 0, buffer.length);
+        bytesWritten += buffer.length;
+      }
+      if (length > 0) {
+        fos.hsync();
+      }
+    } finally {
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
+  /**
+   * If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
+   * capped to that many block replicas. If ramDiskReplicaCapacity < 0 then it
+   * is ignored.
+   */
+  protected final void startUpCluster(boolean hasTransientStorage,
+                                      final int ramDiskReplicaCapacity,
+                                      final boolean useSCR,
+                                      final boolean useLegacyBlockReaderLocal)
+      throws IOException {
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+                LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+                HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+                LAZY_WRITER_INTERVAL_SEC);
+    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
+                EVICTION_LOW_WATERMARK * BLOCK_SIZE);
+
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+      // Do not share a client context across tests.
+      conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+      if (useLegacyBlockReaderLocal) {
+        conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+        conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+            UserGroupInformation.getCurrentUser().getShortUserName());
+      } else {
+        sockDir = new TemporarySocketDirectory();
+        conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+            this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+      }
+    }
+
+    long[] capacities = null;
+    if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+      // Convert replica count to byte count, add some delta for .meta and
+      // VERSION files.
+      long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
+          (BLOCK_SIZE - 1);
+      capacities = new long[] { ramDiskStorageLimit, -1 };
+    }
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(REPL_FACTOR)
+        .storageCapacities(capacities)
+        .storageTypes(hasTransientStorage ?
+            new StorageType[]{ RAM_DISK, DEFAULT } : null)
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+    try {
+      jmx = initJMX();
+    } catch (Exception e) {
+      fail("Failed to initialize JMX for testing: " + e);
+    }
+    LOG.info("Cluster startup complete");
+  }
+
+  /**
+   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
+   * capped. If ramDiskStorageLimit < 0 then it is ignored.
+   */
+  protected final void startUpCluster(final int numDataNodes,
+                                      final StorageType[] storageTypes,
+                                      final long ramDiskStorageLimit,
+                                      final boolean useSCR)
+    throws IOException {
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+      LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+      HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+      LAZY_WRITER_INTERVAL_SEC);
+
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
+      conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+      sockDir = new TemporarySocketDirectory();
+      conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+          this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+      conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    }
+
+    cluster = new MiniDFSCluster
+      .Builder(conf)
+      .numDataNodes(numDataNodes)
+      .storageTypes(storageTypes != null ?
+          storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+      .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+
+    // Artificially cap the storage capacity of the RAM_DISK volume.
+    if (ramDiskStorageLimit >= 0) {
+      List<? extends FsVolumeSpi> volumes =
+        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageType() == RAM_DISK) {
+          ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+        }
+      }
+    }
+
+    LOG.info("Cluster startup complete");
+  }
+
+  protected final void startUpCluster(boolean hasTransientStorage,
+                                      final int ramDiskReplicaCapacity)
+      throws IOException {
+    startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
+  }
+
+  protected final void triggerBlockReport()
+      throws IOException, InterruptedException {
+    // Trigger block report to NN
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    Thread.sleep(10 * 1000);
+  }
+
+  protected final boolean verifyBlockDeletedFromDir(File dir,
+      LocatedBlocks locatedBlocks) {
+
+    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+      File targetDir =
+        DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+      if (blockFile.exists()) {
+        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+      File metaFile = new File(targetDir,
+        DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+          lb.getBlock().getGenerationStamp()));
+      if (metaFile.exists()) {
+        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+      throws IOException, InterruptedException {
+
+    LOG.info("Verifying replica has no saved copy after deletion.");
+    triggerBlockReport();
+
+    while(
+      DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
+        > 0L){
+      Thread.sleep(1000);
+    }
+
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<? extends FsVolumeSpi> volumes =
+      cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    // Make sure the deleted replica does not have a copy in either the
+    // finalized dir of the transient volume or the lazy persist dir of the
+    // non-transient volume.
+    for (FsVolumeSpi v : volumes) {
+      FsVolumeImpl volume = (FsVolumeImpl) v;
+      File targetDir = (v.isTransientStorage()) ?
+          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+          volume.getBlockPoolSlice(bpid).getLazypersistDir();
+      if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected final void verifyRamDiskJMXMetric(String metricName,
+      long expectedValue) throws Exception {
+    assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+  }
+
+  protected final boolean verifyReadRandomFile(
+      Path path, int fileLength, int seed) throws IOException {
+    byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
+    byte expected[] = DFSTestUtil.
+      calculateFileContentsFromSeed(seed, fileLength);
+    return Arrays.equals(contents, expected);
+  }
+
+  private JMXGet initJMX() throws Exception {
+    JMXGet jmx = new JMXGet();
+    jmx.setService(JMX_SERVICE_NAME);
+    jmx.init();
+    return jmx;
+  }
+
+  private void printRamDiskJMXMetrics() {
+    try {
+      if (jmx != null) {
+        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 444afed..771609c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -17,103 +17,45 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.tools.JMXGet;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class TestLazyPersistFiles {
-  public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
-
-  static {
-    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
-  }
-
+public class TestLazyPersistFiles extends LazyPersistTestCase {
   private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
 
   private static final int THREADPOOL_SIZE = 10;
 
-  private static final short REPL_FACTOR = 1;
-  private static final int BLOCK_SIZE = 5 * 1024 * 1024;
-  private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  private static final int BUFFER_LENGTH = 4096;
-  private static final int EVICTION_LOW_WATERMARK = 1;
-  private static final String JMX_SERVICE_NAME = "DataNode";
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-
-  private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
-  private DFSClient client;
-  private Configuration conf;
-  private JMXGet jmx;
-
-  @After
-  public void shutDownCluster() throws Exception {
-
-    // Dump all RamDisk JMX metrics before shutdown the cluster
-    printRamDiskJMXMetrics();
-
-    if (fs != null) {
-      fs.close();
-      fs = null;
-      client = null;
-    }
-
-    if (cluster != null) {
-      cluster.shutdownDataNodes();
-      cluster.shutdown();
-      cluster = null;
-    }
-
-    if (jmx != null) {
-      jmx = null;
-    }
-  }
-
-  @Test (timeout=300000)
+  @Test
   public void testPolicyNotSetByDefault() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -126,7 +68,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPolicyPropagation() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -138,7 +80,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPolicyPersistenceInEditLog() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -152,7 +94,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPolicyPersistenceInFsImage() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -170,7 +112,7 @@ public class TestLazyPersistFiles {
     assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPlacementOnRamDisk() throws IOException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -180,7 +122,7 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path, RAM_DISK);
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testPlacementOnSizeLimitedRamDisk() throws IOException {
     startUpCluster(true, 3);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -199,7 +141,7 @@ public class TestLazyPersistFiles {
    * Write should default to disk. No error.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testFallbackToDisk() throws IOException {
     startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -213,7 +155,7 @@ public class TestLazyPersistFiles {
    * File can not fit in RamDisk even with eviction
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testFallbackToDiskFull() throws Exception {
     startUpCluster(false, 0);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -231,7 +173,7 @@ public class TestLazyPersistFiles {
    * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testFallbackToDiskPartial()
     throws IOException, InterruptedException {
     startUpCluster(true, 2);
@@ -271,7 +213,7 @@ public class TestLazyPersistFiles {
    *
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskNotChosenByDefault() throws IOException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -289,7 +231,7 @@ public class TestLazyPersistFiles {
    * Append to lazy persist file is denied.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test
   public void testAppendIsDenied() throws IOException {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -310,7 +252,7 @@ public class TestLazyPersistFiles {
    * must be discarded by the NN, instead of being kept around as a
    * 'corrupt' file.
    */
-  @Test (timeout=300000)
+  @Test
   public void testLazyPersistFilesAreDiscarded()
       throws IOException, InterruptedException {
     startUpCluster(true, 2);
@@ -344,7 +286,7 @@ public class TestLazyPersistFiles {
                is(0L));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testLazyPersistBlocksAreSaved()
       throws IOException, InterruptedException {
     startUpCluster(true, -1);
@@ -399,7 +341,7 @@ public class TestLazyPersistFiles {
    * RamDisk eviction after lazy persist to disk.
    * @throws Exception
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskEviction() throws Exception {
     startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -434,7 +376,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskEvictionBeforePersist()
     throws IOException, InterruptedException {
     startUpCluster(true, 1);
@@ -459,7 +401,7 @@ public class TestLazyPersistFiles {
 
     assert(fs.exists(path1));
     assert(fs.exists(path2));
-    verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
   }
 
   /**
@@ -467,7 +409,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskEvictionIsLru()
     throws Exception {
     final int NUM_PATHS = 5;
@@ -529,7 +471,7 @@ public class TestLazyPersistFiles {
    * Memory is freed up and file is gone.
    * @throws IOException
    */
-  @Test // (timeout=300000)
+  @Test
   public void testDeleteBeforePersist()
     throws Exception {
     startUpCluster(true, -1);
@@ -556,7 +498,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testDeleteAfterPersist()
     throws Exception {
     startUpCluster(true, -1);
@@ -584,7 +526,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testDfsUsageCreateDelete()
     throws IOException, InterruptedException {
     startUpCluster(true, 4);
@@ -615,7 +557,7 @@ public class TestLazyPersistFiles {
   /**
    * Concurrent read from the same node and verify the contents.
    */
-  @Test (timeout=300000)
+  @Test
   public void testConcurrentRead()
     throws Exception {
     startUpCluster(true, 2);
@@ -666,7 +608,7 @@ public class TestLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000)
+  @Test
   public void testConcurrentWrites()
     throws IOException, InterruptedException {
     startUpCluster(true, 9);
@@ -702,7 +644,7 @@ public class TestLazyPersistFiles {
     assertThat(testFailed.get(), is(false));
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testDnRestartWithSavedReplicas()
       throws IOException, InterruptedException {
 
@@ -726,7 +668,7 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
-  @Test (timeout=300000)
+  @Test
   public void testDnRestartWithUnsavedReplicas()
       throws IOException, InterruptedException {
 
@@ -746,183 +688,6 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
   }
 
-  // ---- Utility functions for all test cases -------------------------------
-
-  /**
-   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
-   * capped. If ramDiskStorageLimit < 0 then it is ignored.
-   */
-  private void startUpCluster(boolean hasTransientStorage,
-                              final int ramDiskReplicaCapacity,
-                              final boolean useSCR)
-      throws IOException {
-
-    conf = new Configuration();
-    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
-                LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
-    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
-    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-                HEARTBEAT_RECHECK_INTERVAL_MSEC);
-    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-                LAZY_WRITER_INTERVAL_SEC);
-    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
-                EVICTION_LOW_WATERMARK * BLOCK_SIZE);
-
-    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
-
-    long[] capacities = null;
-    if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
-      // Convert replica count to byte count, add some delta for .meta and VERSION files.
-      long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
-      capacities = new long[] { ramDiskStorageLimit, -1 };
-    }
-
-    cluster = new MiniDFSCluster
-        .Builder(conf)
-        .numDataNodes(REPL_FACTOR)
-        .storageCapacities(capacities)
-        .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
-        .build();
-    fs = cluster.getFileSystem();
-    client = fs.getClient();
-    try {
-      jmx = initJMX();
-    } catch (Exception e) {
-      fail("Failed initialize JMX for testing: " + e);
-    }
-    LOG.info("Cluster startup complete");
-  }
-
-  private void startUpCluster(boolean hasTransientStorage,
-                              final int ramDiskReplicaCapacity)
-    throws IOException {
-    startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
-  }
-
-  private void makeTestFile(Path path, long length, final boolean isLazyPersist)
-      throws IOException {
-
-    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
-
-    if (isLazyPersist) {
-      createFlags.add(LAZY_PERSIST);
-    }
-
-    FSDataOutputStream fos = null;
-    try {
-      fos =
-          fs.create(path,
-              FsPermission.getFileDefault(),
-              createFlags,
-              BUFFER_LENGTH,
-              REPL_FACTOR,
-              BLOCK_SIZE,
-              null);
-
-      // Allocate a block.
-      byte[] buffer = new byte[BUFFER_LENGTH];
-      for (int bytesWritten = 0; bytesWritten < length; ) {
-        fos.write(buffer, 0, buffer.length);
-        bytesWritten += buffer.length;
-      }
-      if (length > 0) {
-        fos.hsync();
-      }
-    } finally {
-      IOUtils.closeQuietly(fos);
-    }
-  }
-
-  private LocatedBlocks ensureFileReplicasOnStorageType(
-      Path path, StorageType storageType) throws IOException {
-    // Ensure that returned block locations returned are correct!
-    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
-    assertThat(fs.exists(path), is(true));
-    long fileLength = client.getFileInfo(path.toString()).getLen();
-    LocatedBlocks locatedBlocks =
-        client.getLocatedBlocks(path.toString(), 0, fileLength);
-    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
-    }
-    return locatedBlocks;
-  }
-
-  private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
-                                  long seed) throws IOException {
-    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
-      BLOCK_SIZE, REPL_FACTOR, seed, true);
-  }
-
-  private boolean verifyReadRandomFile(
-    Path path, int fileLength, int seed) throws IOException {
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
-    byte expected[] = DFSTestUtil.
-      calculateFileContentsFromSeed(seed, fileLength);
-    return Arrays.equals(contents, expected);
-  }
-
-  private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-    throws IOException, InterruptedException {
-
-    LOG.info("Verifying replica has no saved copy after deletion.");
-    triggerBlockReport();
-
-    while(
-      DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
-        > 0L){
-      Thread.sleep(1000);
-    }
-
-    final String bpid = cluster.getNamesystem().getBlockPoolId();
-    List<? extends FsVolumeSpi> volumes =
-      cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
-    // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
-    for (FsVolumeSpi v : volumes) {
-      FsVolumeImpl volume = (FsVolumeImpl) v;
-      File targetDir = (v.isTransientStorage()) ?
-          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-          volume.getBlockPoolSlice(bpid).getLazypersistDir();
-      if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
-
-    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-      File targetDir =
-        DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
-
-      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
-      if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-          " exists after deletion.");
-        return false;
-      }
-      File metaFile = new File(targetDir,
-        DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
-          lb.getBlock().getGenerationStamp()));
-      if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-          " exists after deletion.");
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private void triggerBlockReport()
-    throws IOException, InterruptedException {
-    // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
-  }
-
   class WriterRunnable implements Runnable {
     private final int id;
     private final Path paths[];
@@ -960,27 +725,4 @@ public class TestLazyPersistFiles {
       }
     }
   }
-
-  JMXGet initJMX() throws Exception
-  {
-    JMXGet jmx = new JMXGet();
-    jmx.setService(JMX_SERVICE_NAME);
-    jmx.init();
-    return jmx;
-  }
-
-  void printRamDiskJMXMetrics() {
-    try {
-      if (jmx != null) {
-        jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  void verifyRamDiskJMXMetric(String metricName, long expectedValue)
-      throws Exception {
-    assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
index b6ac287..efc6dcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -15,84 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-  package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-  import org.apache.commons.io.IOUtils;
-  import org.apache.commons.logging.Log;
-  import org.apache.commons.logging.LogFactory;
-  import org.apache.commons.logging.impl.Log4JLogger;
-  import org.apache.hadoop.conf.Configuration;
-  import org.apache.hadoop.fs.CreateFlag;
-  import org.apache.hadoop.fs.FSDataInputStream;
-  import org.apache.hadoop.fs.FSDataOutputStream;
-  import org.apache.hadoop.fs.Path;
-  import org.apache.hadoop.fs.permission.FsPermission;
-  import org.apache.hadoop.hdfs.*;
-  import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-  import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-  import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-  import org.apache.hadoop.hdfs.server.datanode.DataNode;
-  import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-  import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-  import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-  import org.apache.hadoop.hdfs.server.namenode.NameNode;
-  import org.apache.hadoop.net.unix.DomainSocket;
-  import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-  import org.apache.hadoop.security.UserGroupInformation;
-  import org.apache.hadoop.test.GenericTestUtils;
-  import org.apache.hadoop.util.NativeCodeLoader;
-  import org.apache.log4j.Level;
-  import org.junit.*;
-
-  import java.io.File;
-  import java.io.IOException;
-  import java.util.Arrays;
-  import java.util.EnumSet;
-  import java.util.List;
-  import java.util.UUID;
-
-  import static org.apache.hadoop.fs.CreateFlag.CREATE;
-  import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
-  import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-  import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
-  import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
-  import static org.hamcrest.CoreMatchers.equalTo;
-  import static org.hamcrest.core.Is.is;
-  import static org.junit.Assert.assertThat;
-
-public class TestScrLazyPersistFiles {
-  public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
-
-  static {
-    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
-  }
-
-  private static short REPL_FACTOR = 1;
-  private static final int BLOCK_SIZE = 10485760;   // 10 MB
-  private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  private static final int BUFFER_LENGTH = 4096;
-  private static TemporarySocketDirectory sockDir;
-
-  private MiniDFSCluster cluster;
-  private DistributedFileSystem fs;
-  private DFSClient client;
-  private Configuration conf;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class TestScrLazyPersistFiles extends LazyPersistTestCase {
 
   @BeforeClass
   public static void init() {
-    sockDir = new TemporarySocketDirectory();
     DomainSocket.disableBindPathValidation();
   }
 
-  @AfterClass
-  public static void shutdown() throws IOException {
-    sockDir.close();
-  }
-
   @Before
   public void before() {
     Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
@@ -100,26 +60,14 @@ public class TestScrLazyPersistFiles {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
   }
 
-  @After
-  public void shutDownCluster() throws IOException {
-    if (fs != null) {
-      fs.close();
-      fs = null;
-      client = null;
-    }
-
-    if (cluster != null) {
-      cluster.shutdownDataNodes();
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
 
   /**
    * Read in-memory block with Short Circuit Read
    * Note: the test uses faked RAM_DISK from physical disk.
    */
-  @Test (timeout=300000)
+  @Test
   public void testRamDiskShortCircuitRead()
     throws IOException, InterruptedException {
     startUpCluster(REPL_FACTOR,
@@ -160,7 +108,7 @@ public class TestScrLazyPersistFiles {
    * @throws IOException
    * @throws InterruptedException
    */
-  @Test (timeout=300000000)
+  @Test
   public void testRamDiskEvictionWithShortCircuitReadHandle()
     throws IOException, InterruptedException {
     startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
@@ -204,123 +152,149 @@ public class TestScrLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
-  // ---- Utility functions for all test cases -------------------------------
+  @Test
+  public void testShortCircuitReadAfterEviction()
+      throws IOException, InterruptedException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+    doShortCircuitReadAfterEvictionTest();
+  }
 
-  /**
-   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
-   * capped. If ramDiskStorageLimit < 0 then it is ignored.
-   */
-  private void startUpCluster(final int numDataNodes,
-                              final StorageType[] storageTypes,
-                              final long ramDiskStorageLimit,
-                              final boolean useSCR)
-    throws IOException {
-
-    conf = new Configuration();
-    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
-      LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
-    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
-    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-      HEARTBEAT_RECHECK_INTERVAL_MSEC);
-    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-      LAZY_WRITER_INTERVAL_SEC);
-
-    if (useSCR)
-    {
-      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
-      conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
-        UUID.randomUUID().toString());
-      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-        new File(sockDir.getDir(),
-          "TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
-      conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
-    }
+  @Test
+  public void testLegacyShortCircuitReadAfterEviction()
+      throws IOException, InterruptedException {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+    doShortCircuitReadAfterEvictionTest();
+  }
 
-    REPL_FACTOR = 1; //Reset in case a test has modified the value
-
-    cluster = new MiniDFSCluster
-      .Builder(conf)
-      .numDataNodes(numDataNodes)
-      .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
-      .build();
-    fs = cluster.getFileSystem();
-    client = fs.getClient();
-
-    // Artificially cap the storage capacity of the RAM_DISK volume.
-    if (ramDiskStorageLimit >= 0) {
-      List<? extends FsVolumeSpi> volumes =
-        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
-      for (FsVolumeSpi volume : volumes) {
-        if (volume.getStorageType() == RAM_DISK) {
-          ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
-        }
-      }
+  private void doShortCircuitReadAfterEvictionTest() throws IOException,
+      InterruptedException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+
+    // Verify short-circuit read from RAM_DISK.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // Verify short-circuit read from RAM_DISK once again.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+    // Create another file with a replica on RAM_DISK, which evicts the first.
+    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Verify short-circuit read still works from DEFAULT storage.  This time,
+    // we'll have a checksum written during lazy persistence.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+    metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
+    assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+    // In the implementation of legacy short-circuit reads, any failure is
+    // trapped silently, the client falls back to a remote read, and all
+    // subsequent legacy short-circuit reads in the ClientContext are disabled.
+    // If the test uses the legacy path, assert that it did not get disabled.
+    ClientContext clientContext = client.getClientContext();
+    if (clientContext.getUseLegacyBlockReaderLocal()) {
+      Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
     }
+  }
 
-    LOG.info("Cluster startup complete");
+  @Test
+  public void testShortCircuitReadBlockFileCorruption() throws IOException,
+      InterruptedException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+    doShortCircuitReadBlockFileCorruptionTest();
   }
 
-  private void makeTestFile(Path path, long length, final boolean isLazyPersist)
-    throws IOException {
+  @Test
+  public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
+      InterruptedException {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+    doShortCircuitReadBlockFileCorruptionTest();
+  }
 
-    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+  public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
+      InterruptedException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
-    if (isLazyPersist) {
-      createFlags.add(LAZY_PERSIST);
-    }
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    FSDataOutputStream fos = null;
-    try {
-      fos =
-        fs.create(path,
-          FsPermission.getFileDefault(),
-          createFlags,
-          BUFFER_LENGTH,
-          REPL_FACTOR,
-          BLOCK_SIZE,
-          null);
-
-      // Allocate a block.
-      byte[] buffer = new byte[BUFFER_LENGTH];
-      for (int bytesWritten = 0; bytesWritten < length; ) {
-        fos.write(buffer, 0, buffer.length);
-        bytesWritten += buffer.length;
-      }
-      if (length > 0) {
-        fos.hsync();
-      }
-    } finally {
-      IOUtils.closeQuietly(fos);
-    }
+    // Create another file with a replica on RAM_DISK, which evicts the first.
+    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Corrupt the lazy-persisted block file, and verify that checksum
+    // verification catches it.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+    MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
+    exception.expect(ChecksumException.class);
+    DFSTestUtil.readFileBuffer(fs, path1);
   }
 
-  private LocatedBlocks ensureFileReplicasOnStorageType(
-    Path path, StorageType storageType) throws IOException {
-    // Ensure that returned block locations returned are correct!
-    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
-    assertThat(fs.exists(path), is(true));
-    long fileLength = client.getFileInfo(path.toString()).getLen();
-    LocatedBlocks locatedBlocks =
-      client.getLocatedBlocks(path.toString(), 0, fileLength);
-    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
-    }
-    return locatedBlocks;
+  @Test
+  public void testShortCircuitReadMetaFileCorruption() throws IOException,
+      InterruptedException {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+    doShortCircuitReadMetaFileCorruptionTest();
   }
 
-  private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
-                                  long seed) throws IOException {
-    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
-      BLOCK_SIZE, REPL_FACTOR, seed, true);
+  @Test
+  public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
+      InterruptedException {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+    doShortCircuitReadMetaFileCorruptionTest();
   }
 
-  private void triggerBlockReport()
-    throws IOException, InterruptedException {
-    // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+  public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
+      InterruptedException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Create another file with a replica on RAM_DISK, which evicts the first.
+    makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Corrupt the lazy-persisted checksum file, and verify that checksum
+    // verification catches it.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+    File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+        DFSTestUtil.getFirstBlock(fs, path1));
+    MiniDFSCluster.corruptBlock(metaFile);
+    exception.expect(ChecksumException.class);
+    DFSTestUtil.readFileBuffer(fs, path1);
   }
 }


[3/3] git commit: HDFS-6934. Revert files accidentally committed.

Posted by cn...@apache.org.
HDFS-6934. Revert files accidentally committed.

(cherry picked from commit 5b1dfe78b8b06335bed0bcb83f12bb936d4c021b)
(cherry picked from commit 5dbd27f8b4923daea6e70b0bfe5cd3063a5f1f57)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e8d77593
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e8d77593
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e8d77593

Branch: refs/heads/branch-2.6
Commit: e8d77593fa6f4cd18e11c4d637cd9ff6997be925
Parents: a9f31af
Author: cnauroth <cn...@apache.org>
Authored: Mon Oct 27 12:15:03 2014 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Mon Oct 27 12:18:48 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/io/nativeio/NativeIO.java |   3 +-
 .../main/java/org/apache/hadoop/util/Shell.java | 111 -------------------
 .../java/org/apache/hadoop/util/TestShell.java  |  20 ----
 3 files changed, 1 insertion(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8d77593/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index f0aca3a..2400958 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -869,8 +869,7 @@ public class NativeIO {
    * @throws IOException
    */
   public static void copyFileUnbuffered(File src, File dst) throws IOException {
-    if ((nativeLoaded) &&
-        (Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) {
+    if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
       copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
     } else {
       FileUtils.copyFile(src, dst);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8d77593/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index 9b2a824..3aac27b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -377,117 +377,6 @@ abstract public class Shell {
     return winUtilsPath;
   }
 
-  public static class LinuxKernelVersion implements Comparable<LinuxKernelVersion>{
-    private final short major;
-    private final short minor;
-    private final short revision;
-
-    public LinuxKernelVersion(short major, short minor, short revision) {
-      this.major = major;
-      this.minor = minor;
-      this.revision = revision;
-    }
-
-    /**
-     * Parse Linux kernel version string from output of POSIX command 'uname -r'
-     * @param version version string from POSIX command 'uname -r'
-     * @return LinuxKernelVersion
-     * @throws IllegalArgumentException
-     *
-     * Note:
-     * On CentOS 5.8: '2.6.18-308.24.1.el5'
-     * On Ubuntu 14:  '3.13.0-32-generic'
-     */
-    public static LinuxKernelVersion parseLinuxKernelVersion(String version)
-        throws IllegalArgumentException {
-      if (version == null) {
-        throw new IllegalArgumentException();
-      }
-      String parts[] = version.split("-")[0].split("\\.");
-      if (parts.length != 3) {
-        throw new IllegalArgumentException(version);
-      }
-      short major = Short.parseShort(parts[0]);
-      short minor = Short.parseShort(parts[1]);
-      short revision = Short.parseShort(parts[2]);
-      return new LinuxKernelVersion(major, minor, revision);
-    }
-
-    @Override
-    public int compareTo(LinuxKernelVersion o) {
-      if (this.major == o.major) {
-        if (this.minor == o.minor) {
-          return this.revision - o.revision;
-        } else {
-          return this.minor - o.minor;
-        }
-      } else {
-        return this.major - o.major;
-      }
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (this == other) {
-        return true;
-      }
-      if (!(other instanceof LinuxKernelVersion)) {
-        return false;
-      }
-      return compareTo((LinuxKernelVersion) other) == 0;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("%d.%d.%d", major, minor, revision);
-    }
-
-    @Override
-    public int hashCode(){
-      int hash = 41;
-      hash = (19 * hash) + major;
-      hash = (53 * hash) + minor;
-      hash = (29 * hash) + revision;
-      return hash;
-    }
-  }
-
-  /*
-   * sendfile() API between two file descriptors
-   * is only supported on Linux Kernel version 2.6.33+
-   * according to http://man7.org/linux/man-pages/man2/sendfile.2.html
-   */
-  public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported();
-  private static LinuxKernelVersion minLkvSupportSendfile =
-      new LinuxKernelVersion((short)2, (short)6, (short)33);
-
-  private static boolean isLinuxSendfileSupported() {
-    if (!Shell.LINUX) {
-      return false;
-    }
-    ShellCommandExecutor shexec = null;
-    boolean sendfileSupported = false;
-    try {
-      String[] args = {"uname", "bash", "-r"};
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-      String version = shexec.getOutput();
-      LinuxKernelVersion lkv =
-          LinuxKernelVersion.parseLinuxKernelVersion(version);
-      if (lkv.compareTo(minLkvSupportSendfile) > 0) {
-        sendfileSupported = true;
-      }
-    } catch (Exception e) {
-      LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e);
-    } finally {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("uname exited with exit code "
-            + (shexec != null ? shexec.getExitCode() : "(null executor)"));
-      }
-    }
-    return sendfileSupported;
-  }
-
   public static final boolean isSetsidAvailable = isSetsidSupported();
   private static boolean isSetsidSupported() {
     if (Shell.WINDOWS) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8d77593/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
index 19589f8..d9dc9ef 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
@@ -165,24 +165,4 @@ public class TestShell extends TestCase {
       assertEquals(2, command.getRunCount());
     }
   }
-
-  public void testLinuxKernelVersion() throws IOException {
-    Shell.LinuxKernelVersion v2_6_18 =
-        new Shell.LinuxKernelVersion((short)2, (short)6, (short)18);
-    Shell.LinuxKernelVersion v2_6_32 =
-        new Shell.LinuxKernelVersion((short)2, (short)6, (short)32);
-    assertTrue(v2_6_18.compareTo(v2_6_32) < 0);
-  }
-
-  public void testParseLinuxKernelVersion() throws Exception {
-    String centOs58Ver = new String("2.6.18-308.24.1.el5");
-    String ubuntu14Ver = new String("3.13.0-32-generic");
-    Shell.LinuxKernelVersion lkvCentOs58 =
-        Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver);
-    Shell.LinuxKernelVersion lkvUnbuntu14 =
-        Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver);
-    assertTrue(lkvUnbuntu14.compareTo(lkvCentOs58) > 0);
-    assertFalse(lkvUnbuntu14.equals(lkvCentOs58));
-  }
-
 }


[2/3] git commit: HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

Posted by cn...@apache.org.
HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

(cherry picked from commit 463aec11718e47d4aabb86a7a539cb973460aae6)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

(cherry picked from commit 3d67da502aa21d314d672f8b465d5415d77b5df0)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a9f31af2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a9f31af2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a9f31af2

Branch: refs/heads/branch-2.6
Commit: a9f31af29c25d7f097cf22d90ead5115e1e74461
Parents: 6724c2f
Author: cnauroth <cn...@apache.org>
Authored: Mon Oct 27 09:38:30 2014 -0700
Committer: cnauroth <cn...@apache.org>
Committed: Mon Oct 27 12:01:47 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FSOutputSummer.java    |  17 +-
 .../main/java/org/apache/hadoop/fs/Options.java |  20 +-
 .../org/apache/hadoop/io/nativeio/NativeIO.java |   3 +-
 .../org/apache/hadoop/util/DataChecksum.java    |  18 +-
 .../main/java/org/apache/hadoop/util/Shell.java | 111 ++++++
 .../java/org/apache/hadoop/util/TestShell.java  |  20 +
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |  13 +-
 .../apache/hadoop/hdfs/BlockReaderLocal.java    |  29 +-
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     |  33 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   9 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  77 ++--
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 110 +++---
 .../hadoop/hdfs/protocol/LocatedBlock.java      |   7 +-
 .../server/datanode/BlockMetadataHeader.java    |  43 +-
 .../hdfs/server/datanode/BlockReceiver.java     | 115 +++---
 .../hdfs/server/datanode/BlockSender.java       |  50 ++-
 .../hdfs/server/datanode/ReplicaInPipeline.java |   7 +-
 .../fsdataset/ReplicaOutputStreams.java         |   9 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   9 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  82 +++-
 .../impl/RamDiskAsyncLazyPersistService.java    |   2 +-
 .../impl/RamDiskReplicaLruTracker.java          |   4 +-
 .../server/datanode/SimulatedFSDataset.java     |   3 +-
 .../fsdataset/impl/LazyPersistTestCase.java     | 389 +++++++++++++++++++
 .../fsdataset/impl/TestLazyPersistFiles.java    | 326 ++--------------
 .../fsdataset/impl/TestScrLazyPersistFiles.java | 356 ++++++++---------
 27 files changed, 1155 insertions(+), 710 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 19cbb6f..934421a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -51,7 +51,7 @@ abstract public class FSOutputSummer extends OutputStream {
   protected FSOutputSummer(DataChecksum sum) {
     this.sum = sum;
     this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
-    this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
+    this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
     this.count = 0;
   }
   
@@ -188,7 +188,12 @@ abstract public class FSOutputSummer extends OutputStream {
   protected synchronized int getBufferedDataSize() {
     return count;
   }
-  
+
+  /** @return the size for a checksum. */
+  protected int getChecksumSize() {
+    return sum.getChecksumSize();
+  }
+
   /** Generate checksums for the given data chunks and output chunks & checksums
    * to the underlying output stream.
    */
@@ -197,9 +202,8 @@ abstract public class FSOutputSummer extends OutputStream {
     sum.calculateChunkedSums(b, off, len, checksum, 0);
     for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
       int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
-      int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
-      writeChunk(b, off + i, chunkLen, checksum, ckOffset,
-          sum.getChecksumSize());
+      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
     }
   }
 
@@ -226,8 +230,7 @@ abstract public class FSOutputSummer extends OutputStream {
    */
   protected synchronized void setChecksumBufSize(int size) {
     this.buf = new byte[size];
-    this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
-        sum.getChecksumSize()];
+    this.checksum = new byte[sum.getChecksumSize(size)];
     this.count = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index e070943..da75d1c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -234,15 +234,14 @@ public final class Options {
    * This is used in FileSystem and FileContext to specify checksum options.
    */
   public static class ChecksumOpt {
-    private final int crcBlockSize;
-    private final DataChecksum.Type crcType;
+    private final DataChecksum.Type checksumType;
+    private final int bytesPerChecksum;
 
     /**
      * Create a uninitialized one
      */
     public ChecksumOpt() {
-      crcBlockSize = -1;
-      crcType = DataChecksum.Type.DEFAULT;
+      this(DataChecksum.Type.DEFAULT, -1);
     }
 
     /**
@@ -251,16 +250,21 @@ public final class Options {
      * @param size bytes per checksum
      */
     public ChecksumOpt(DataChecksum.Type type, int size) {
-      crcBlockSize = size;
-      crcType = type;
+      checksumType = type;
+      bytesPerChecksum = size;
     }
 
     public int getBytesPerChecksum() {
-      return crcBlockSize;
+      return bytesPerChecksum;
     }
 
     public DataChecksum.Type getChecksumType() {
-      return crcType;
+      return checksumType;
+    }
+    
+    @Override
+    public String toString() {
+      return checksumType + ":" + bytesPerChecksum;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 2400958..f0aca3a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -869,7 +869,8 @@ public class NativeIO {
    * @throws IOException
    */
   public static void copyFileUnbuffered(File src, File dst) throws IOException {
-    if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
+    if ((nativeLoaded) &&
+        (Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) {
       copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
     } else {
       FileUtils.copyFile(src, dst);
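
For illustration (this snippet is not part of the commit): callers of the unbuffered
copy do not need to probe for native or sendfile support themselves, because the
fallback to FileUtils.copyFile happens inside the method. A minimal usage sketch,
assuming the source file below exists:

  import java.io.File;
  import java.io.IOException;
  import org.apache.hadoop.io.nativeio.NativeIO;

  public class CopyUnbufferedSketch {
    public static void main(String[] args) throws IOException {
      // Takes the native copy path when it is available; otherwise the method
      // falls back internally to a buffered FileUtils.copyFile.
      NativeIO.copyFileUnbuffered(new File("/tmp/src.bin"), new File("/tmp/dst.bin"));
    }
  }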

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 9f0ee35..a38ec32 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -37,9 +37,6 @@ import org.apache.hadoop.fs.ChecksumException;
 @InterfaceStability.Evolving
 public class DataChecksum implements Checksum {
   
-  // Misc constants
-  public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len
-  
   // checksum types
   public static final int CHECKSUM_NULL    = 0;
   public static final int CHECKSUM_CRC32   = 1;
@@ -103,7 +100,7 @@ public class DataChecksum implements Checksum {
    * @return DataChecksum of the type in the array or null in case of an error.
    */
   public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
-    if ( offset < 0 || bytes.length < offset + HEADER_LEN ) {
+    if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
       return null;
     }
     
@@ -116,8 +113,8 @@ public class DataChecksum implements Checksum {
   }
   
   /**
-   * This constructucts a DataChecksum by reading HEADER_LEN bytes from
-   * input stream <i>in</i>
+   * This constructs a DataChecksum by reading HEADER_LEN bytes from input
+   * stream <i>in</i>
    */
   public static DataChecksum newDataChecksum( DataInputStream in )
                                  throws IOException {
@@ -141,7 +138,7 @@ public class DataChecksum implements Checksum {
   }
 
   public byte[] getHeader() {
-    byte[] header = new byte[DataChecksum.HEADER_LEN];
+    byte[] header = new byte[getChecksumHeaderSize()];
     header[0] = (byte) (type.id & 0xff);
     // Writing in buffer just like DataOutput.WriteInt()
     header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff);
@@ -229,13 +226,18 @@ public class DataChecksum implements Checksum {
     bytesPerChecksum = chunkSize;
   }
   
-  // Accessors
+  /** @return the checksum algorithm type. */
   public Type getChecksumType() {
     return type;
   }
+  /** @return the size for a checksum. */
   public int getChecksumSize() {
     return type.size;
   }
+  /** @return the required checksum size given the data length. */
+  public int getChecksumSize(int dataSize) {
+    return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize(); 
+  }
   public int getBytesPerChecksum() {
     return bytesPerChecksum;
   }
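
For illustration (not part of the commit): the new getChecksumSize(int dataSize)
accessor is the ceiling arithmetic that FSOutputSummer.setChecksumBufSize used to
spell out inline, that is, one checksum per started chunk of data. A minimal sketch,
assuming CRC32C with 512 bytes per checksum:

  import org.apache.hadoop.util.DataChecksum;

  public class ChecksumSizeSketch {
    public static void main(String[] args) {
      // 4-byte CRC32C checksums, one per 512-byte chunk of data.
      DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
      int dataSize = 1000;
      // ceil(1000 / 512) = 2 chunks, so 2 * 4 = 8 checksum bytes are needed.
      int expected =
          ((dataSize - 1) / sum.getBytesPerChecksum() + 1) * sum.getChecksumSize();
      System.out.println(expected + " == " + sum.getChecksumSize(dataSize));  // 8 == 8
    }
  }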

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index 3aac27b..9b2a824 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -377,6 +377,117 @@ abstract public class Shell {
     return winUtilsPath;
   }
 
+  public static class LinuxKernelVersion implements Comparable<LinuxKernelVersion>{
+    private final short major;
+    private final short minor;
+    private final short revision;
+
+    public LinuxKernelVersion(short major, short minor, short revision) {
+      this.major = major;
+      this.minor = minor;
+      this.revision = revision;
+    }
+
+    /**
+     * Parse Linux kernel version string from output of POSIX command 'uname -r'
+     * @param version version string from POSIX command 'uname -r'
+     * @return LinuxKernelVersion
+     * @throws IllegalArgumentException
+     *
+     * Note:
+     * On CentOS 5.8: '2.6.18-308.24.1.el5'
+     * On Ubuntu 14:  '3.13.0-32-generic'
+     */
+    public static LinuxKernelVersion parseLinuxKernelVersion(String version)
+        throws IllegalArgumentException {
+      if (version == null) {
+        throw new IllegalArgumentException();
+      }
+      String parts[] = version.split("-")[0].split("\\.");
+      if (parts.length != 3) {
+        throw new IllegalArgumentException(version);
+      }
+      short major = Short.parseShort(parts[0]);
+      short minor = Short.parseShort(parts[1]);
+      short revision = Short.parseShort(parts[2]);
+      return new LinuxKernelVersion(major, minor, revision);
+    }
+
+    @Override
+    public int compareTo(LinuxKernelVersion o) {
+      if (this.major == o.major) {
+        if (this.minor == o.minor) {
+          return this.revision - o.revision;
+        } else {
+          return this.minor - o.minor;
+        }
+      } else {
+        return this.major - o.major;
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+      if (!(other instanceof LinuxKernelVersion)) {
+        return false;
+      }
+      return compareTo((LinuxKernelVersion) other) == 0;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%d.%d.%d", major, minor, revision);
+    }
+
+    @Override
+    public int hashCode(){
+      int hash = 41;
+      hash = (19 * hash) + major;
+      hash = (53 * hash) + minor;
+      hash = (29 * hash) + revision;
+      return hash;
+    }
+  }
+
+  /*
+   * sendfile() API between two file descriptors
+   * is only supported on Linux Kernel version 2.6.33+
+   * according to http://man7.org/linux/man-pages/man2/sendfile.2.html
+   */
+  public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported();
+  private static LinuxKernelVersion minLkvSupportSendfile =
+      new LinuxKernelVersion((short)2, (short)6, (short)33);
+
+  private static boolean isLinuxSendfileSupported() {
+    if (!Shell.LINUX) {
+      return false;
+    }
+    ShellCommandExecutor shexec = null;
+    boolean sendfileSupported = false;
+    try {
+      String[] args = {"uname", "bash", "-r"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+      String version = shexec.getOutput();
+      LinuxKernelVersion lkv =
+          LinuxKernelVersion.parseLinuxKernelVersion(version);
+      if (lkv.compareTo(minLkvSupportSendfile) > 0) {
+        sendfileSupported = true;
+      }
+    } catch (Exception e) {
+      LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e);
+    } finally {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("uname exited with exit code "
+            + (shexec != null ? shexec.getExitCode() : "(null executor)"));
+      }
+    }
+    return sendfileSupported;
+  }
+
   public static final boolean isSetsidAvailable = isSetsidSupported();
   private static boolean isSetsidSupported() {
     if (Shell.WINDOWS) {
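
For illustration (not part of the commit): per the comment above, sendfile() between
two file descriptors requires kernel 2.6.33 or later, so the probe reduces to parsing
'uname -r' output and comparing version numbers. A standalone sketch of that
comparison; the release string below is a sample value, not real probe output:

  public class KernelVersionSketch {
    public static void main(String[] args) {
      String release = "3.13.0-32-generic";                 // e.g. Ubuntu 14.04
      String[] parts = release.split("-")[0].split("\\.");  // {"3", "13", "0"}
      int major = Integer.parseInt(parts[0]);
      int minor = Integer.parseInt(parts[1]);
      int revision = Integer.parseInt(parts[2]);
      // Pack the three components so a single numeric comparison orders versions.
      long version = (major * 1000L + minor) * 1000L + revision;  // 3013000
      long minSendfile = (2L * 1000L + 6L) * 1000L + 33L;         // 2006033
      System.out.println("sendfile supported: " + (version >= minSendfile));
    }
  }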

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
index d9dc9ef..19589f8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
@@ -165,4 +165,24 @@ public class TestShell extends TestCase {
       assertEquals(2, command.getRunCount());
     }
   }
+
+  public void testLinuxKernelVersion() throws IOException {
+    Shell.LinuxKernelVersion v2_6_18 =
+        new Shell.LinuxKernelVersion((short)2, (short)6, (short)18);
+    Shell.LinuxKernelVersion v2_6_32 =
+        new Shell.LinuxKernelVersion((short)2, (short)6, (short)32);
+    assertTrue(v2_6_18.compareTo(v2_6_32) < 0);
+  }
+
+  public void testParseLinuxKernelVersion() throws Exception {
+    String centOs58Ver = new String("2.6.18-308.24.1.el5");
+    String ubuntu14Ver = new String("3.13.0-32-generic");
+    Shell.LinuxKernelVersion lkvCentOs58 =
+        Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver);
+    Shell.LinuxKernelVersion lkvUnbuntu14 =
+        Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver);
+    assertTrue(lkvUnbuntu14.compareTo(lkvCentOs58) > 0);
+    assertFalse(lkvUnbuntu14.equals(lkvCentOs58));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 38695e0..0eb2fe5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -686,6 +686,9 @@ Release 2.6.0 - UNRELEASED
       HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
       (Xiaoyu Yao via cnauroth)
 
+      HDFS-6934. Move checksum computation off the hot path when writing to RAM
+      disk. (cnauroth)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 3fb442b..13e0a52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -110,6 +110,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
   private DatanodeInfo datanode;
 
   /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
+  /**
    * If false, we won't try short-circuit local reads.
    */
   private boolean allowShortCircuitLocalReads;
@@ -201,6 +206,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     return this;
   }
 
+  public BlockReaderFactory setStorageType(StorageType storageType) {
+    this.storageType = storageType;
+    return this;
+  }
+
   public BlockReaderFactory setAllowShortCircuitLocalReads(
       boolean allowShortCircuitLocalReads) {
     this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
@@ -353,7 +363,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     try {
       return BlockReaderLocalLegacy.newBlockReader(conf,
           userGroupInformation, configuration, fileName, block, token,
-          datanode, startOffset, length);
+          datanode, startOffset, length, storageType);
     } catch (RemoteException remoteException) {
       ioe = remoteException.unwrapRemoteException(
                 InvalidToken.class, AccessControlException.class);
@@ -415,6 +425,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
         setShortCircuitReplica(info.getReplica()).
         setVerifyChecksum(verifyChecksum).
         setCachingStrategy(cachingStrategy).
+        setStorageType(storageType).
         build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index cd75e53..a3bfde7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -66,6 +66,7 @@ class BlockReaderLocal implements BlockReader {
     private ShortCircuitReplica replica;
     private long dataPos;
     private ExtendedBlock block;
+    private StorageType storageType;
 
     public Builder(Conf conf) {
       this.maxReadahead = Integer.MAX_VALUE;
@@ -106,6 +107,11 @@ class BlockReaderLocal implements BlockReader {
       return this;
     }
 
+    public Builder setStorageType(StorageType storageType) {
+      this.storageType = storageType;
+      return this;
+    }
+
     public BlockReaderLocal build() {
       Preconditions.checkNotNull(replica);
       return new BlockReaderLocal(this);
@@ -209,6 +215,11 @@ class BlockReaderLocal implements BlockReader {
    */
   private ByteBuffer checksumBuf;
 
+  /**
+   * StorageType of replica on DataNode.
+   */
+  private StorageType storageType;
+
   private BlockReaderLocal(Builder builder) {
     this.replica = builder.replica;
     this.dataIn = replica.getDataStream().getChannel();
@@ -237,6 +248,7 @@ class BlockReaderLocal implements BlockReader {
       this.zeroReadaheadRequested = false;
     }
     this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+    this.storageType = builder.storageType;
   }
 
   private synchronized void createDataBufIfNeeded() {
@@ -327,8 +339,8 @@ class BlockReaderLocal implements BlockReader {
         int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
         checksumBuf.clear();
         checksumBuf.limit(checksumsNeeded * checksumSize);
-        long checksumPos =
-          7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+        long checksumPos = BlockMetadataHeader.getHeaderSize()
+            + ((startDataPos / bytesPerChecksum) * checksumSize);
         while (checksumBuf.hasRemaining()) {
           int nRead = checksumIn.read(checksumBuf, checksumPos);
           if (nRead < 0) {
@@ -350,7 +362,14 @@ class BlockReaderLocal implements BlockReader {
 
   private boolean createNoChecksumContext() {
     if (verifyChecksum) {
-      return replica.addNoChecksumAnchor();
+      if (storageType != null && storageType.isTransient()) {
+        // Checksums are not stored for replicas on transient storage.  We do not
+        // anchor, because we do not intend for client activity to block eviction
+        // from transient storage on the DataNode side.
+        return true;
+      } else {
+        return replica.addNoChecksumAnchor();
+      }
     } else {
       return true;
     }
@@ -358,7 +377,9 @@ class BlockReaderLocal implements BlockReader {
 
   private void releaseNoChecksumContext() {
     if (verifyChecksum) {
-      replica.removeNoChecksumAnchor();
+      if (storageType == null || !storageType.isTransient()) {
+        replica.removeNoChecksumAnchor();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 4745575..95c7178 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -177,7 +177,8 @@ class BlockReaderLocalLegacy implements BlockReader {
       UserGroupInformation userGroupInformation,
       Configuration configuration, String file, ExtendedBlock blk,
       Token<BlockTokenIdentifier> token, DatanodeInfo node, 
-      long startOffset, long length) throws IOException {
+      long startOffset, long length, StorageType storageType)
+      throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
@@ -188,7 +189,7 @@ class BlockReaderLocalLegacy implements BlockReader {
       }
       pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
           configuration, conf.socketTimeout, token,
-          conf.connectToDnViaHostname);
+          conf.connectToDnViaHostname, storageType);
     }
 
     // check to see if the file exists. It may so happen that the
@@ -200,7 +201,8 @@ class BlockReaderLocalLegacy implements BlockReader {
     FileInputStream dataIn = null;
     FileInputStream checksumIn = null;
     BlockReaderLocalLegacy localBlockReader = null;
-    boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
+    boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
+        storageType.isTransient();
     try {
       // get a local file system
       File blkfile = new File(pathinfo.getBlockPath());
@@ -217,15 +219,8 @@ class BlockReaderLocalLegacy implements BlockReader {
         File metafile = new File(pathinfo.getMetaPath());
         checksumIn = new FileInputStream(metafile);
 
-        // read and handle the common header here. For now just a version
-        BlockMetadataHeader header = BlockMetadataHeader
-            .readHeader(new DataInputStream(checksumIn));
-        short version = header.getVersion();
-        if (version != BlockMetadataHeader.VERSION) {
-          LOG.warn("Wrong version (" + version + ") for metadata file for "
-              + blk + " ignoring ...");
-        }
-        DataChecksum checksum = header.getChecksum();
+        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+            new DataInputStream(checksumIn), blk);
         long firstChunkOffset = startOffset
             - (startOffset % checksum.getBytesPerChecksum());
         localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
@@ -266,8 +261,8 @@ class BlockReaderLocalLegacy implements BlockReader {
   
   private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
       ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
-      throws IOException {
+      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
+      StorageType storageType) throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
     BlockLocalPathInfo pathinfo = null;
     ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
@@ -275,7 +270,15 @@ class BlockReaderLocalLegacy implements BlockReader {
     try {
       // make RPC to local datanode to find local pathnames of blocks
       pathinfo = proxy.getBlockLocalPathInfo(blk, token);
-      if (pathinfo != null) {
+      // We cannot cache the path information for a replica on transient storage.
+      // If the replica gets evicted, then it moves to a different path.  Then,
+      // our next attempt to read from the cached path would fail to find the
+      // file.  Additionally, the failure would cause us to disable legacy
+      // short-circuit read for all subsequent use in the ClientContext.  Unlike
+      // the newer short-circuit read implementation, we have no communication
+      // channel for the DataNode to notify the client that the path has been
+      // invalidated.  Therefore, our only option is to skip caching.
+      if (pathinfo != null && !storageType.isTransient()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Cached location of block " + blk + " as " + pathinfo);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 2c75b5e..08cc58f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -96,6 +96,7 @@ import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
@@ -513,8 +514,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       return createChecksum(null);
     }
 
-    private DataChecksum createChecksum(ChecksumOpt userOpt) 
-        throws IOException {
+    private DataChecksum createChecksum(ChecksumOpt userOpt) {
       // Fill in any missing field with the default.
       ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
           defaultChecksumOpt, userOpt);
@@ -522,8 +522,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           myOpt.getChecksumType(),
           myOpt.getBytesPerChecksum());
       if (dataChecksum == null) {
-        throw new IOException("Invalid checksum type specified: "
-            + myOpt.getChecksumType().name());
+        throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+            + userOpt + ", default=" + defaultChecksumOpt
+            + ", effective=null");
       }
       return dataChecksum;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index af1ba14..ff65ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -567,6 +568,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       DNAddrPair retval = chooseDataNode(targetBlock, null);
       chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
+      StorageType storageType = retval.storageType;
 
       try {
         ExtendedBlock blk = targetBlock.getBlock();
@@ -575,6 +577,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setInetSocketAddress(targetAddr).
             setRemotePeerFactory(dfsClient).
             setDatanodeInfo(chosenNode).
+            setStorageType(storageType).
             setFileName(src).
             setBlock(blk).
             setBlockToken(accessToken).
@@ -872,12 +875,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
-      DatanodeInfo[] nodes = block.getLocations();
       try {
-        return getBestNodeDNAddrPair(nodes, ignoredNodes);
+        return getBestNodeDNAddrPair(block, ignoredNodes);
       } catch (IOException ie) {
-        String errMsg =
-          getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
+        String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+          deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
         if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
           String description = "Could not obtain block: " + blockInfo;
@@ -886,7 +888,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           throw new BlockMissingException(src, description,
               block.getStartOffset());
         }
-        
+
+        DatanodeInfo[] nodes = block.getLocations();
         if (nodes == null || nodes.length == 0) {
           DFSClient.LOG.info("No node available for " + blockInfo);
         }
@@ -920,22 +923,44 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
-   * Get the best node.
-   * @param nodes Nodes to choose from.
-   * @param ignoredNodes Do not chose nodes in this array (may be null)
+   * Get the best node from which to stream the data.
+   * @param block LocatedBlock, containing nodes in priority order.
+   * @param ignoredNodes Do not choose nodes in this array (may be null)
    * @return The DNAddrPair of the best node.
    * @throws IOException
    */
-  private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
+  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
-    DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
+    DatanodeInfo[] nodes = block.getLocations();
+    StorageType[] storageTypes = block.getStorageTypes();
+    DatanodeInfo chosenNode = null;
+    StorageType storageType = null;
+    if (nodes != null) {
+      for (int i = 0; i < nodes.length; i++) {
+        if (!deadNodes.containsKey(nodes[i])
+            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+          chosenNode = nodes[i];
+          // Storage types are ordered to correspond with nodes, so use the same
+          // index to get storage type.
+          if (storageTypes != null && i < storageTypes.length) {
+            storageType = storageTypes[i];
+          }
+          break;
+        }
+      }
+    }
+    if (chosenNode == null) {
+      throw new IOException("No live nodes contain block " + block.getBlock() +
+          " after checking nodes = " + Arrays.toString(nodes) +
+          ", ignoredNodes = " + ignoredNodes);
+    }
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
     }
     InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
-    return new DNAddrPair(chosenNode, targetAddr);
+    return new DNAddrPair(chosenNode, targetAddr, storageType);
   }
 
   private static String getBestNodeDNAddrPairErrorString(
@@ -1018,6 +1043,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       }
       DatanodeInfo chosenNode = datanode.info;
       InetSocketAddress targetAddr = datanode.addr;
+      StorageType storageType = datanode.storageType;
       BlockReader reader = null;
 
       try {
@@ -1028,6 +1054,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setInetSocketAddress(targetAddr).
             setRemotePeerFactory(dfsClient).
             setDatanodeInfo(chosenNode).
+            setStorageType(storageType).
             setFileName(src).
             setBlock(block.getBlock()).
             setBlockToken(blockToken).
@@ -1151,7 +1178,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         // If no nodes to do hedged reads against, pass.
         try {
           try {
-            chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+            chosenNode = getBestNodeDNAddrPair(block, ignored);
           } catch (IOException ioe) {
             chosenNode = chooseDataNode(block, ignored);
           }
@@ -1494,31 +1521,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     throw new IOException("Mark/reset not supported");
   }
 
-  /**
-   * Pick the best node from which to stream the data.
-   * Entries in <i>nodes</i> are already in the priority order
-   */
-  static DatanodeInfo bestNode(DatanodeInfo nodes[],
-      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
-      Collection<DatanodeInfo> ignoredNodes) throws IOException {
-    if (nodes != null) {
-      for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])
-            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
-          return nodes[i];
-        }
-      }
-    }
-    throw new IOException("No live nodes contain current block");
-  }
-
   /** Utility class to encapsulate data node info and its address. */
-  static class DNAddrPair {
+  private static final class DNAddrPair {
     final DatanodeInfo info;
     final InetSocketAddress addr;
-    DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+    final StorageType storageType;
+
+    DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
+        StorageType storageType) {
       this.info = info;
       this.addr = addr;
+      this.storageType = storageType;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 60178c7..a83c854 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -42,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
@@ -89,9 +91,9 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
-
 import org.htrace.Span;
 import org.htrace.Trace;
 import org.htrace.TraceScope;
@@ -148,7 +150,10 @@ public class DFSOutputStream extends FSOutputSummer
   private String src;
   private final long fileId;
   private final long blockSize;
-  private final DataChecksum checksum;
+  /** Only for DataTransferProtocol.writeBlock(..) */
+  private final DataChecksum checksum4WriteBlock;
+  private final int bytesPerChecksum; 
+
   // both dataQueue and ackQueue are protected by dataQueue lock
   private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
   private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@@ -245,6 +250,9 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     void writeChecksum(byte[] inarray, int off, int len) {
+      if (len == 0) {
+        return;
+      }
       if (checksumPos + len > dataStart) {
         throw new BufferOverflowException();
       }
@@ -378,18 +386,11 @@ public class DFSOutputStream extends FSOutputSummer
     private final Span traceSpan;
 
     /**
-     * Default construction for file create
-     */
-    private DataStreamer() {
-      this(null, null);
-    }
-
-    /**
      * construction with tracing info
      */
     private DataStreamer(HdfsFileStatus stat, Span span) {
       isAppend = false;
-      isLazyPersistFile = initLazyPersist(stat);
+      isLazyPersistFile = isLazyPersist(stat);
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
       traceSpan = span;
     }
@@ -409,7 +410,7 @@ public class DFSOutputStream extends FSOutputSummer
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
-      isLazyPersistFile = initLazyPersist(stat);
+      isLazyPersistFile = isLazyPersist(stat);
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -452,13 +453,6 @@ public class DFSOutputStream extends FSOutputSummer
 
       }
     }
-    
-    private boolean initLazyPersist(HdfsFileStatus stat) {
-      final BlockStoragePolicy lpPolicy = blockStoragePolicySuite
-          .getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
-      return lpPolicy != null &&
-             stat.getStoragePolicy() == lpPolicy.getId();
-    }
 
     private void setPipeline(LocatedBlock lb) {
       setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
@@ -553,7 +547,7 @@ public class DFSOutputStream extends FSOutputSummer
             }
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
-              one = new Packet(checksum.getChecksumSize());  // heartbeat packet
+              one = new Packet(getChecksumSize());  // heartbeat packet
             } else {
               one = dataQueue.getFirst(); // regular data packet
             }
@@ -1408,8 +1402,8 @@ public class DFSOutputStream extends FSOutputSummer
           // send the request
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
-              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy.get(), isLazyPersistFile);
+              nodes.length, block.getNumBytes(), bytesSent, newGS,
+              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1618,9 +1612,23 @@ public class DFSOutputStream extends FSOutputSummer
     return value;
   }
 
+  /** 
+   * @return the object for computing checksum.
+   *         The type is NULL if checksum is not computed.
+   */
+  private static DataChecksum getChecksum4Compute(DataChecksum checksum,
+      HdfsFileStatus stat) {
+    if (isLazyPersist(stat) && stat.getReplication() == 1) {
+      // do not compute checksum for writing to single replica to memory
+      return DataChecksum.newDataChecksum(Type.NULL,
+          checksum.getBytesPerChecksum());
+    }
+    return checksum;
+  }
+ 
   private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
-    super(checksum);
+    super(getChecksum4Compute(checksum, stat));
     this.dfsClient = dfsClient;
     this.src = src;
     this.fileId = stat.getFileId();
@@ -1635,15 +1643,18 @@ public class DFSOutputStream extends FSOutputSummer
           "Set non-null progress callback on DFSOutputStream " + src);
     }
     
-    final int bytesPerChecksum = checksum.getBytesPerChecksum();
-    if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
-      throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
-                            ") and blockSize(" + blockSize + 
-                            ") do not match. " + "blockSize should be a " +
-                            "multiple of io.bytes.per.checksum");
-                            
-    }
-    this.checksum = checksum;
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    if (bytesPerChecksum <= 0) {
+      throw new HadoopIllegalArgumentException(
+          "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
+    }
+    if (blockSize % bytesPerChecksum != 0) {
+      throw new HadoopIllegalArgumentException("Invalid values: "
+          + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+          + ") must divide block size (=" + blockSize + ").");
+    }
+    this.checksum4WriteBlock = checksum;
+
     this.dfsclientSlowLogThresholdMs =
       dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
   }
@@ -1655,8 +1666,7 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
-    computePacketChunkSize(dfsClient.getConf().writePacketSize,
-        checksum.getBytesPerChecksum());
+    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
 
     Span traceSpan = null;
     if (Trace.isTracing()) {
@@ -1734,11 +1744,9 @@ public class DFSOutputStream extends FSOutputSummer
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat,
-          checksum.getBytesPerChecksum(), traceSpan);
+      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
     } else {
-      computePacketChunkSize(dfsClient.getConf().writePacketSize,
-          checksum.getBytesPerChecksum());
+      computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
       streamer = new DataStreamer(stat, traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@@ -1752,9 +1760,15 @@ public class DFSOutputStream extends FSOutputSummer
     out.start();
     return out;
   }
+  
+  private static boolean isLazyPersist(HdfsFileStatus stat) {
+    final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
+        HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+    return p != null && stat.getStoragePolicy() == p.getId();
+  }
 
   private void computePacketChunkSize(int psize, int csize) {
-    int chunkSize = csize + checksum.getChecksumSize();
+    final int chunkSize = csize + getChecksumSize();
     chunksPerPacket = Math.max(psize/chunkSize, 1);
     packetSize = chunkSize*chunksPerPacket;
     if (DFSClient.LOG.isDebugEnabled()) {
@@ -1811,21 +1825,19 @@ public class DFSOutputStream extends FSOutputSummer
     dfsClient.checkOpen();
     checkClosed();
 
-    int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
     if (len > bytesPerChecksum) {
       throw new IOException("writeChunk() buffer size is " + len +
                             " is larger than supported  bytesPerChecksum " +
                             bytesPerChecksum);
     }
-    if (cklen != this.checksum.getChecksumSize()) {
+    if (cklen != 0 && cklen != getChecksumSize()) {
       throw new IOException("writeChunk() checksum size is supposed to be " +
-                            this.checksum.getChecksumSize() + 
-                            " but found to be " + cklen);
+                            getChecksumSize() + " but found to be " + cklen);
     }
 
     if (currentPacket == null) {
       currentPacket = new Packet(packetSize, chunksPerPacket, 
-          bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+          bytesCurBlock, currentSeqno++, getChecksumSize());
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.seqno +
@@ -1873,7 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
       //
       if (bytesCurBlock == blockSize) {
         currentPacket = new Packet(0, 0, bytesCurBlock, 
-            currentSeqno++, this.checksum.getChecksumSize());
+            currentSeqno++, getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1967,7 +1979,7 @@ public class DFSOutputStream extends FSOutputSummer
             // but sync was requested.
             // Send an empty packet
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+                bytesCurBlock, currentSeqno++, getChecksumSize());
           }
         } else {
           if (isSync && bytesCurBlock > 0) {
@@ -1976,7 +1988,7 @@ public class DFSOutputStream extends FSOutputSummer
             // and sync was requested.
             // So send an empty sync packet.
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+                bytesCurBlock, currentSeqno++, getChecksumSize());
           } else {
             // just discard the current packet since it is already been sent.
             currentPacket = null;
@@ -2180,8 +2192,7 @@ public class DFSOutputStream extends FSOutputSummer
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(0, 0, bytesCurBlock, 
-            currentSeqno++, this.checksum.getChecksumSize());
+        currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -2245,8 +2256,7 @@ public class DFSOutputStream extends FSOutputSummer
   @VisibleForTesting
   public synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
-    packetSize = (checksum.getBytesPerChecksum() + 
-                  checksum.getChecksumSize()) * chunksPerPacket;
+    packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
   }
 
   synchronized void setTestFilename(String newname) {
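
For illustration (not part of the commit): the core change above is that a
single-replica lazy-persist file is summed with a NULL checksum on the client write
path, while the configured checksum is still handed to the pipeline through
writeBlock(..). A minimal sketch of that selection, with the lazy-persist condition
hard-coded instead of derived from HdfsFileStatus:

  import org.apache.hadoop.util.DataChecksum;

  public class Checksum4ComputeSketch {
    public static void main(String[] args) {
      DataChecksum configured =
          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
      // Stands in for isLazyPersist(stat) && stat.getReplication() == 1.
      boolean lazyPersistSingleReplica = true;
      // NULL checksums skip CRC computation but keep the same chunking geometry.
      DataChecksum forCompute = lazyPersistSingleReplica
          ? DataChecksum.newDataChecksum(DataChecksum.Type.NULL,
              configured.getBytesPerChecksum())
          : configured;
      System.out.println("compute with: " + forCompute.getChecksumType());  // NULL
    }
  }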

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 16bcc0b..30368f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -185,7 +186,11 @@ public class LocatedBlock {
         + "; getBlockSize()=" + getBlockSize()
         + "; corrupt=" + corrupt
         + "; offset=" + offset
-        + "; locs=" + java.util.Arrays.asList(locs)
+        + "; locs=" + Arrays.asList(locs)
+        + "; storageIDs=" +
+            (storageIDs != null ? Arrays.asList(storageIDs) : null)
+        + "; storageTypes=" +
+            (storageTypes != null ? Arrays.asList(storageTypes) : null)
         + "}";
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index b86cad4..51a6134 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -29,10 +29,13 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.DataChecksum;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -46,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class BlockMetadataHeader {
+  private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
 
   public static final short VERSION = 1;
   
@@ -74,6 +78,37 @@ public class BlockMetadataHeader {
   }
 
   /**
+   * Read the checksum header from the meta file.
+   * @return the data checksum obtained from the header.
+   */
+  public static DataChecksum readDataChecksum(File metaFile) throws IOException {
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new BufferedInputStream(
+        new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));
+      return readDataChecksum(in, metaFile);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  /**
+   * Read the checksum header from the meta input stream.
+   * @return the data checksum obtained from the header.
+   */
+  public static DataChecksum readDataChecksum(final DataInputStream metaIn,
+      final Object name) throws IOException {
+    // read and handle the common header here. For now just a version
+    final BlockMetadataHeader header = readHeader(metaIn);
+    if (header.getVersion() != VERSION) {
+      LOG.warn("Unexpected meta-file version for " + name
+          + ": version in file is " + header.getVersion()
+          + " but expected version is " + VERSION);
+    }
+    return header.getChecksum();
+  }
+
+  /**
    * Read the header without changing the position of the FileChannel.
    *
    * @param fc The FileChannel to read.
@@ -82,7 +117,7 @@ public class BlockMetadataHeader {
    */
   public static BlockMetadataHeader preadHeader(FileChannel fc)
       throws IOException {
-    byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
+    final byte arr[] = new byte[getHeaderSize()];
     ByteBuffer buf = ByteBuffer.wrap(arr);
 
     while (buf.hasRemaining()) {
@@ -158,7 +193,7 @@ public class BlockMetadataHeader {
    * Writes all the fields till the beginning of checksum.
    * @throws IOException on error
    */
-  static void writeHeader(DataOutputStream out, DataChecksum checksum)
+  public static void writeHeader(DataOutputStream out, DataChecksum checksum)
                          throws IOException {
     writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
   }

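BlockMetadataHeader gains readDataChecksum() overloads that read the common
header, log a warning (rather than fail) on an unexpected version, and return
the embedded DataChecksum. A hedged sketch of how a caller might use the File
overload; the meta-file path and the fields printed are illustrative
assumptions, not something taken from the patch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.util.DataChecksum;

    public class ReadChecksumSketch {
      public static void main(String[] args) throws IOException {
        // Assumed path to a replica's meta file.
        File metaFile = new File("/data/dn/current/blk_1001_1.meta");
        DataChecksum checksum = BlockMetadataHeader.readDataChecksum(metaFile);
        System.out.println("type=" + checksum.getChecksumType()
            + ", bytesPerChecksum=" + checksum.getBytesPerChecksum()
            + ", checksumSize=" + checksum.getChecksumSize());
      }
    }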
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 4d1cc6c..75f1c36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -81,12 +81,12 @@ class BlockReceiver implements Closeable {
    * checksum polynomial than the block is stored with on disk,
    * the DataNode needs to recalculate checksums before writing.
    */
-  private boolean needsChecksumTranslation;
+  private final boolean needsChecksumTranslation;
   private OutputStream out = null; // to block file at local disk
   private FileDescriptor outFd;
   private DataOutputStream checksumOut = null; // to crc file at local disk
-  private int bytesPerChecksum;
-  private int checksumSize;
+  private final int bytesPerChecksum;
+  private final int checksumSize;
   
   private final PacketReceiver packetReceiver = new PacketReceiver(false);
   
@@ -98,7 +98,6 @@ class BlockReceiver implements Closeable {
   private DataTransferThrottler throttler;
   private ReplicaOutputStreams streams;
   private DatanodeInfo srcDataNode = null;
-  private Checksum partialCrc = null;
   private final DataNode datanode;
   volatile private boolean mirrorError;
 
@@ -489,7 +488,7 @@ class BlockReceiver implements Closeable {
     long offsetInBlock = header.getOffsetInBlock();
     long seqno = header.getSeqno();
     boolean lastPacketInBlock = header.isLastPacketInBlock();
-    int len = header.getDataLen();
+    final int len = header.getDataLen();
     boolean syncBlock = header.getSyncBlock();
 
     // avoid double sync'ing on close
@@ -498,7 +497,7 @@ class BlockReceiver implements Closeable {
     }
 
     // update received bytes
-    long firstByteInBlock = offsetInBlock;
+    final long firstByteInBlock = offsetInBlock;
     offsetInBlock += len;
     if (replicaInfo.getNumBytes() < offsetInBlock) {
       replicaInfo.setNumBytes(offsetInBlock);
@@ -538,16 +537,15 @@ class BlockReceiver implements Closeable {
         flushOrSync(true);
       }
     } else {
-      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
-                                                            checksumSize;
+      final int checksumLen = diskChecksum.getChecksumSize(len);
+      final int checksumReceivedLen = checksumBuf.capacity();
 
-      if ( checksumBuf.capacity() != checksumLen) {
-        throw new IOException("Length of checksums in packet " +
-            checksumBuf.capacity() + " does not match calculated checksum " +
-            "length " + checksumLen);
+      if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
+        throw new IOException("Invalid checksum length: received length is "
+            + checksumReceivedLen + " but expected length is " + checksumLen);
       }
 
-      if (shouldVerifyChecksum()) {
+      if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
         try {
           verifyChunks(dataBuf, checksumBuf);
         } catch (IOException ioe) {
@@ -571,11 +569,17 @@ class BlockReceiver implements Closeable {
           translateChunks(dataBuf, checksumBuf);
         }
       }
+
+      if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
+        // checksum is missing, need to calculate it
+        checksumBuf = ByteBuffer.allocate(checksumLen);
+        diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
+      }
       
       // by this point, the data in the buffer uses the disk checksum
 
-      byte[] lastChunkChecksum;
-      
+      final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
+          && streams.isTransientStorage();
       try {
         long onDiskLen = replicaInfo.getBytesOnDisk();
         if (onDiskLen<offsetInBlock) {
@@ -587,14 +591,16 @@ class BlockReceiver implements Closeable {
           }
           
           // If this is a partial chunk, then read in pre-existing checksum
-          if (firstByteInBlock % bytesPerChecksum != 0) {
-            LOG.info("Packet starts at " + firstByteInBlock +
-                     " for " + block +
-                     " which is not a multiple of bytesPerChecksum " +
-                     bytesPerChecksum);
+          Checksum partialCrc = null;
+          if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("receivePacket for " + block 
+                  + ": bytesPerChecksum=" + bytesPerChecksum                  
+                  + " does not divide firstByteInBlock=" + firstByteInBlock);
+            }
             long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                 onDiskLen / bytesPerChecksum * checksumSize;
-            computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
+            partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
           }
 
           int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
@@ -611,41 +617,40 @@ class BlockReceiver implements Closeable {
                 + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
           }
 
-          // If this is a partial chunk, then verify that this is the only
-          // chunk in the packet. Calculate new crc for this chunk.
-          if (partialCrc != null) {
+          final byte[] lastCrc;
+          if (shouldNotWriteChecksum) {
+            lastCrc = null;
+          } else if (partialCrc != null) {
+            // If this is a partial chunk, then verify that this is the only
+            // chunk in the packet. Calculate new crc for this chunk.
             if (len > bytesPerChecksum) {
-              throw new IOException("Got wrong length during writeBlock(" + 
-                                    block + ") from " + inAddr + " " +
-                                    "A packet can have only one partial chunk."+
-                                    " len = " + len + 
-                                    " bytesPerChecksum " + bytesPerChecksum);
+              throw new IOException("Unexpected packet data length for "
+                  +  block + " from " + inAddr + ": a partial chunk must be "
+                  + " sent in an individual packet (data length = " + len
+                  +  " > bytesPerChecksum = " + bytesPerChecksum + ")");
             }
             partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
-            lastChunkChecksum = Arrays.copyOfRange(
-              buf, buf.length - checksumSize, buf.length
-            );
+            lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
             checksumOut.write(buf);
             if(LOG.isDebugEnabled()) {
               LOG.debug("Writing out partial crc for data len " + len);
             }
             partialCrc = null;
           } else {
-            lastChunkChecksum = Arrays.copyOfRange(
-                checksumBuf.array(),
-                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
-                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
-            checksumOut.write(checksumBuf.array(),
-                checksumBuf.arrayOffset() + checksumBuf.position(),
-                checksumLen);
+            // write checksum
+            final int offset = checksumBuf.arrayOffset() +
+                checksumBuf.position();
+            final int end = offset + checksumLen;
+            lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
+                end);
+            checksumOut.write(checksumBuf.array(), offset, checksumLen);
           }
+
           /// flush entire packet, sync if requested
           flushOrSync(syncBlock);
           
-          replicaInfo.setLastChecksumAndDataLen(
-            offsetInBlock, lastChunkChecksum
-          );
+          replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
 
           datanode.metrics.incrBytesWritten(len);
 
@@ -685,6 +690,10 @@ class BlockReceiver implements Closeable {
     return lastPacketInBlock?-1:len;
   }
 
+  private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
+    return Arrays.copyOfRange(array, end - size, end);
+  }
+
   private void manageWriterOsCache(long offsetInBlock) {
     try {
       if (outFd != null &&
@@ -920,18 +929,19 @@ class BlockReceiver implements Closeable {
    * reads in the partial crc chunk and computes checksum
    * of pre-existing data in partial chunk.
    */
-  private void computePartialChunkCrc(long blkoff, long ckoff, 
-                                      int bytesPerChecksum) throws IOException {
+  private Checksum computePartialChunkCrc(long blkoff, long ckoff)
+      throws IOException {
 
     // find offset of the beginning of partial chunk.
     //
     int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
-    int checksumSize = diskChecksum.getChecksumSize();
     blkoff = blkoff - sizePartialChunk;
-    LOG.info("computePartialChunkCrc sizePartialChunk " + 
-              sizePartialChunk + " " + block +
-              " block offset " + blkoff +
-              " metafile offset " + ckoff);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("computePartialChunkCrc for " + block
+          + ": sizePartialChunk=" + sizePartialChunk
+          + ", block offset=" + blkoff
+          + ", metafile offset=" + ckoff);
+    }
 
     // create an input stream from the block file
     // and read in partial crc chunk into temporary buffer
@@ -950,10 +960,12 @@ class BlockReceiver implements Closeable {
     }
 
     // compute crc of partial chunk from data read in the block file.
-    partialCrc = DataChecksum.newDataChecksum(
+    final Checksum partialCrc = DataChecksum.newDataChecksum(
         diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
     partialCrc.update(buf, 0, sizePartialChunk);
-    LOG.info("Read in partial CRC chunk from disk for " + block);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Read in partial CRC chunk from disk for " + block);
+    }
 
     // paranoia! verify that the pre-computed crc matches what we
     // recalculated just now
@@ -964,6 +976,7 @@ class BlockReceiver implements Closeable {
                    checksum2long(crcbuf);
       throw new IOException(msg);
     }
+    return partialCrc;
   }
   
   private static enum PacketResponderType {

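The receivePacket() changes above reduce checksum handling to three cases:
checksums were received (check their length and, where configured, verify
them), checksums are missing and the replica is on transient storage (skip
checksum generation and write no checksum), or checksums are missing on
durable storage (compute them from the packet data before writing). A
simplified, standalone stand-in for that decision -- not BlockReceiver itself:

    import java.io.IOException;

    public class ChecksumDecisionSketch {
      enum Action { VERIFY_AND_WRITE, COMPUTE_AND_WRITE, SKIP_CHECKSUM }

      static Action decide(int checksumReceivedLen, int expectedLen,
          boolean transientStorage) throws IOException {
        if (checksumReceivedLen > 0 && checksumReceivedLen != expectedLen) {
          throw new IOException("Invalid checksum length: received "
              + checksumReceivedLen + " but expected " + expectedLen);
        }
        if (checksumReceivedLen > 0) {
          return Action.VERIFY_AND_WRITE;   // client sent checksums
        }
        return transientStorage
            ? Action.SKIP_CHECKSUM          // RAM disk: defer checksum work
            : Action.COMPUTE_AND_WRITE;     // durable disk: compute locally
      }

      public static void main(String[] args) throws IOException {
        System.out.println(decide(0, 8, true));   // SKIP_CHECKSUM
        System.out.println(decide(0, 8, false));  // COMPUTE_AND_WRITE
        System.out.println(decide(8, 8, false));  // VERIFY_AND_WRITE
      }
    }

The SKIP_CHECKSUM case is later covered by computeChecksum() in FsDatasetImpl
(further below), which fills in the checksums when lazy persistence copies the
replica to non-transient storage.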
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index febf2de..c8855d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
@@ -262,26 +263,37 @@ class BlockSender implements java.io.Closeable {
        */
       DataChecksum csum = null;
       if (verifyChecksum || sendChecksum) {
-        final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
-        if (!corruptChecksumOk || metaIn != null) {
-          if (metaIn == null) {
-            //need checksum but meta-data not found
-            throw new FileNotFoundException("Meta-data not found for " + block);
-          }
-
-          checksumIn = new DataInputStream(
-              new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+        LengthInputStream metaIn = null;
+        boolean keepMetaInOpen = false;
+        try {
+          metaIn = datanode.data.getMetaDataInputStream(block);
+          if (!corruptChecksumOk || metaIn != null) {
+            if (metaIn == null) {
+              //need checksum but meta-data not found
+              throw new FileNotFoundException("Meta-data not found for " +
+                  block);
+            }
+
+            // The meta file will contain only the header if the NULL checksum
+            // type was used, or if the replica was written to transient storage.
+            // Checksum verification is not performed for replicas on transient
+            // storage.  The header is important for determining the checksum
+            // type later when lazy persistence copies the block to non-transient
+            // storage and computes the checksum.
+            if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
+              checksumIn = new DataInputStream(new BufferedInputStream(
+                  metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
   
-          // read and handle the common header here. For now just a version
-          BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-          short version = header.getVersion();
-          if (version != BlockMetadataHeader.VERSION) {
-            LOG.warn("Wrong version (" + version + ") for metadata file for "
-                + block + " ignoring ...");
+              csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
+              keepMetaInOpen = true;
+            }
+          } else {
+            LOG.warn("Could not find metadata file for " + block);
+          }
+        } finally {
+          if (!keepMetaInOpen) {
+            IOUtils.closeStream(metaIn);
           }
-          csum = header.getChecksum();
-        } else {
-          LOG.warn("Could not find metadata file for " + block);
         }
       }
       if (csum == null) {
@@ -340,7 +352,7 @@ class BlockSender implements java.io.Closeable {
       endOffset = end;
 
       // seek to the right offsets
-      if (offset > 0) {
+      if (offset > 0 && checksumIn != null) {
         long checksumSkip = (offset / chunkSize) * checksumSize;
         // note blockInStream is seeked when created below
         if (checksumSkip > 0) {

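BlockSender now treats a meta file that contains only the header -- for
example, one belonging to a replica on transient storage or written with the
NULL checksum type -- as having no checksum data, and only opens a checksum
stream when the file is longer than the header. A small sketch of that length
test, assuming the header-size accessor is visible to the caller:

    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;

    public class HeaderOnlySketch {
      static boolean hasChecksumData(long metaFileLength) {
        // Anything beyond the fixed header must be checksum bytes.
        return metaFileLength > BlockMetadataHeader.getHeaderSize();
      }

      public static void main(String[] args) {
        long headerOnly = BlockMetadataHeader.getHeaderSize();
        System.out.println(hasChecksumData(headerOnly));         // false
        System.out.println(hasChecksumData(headerOnly + 4096));  // true
      }
    }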
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 45862ca..6a26640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -213,7 +213,7 @@ public class ReplicaInPipeline extends ReplicaInfo
     
     // the checksum that should actually be used -- this
     // may differ from requestedChecksum for appends.
-    DataChecksum checksum;
+    final DataChecksum checksum;
     
     RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
     
@@ -250,7 +250,7 @@ public class ReplicaInPipeline extends ReplicaInfo
         }
       }
     } else {
-			// for create, we can use the requested checksum
+      // for create, we can use the requested checksum
       checksum = requestedChecksum;
     }
     
@@ -264,7 +264,8 @@ public class ReplicaInPipeline extends ReplicaInfo
         blockOut.getChannel().position(blockDiskSize);
         crcOut.getChannel().position(crcDiskSize);
       }
-      return new ReplicaOutputStreams(blockOut, crcOut, checksum);
+      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
+          getVolume().isTransientStorage());
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);
       IOUtils.closeStream(metaRAF);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index 95044c8..bd1461a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -32,16 +32,18 @@ public class ReplicaOutputStreams implements Closeable {
   private final OutputStream dataOut;
   private final OutputStream checksumOut;
   private final DataChecksum checksum;
+  private final boolean isTransientStorage;
 
   /**
    * Create an object with a data output stream, a checksum output stream
    * and a checksum.
    */
   public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
-      DataChecksum checksum) {
+      DataChecksum checksum, boolean isTransientStorage) {
     this.dataOut = dataOut;
     this.checksumOut = checksumOut;
     this.checksum = checksum;
+    this.isTransientStorage = isTransientStorage;
   }
 
   /** @return the data output stream. */
@@ -59,6 +61,11 @@ public class ReplicaOutputStreams implements Closeable {
     return checksum;
   }
 
+  /** @return whether this replica is being written to transient storage. */
+  public boolean isTransientStorage() {
+    return isTransientStorage;
+  }
+
   @Override
   public void close() {
     IOUtils.closeStream(dataOut);

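ReplicaOutputStreams now records whether the replica lives on transient
storage, so writers such as BlockReceiver can decide whether checksum output
is needed at all. A sketch of constructing it with the new flag, mirroring the
call sites changed in ReplicaInPipeline and SimulatedFSDataset; the in-memory
streams and the CRC32C geometry are placeholders:

    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
    import org.apache.hadoop.util.DataChecksum;

    public class ReplicaStreamsSketch {
      public static void main(String[] args) {
        DataChecksum checksum =
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
        ReplicaOutputStreams streams = new ReplicaOutputStreams(
            new ByteArrayOutputStream(),   // stand-in for the block data stream
            new ByteArrayOutputStream(),   // stand-in for the meta/crc stream
            checksum,
            true);                         // isTransientStorage
        // Downstream code can now branch on the storage type:
        System.out.println(streams.isTransientStorage());  // true
        streams.close();
      }
    }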
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index dce2ff8..e3d1607 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -599,13 +599,8 @@ class BlockPoolSlice {
               HdfsConstants.IO_FILE_BUFFER_SIZE));
 
       // read and handle the common header here. For now just a version
-      BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      short version = header.getVersion();
-      if (version != BlockMetadataHeader.VERSION) {
-        FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file "
-            + metaFile + " ignoring ...");
-      }
-      DataChecksum checksum = header.getChecksum();
+      final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+          checksumIn, metaFile);
       int bytesPerChecksum = checksum.getBytesPerChecksum();
       int checksumSize = checksum.getChecksumSize();
       long numChunks = Math.min(

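BlockPoolSlice now delegates header parsing to readDataChecksum() and then
uses the returned geometry to work out how many chunks are covered by both the
block file and the meta file. A rough, standalone illustration of that kind of
bound; the file lengths, the assumed 7-byte header, and the exact formula are
illustrative assumptions rather than the patch's own code:

    public class CoveredChunksSketch {
      public static void main(String[] args) {
        long blockFileLen = 10000;   // assumed on-disk block length
        long metaFileLen = 7 + 80;   // assumed: header plus 80 checksum bytes
        int bytesPerChecksum = 512;
        int checksumSize = 4;

        // Chunks implied by the data file vs. checksums present in the meta
        // file; only the smaller count can be validated.
        long numChunks = Math.min(
            (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
            (metaFileLen - 7) / checksumSize);
        System.out.println("chunks covered by both files: " + numChunks);  // 20
      }
    }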
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e77ea34..f130d05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -92,6 +95,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -634,7 +638,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Get the meta info of a block stored in volumeMap. To find a block,
    * block pool Id, block Id and generation stamp must match.
    * @param b extended block
-   * @return the meta replica information; null if block was not found
+   * @return the meta replica information
    * @throws ReplicaNotFoundException if no entry is in the map or 
    *                        there is a generation stamp mismatch
    */
@@ -722,23 +726,80 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
-    try {
-      Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
-    } catch (IOException e) {
-      throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
-    }
+    computeChecksum(srcMeta, dstMeta, srcFile);
+
     try {
       Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
     } catch (IOException e) {
       throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Copied " + srcMeta + " to " + dstMeta);
+      LOG.debug("Copied " + srcMeta + " to " + dstMeta +
+          " and calculated checksum");
       LOG.debug("Copied " + srcFile + " to " + dstFile);
     }
     return new File[] {dstMeta, dstFile};
   }
 
+  /**
+   * Compute and store the checksum for a block file that does not already have
+   * its checksum computed.
+   *
+   * @param srcMeta source meta file, containing only the checksum header, not a
+   *     calculated checksum
+   * @param dstMeta destination meta file, into which this method will write a
+   *     full computed checksum
+   * @param blockFile block file for which the checksum will be computed
+   * @throws IOException
+   */
+  private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
+      throws IOException {
+    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
+    final byte[] data = new byte[1 << 16];
+    final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
+
+    DataOutputStream metaOut = null;
+    InputStream dataIn = null;
+    try {
+      File parentFile = dstMeta.getParentFile();
+      if (parentFile != null) {
+        if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
+          throw new IOException("Destination '" + parentFile
+              + "' directory cannot be created");
+        }
+      }
+      metaOut = new DataOutputStream(new BufferedOutputStream(
+          new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
+      BlockMetadataHeader.writeHeader(metaOut, checksum);
+
+      dataIn = isNativeIOAvailable ?
+          NativeIO.getShareDeleteFileInputStream(blockFile) :
+          new FileInputStream(blockFile);
+
+      int offset = 0;
+      for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
+        if (n > 0) {
+          n += offset;
+          offset = n % checksum.getBytesPerChecksum();
+          final int length = n - offset;
+
+          if (length > 0) {
+            checksum.calculateChunkedSums(data, 0, length, crcs, 0);
+            metaOut.write(crcs, 0, checksum.getChecksumSize(length));
+
+            System.arraycopy(data, length, data, 0, offset);
+          }
+        }
+      }
+
+      // calculate and write the last crc
+      checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
+      metaOut.write(crcs, 0, 4);
+    } finally {
+      IOUtils.cleanup(LOG, dataIn, metaOut);
+    }
+  }
+
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     LOG.info("truncateBlock: blockFile=" + blockFile
@@ -1641,6 +1702,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  @Override
   public boolean isCached(String bpid, long blockId) {
     return cacheManager.isCached(bpid, blockId);
   }
@@ -2556,8 +2618,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Before deleting the files from transient storage we must notify the
         // NN that the files are on the new storage. Else a blockReport from
         // the transient storage might cause the NN to think the blocks are lost.
+        // Replicas must be evicted from client short-circuit caches, because
+        // the storage will no longer be transient and will therefore require
+        // checksum validation.  This also stops clients from holding file
+        // descriptors open, which would prevent the OS from reclaiming the memory.
         ExtendedBlock extendedBlock =
             new ExtendedBlock(bpid, newReplicaInfo);
+        datanode.getShortCircuitRegistry().processBlockInvalidation(
+            ExtendedBlockId.fromExtendedBlock(extendedBlock));
         datanode.notifyNamenodeReceivedBlock(
             extendedBlock, null, newReplicaInfo.getStorageUuid());
 

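The new computeChecksum() helper reads the block in 64 KB slices, checksums
only the whole chunks in each slice, and copies the trailing partial chunk to
the front of the buffer so the next read can complete it; any final partial
chunk is summed after the loop. A standalone sketch of that carry-over loop;
the input data, the CRC32C geometry, and the use of getChecksumSize(offset)
for the final write are illustrative assumptions:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.util.DataChecksum;

    public class ChunkedChecksumSketch {
      public static void main(String[] args) throws IOException {
        DataChecksum checksum =
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
        // Assumed block contents: 1300 bytes, i.e. two full chunks plus a tail.
        InputStream dataIn = new ByteArrayInputStream(new byte[1300]);
        ByteArrayOutputStream metaOut = new ByteArrayOutputStream();

        byte[] data = new byte[1 << 16];
        byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
        int offset = 0;  // bytes of an incomplete chunk carried over
        for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
          if (n > 0) {
            n += offset;
            offset = n % checksum.getBytesPerChecksum();  // leftover partial chunk
            int length = n - offset;                      // whole chunks only
            if (length > 0) {
              checksum.calculateChunkedSums(data, 0, length, crcs, 0);
              metaOut.write(crcs, 0, checksum.getChecksumSize(length));
              // Move the partial chunk to the front for the next iteration.
              System.arraycopy(data, length, data, 0, offset);
            }
          }
        }
        // Sum the final partial chunk, if any.
        checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
        metaOut.write(crcs, 0, checksum.getChecksumSize(offset));
        System.out.println("checksum bytes written: " + metaOut.size());  // 12
      }
    }

The carry-over avoids re-reading data or buffering the whole block: each byte
is read once, and only complete chunks are ever handed to the checksum code.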
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 76acbea..5fdcc2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -241,7 +241,7 @@ class RamDiskAsyncLazyPersistService {
       } catch (Exception e){
         FsDatasetImpl.LOG.warn(
             "LazyWriter failed to async persist RamDisk block pool id: "
-            + bpId + "block Id: " + blockId);
+            + bpId + "block Id: " + blockId, e);
       } finally {
         if (!succeeded) {
           datanode.getFSDataset().onFailLazyPersist(bpId, blockId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
index a843d9a..c01a6cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -168,9 +168,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
 
   @Override
   synchronized RamDiskReplicaLru getNextCandidateForEviction() {
-    Iterator it = replicasPersisted.values().iterator();
+    final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
     while (it.hasNext()) {
-      RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
+      final RamDiskReplicaLru ramDiskReplicaLru = it.next();
       it.remove();
 
       Map<Long, RamDiskReplicaLru> replicaMap =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9f31af2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 0786bc6..83b476f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -248,7 +248,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
             + theBlock);
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
-        return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum);
+        return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
+            volume.isTransientStorage());
       }
     }