You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/17 23:45:15 UTC

[28/34] git commit: HDFS-7129. Metrics to track usage of memory for writes. (Contributed by Xiaoyu Yao)

HDFS-7129. Metrics to track usage of memory for writes. (Contributed by Xiaoyu Yao)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c865c93d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c865c93d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c865c93d

Branch: refs/heads/branch-2
Commit: c865c93dc14dc09e76346114c998e951f12eae33
Parents: 6916d41
Author: arp <ar...@apache.org>
Authored: Tue Sep 30 00:53:18 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:03 2014 -0700

----------------------------------------------------------------------
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 40 +++++++++-
 .../impl/RamDiskReplicaLruTracker.java          | 20 ++++-
 .../fsdataset/impl/RamDiskReplicaTracker.java   | 23 +++++-
 .../datanode/metrics/DataNodeMetrics.java       | 80 ++++++++++++++++++++
 .../org/apache/hadoop/hdfs/tools/JMXGet.java    | 18 +++++
 .../fsdataset/impl/TestLazyPersistFiles.java    | 80 ++++++++++++++++----
 6 files changed, 238 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8ed81df..f2daf99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1013,11 +1013,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         if (allowLazyPersist) {
           // First try to place the block on a transient volume.
           v = volumes.getNextTransientVolume(b.getNumBytes());
+          datanode.getMetrics().incrRamDiskBlocksWrite();
         } else {
           v = volumes.getNextVolume(storageType, b.getNumBytes());
         }
       } catch (DiskOutOfSpaceException de) {
         if (allowLazyPersist) {
+          datanode.getMetrics().incrRamDiskBlocksWriteFallback();
           allowLazyPersist = false;
           continue;
         }
@@ -1245,6 +1247,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       if (v.isTransientStorage()) {
         ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -1500,7 +1503,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       if (v.isTransientStorage()) {
-        ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+        RamDiskReplica replicaInfo =
+          ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
+        if (replicaInfo != null) {
+          if (replicaInfo.getIsPersisted() ==  false) {
+            datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+          }
+          discardRamDiskReplica(replicaInfo, true);
+        }
       }
 
       // If a DFSClient has the replica in its cache of short-circuit file
@@ -1646,11 +1656,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (info != null) {
       if (touch && info.getVolume().isTransientStorage()) {
         ramDiskReplicaTracker.touch(bpid, blockId);
+        datanode.getMetrics().incrRamDiskBlocksReadHits();
       }
       return info.getBlockFile();
     }
     return null;    
   }
+
   /**
    * check if a data directory is healthy
    * if some volumes failed - make sure to remove all the blocks that belong
@@ -2304,6 +2316,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         nbytes, flags);
   }
 
+  void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+    ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
+      replica.getBlockId(), deleteSavedCopies);
+  }
+
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
@@ -2327,7 +2344,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
     }
 
-    private void moveReplicaToNewVolume(String bpid, long blockId)
+    private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
         throws IOException {
 
       FsVolumeImpl targetVolume;
@@ -2369,6 +2386,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       synchronized (FsDatasetImpl.this) {
         ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
 
+        // Update metrics (ignore the metadata file size)
+        datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+        datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
+        datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+          Time.monotonicNow() - creationTime);
+
         if (LOG.isDebugEnabled()) {
           LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
                         " to file " + savedFiles[1]);
@@ -2388,7 +2411,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
+          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
+            block.getCreationTime());
         }
         succeeded = true;
       } catch(IOException ioe) {
@@ -2456,7 +2480,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           metaFile = replicaInfo.getMetaFile();
           blockFileUsed = blockFile.length();
           metaFileUsed = metaFile.length();
-          ramDiskReplicaTracker.discardReplica(replicaState, false);
+          discardRamDiskReplica(replicaState, false);
 
           // Move the replica from lazyPersist/ to finalized/ on target volume
           BlockPoolSlice bpSlice =
@@ -2474,6 +2498,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
           // Update the volumeMap entry.
           volumeMap.add(bpid, newReplicaInfo);
+
+          // Update metrics
+          datanode.getMetrics().incrRamDiskBlocksEvicted();
+          datanode.getMetrics().addRamDiskBlocksEvictionWindowMs(
+              Time.monotonicNow() - replicaState.getCreationTime());
+          if (replicaState.getNumReads() == 0) {
+            datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
+          }
         }
 
         // Before deleting the files from transient storage we must notify the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
index 7808003..a843d9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.collect.TreeMultimap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Time;
 
 import java.io.File;
 import java.util.*;
@@ -97,9 +98,11 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
       return;
     }
 
+    ramDiskReplicaLru.numReads.getAndIncrement();
+
     // Reinsert the replica with its new timestamp.
     if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
-      ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+      ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
       replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
     }
   }
@@ -132,8 +135,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
       replicasNotPersisted.remove(ramDiskReplicaLru);
     }
 
-    ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+    ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
     replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    ramDiskReplicaLru.isPersisted = true;
   }
 
   @Override
@@ -215,4 +219,16 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
 
     // replicasNotPersisted will be lazily GC'ed.
   }
+
+  @Override
+  synchronized RamDiskReplica getReplica(
+    final String bpid, final long blockId) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return null;
+    }
+
+    return map.get(blockId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
index 2401424..7507925 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 
 import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -44,6 +46,10 @@ public abstract class RamDiskReplicaTracker {
     private File savedBlockFile;
     private File savedMetaFile;
 
+    private long creationTime;
+    protected AtomicLong numReads = new AtomicLong(0);
+    protected boolean isPersisted;
+
     /**
      * RAM_DISK volume that holds the original replica.
      */
@@ -62,6 +68,8 @@ public abstract class RamDiskReplicaTracker {
       lazyPersistVolume = null;
       savedMetaFile = null;
       savedBlockFile = null;
+      creationTime = Time.monotonicNow();
+      isPersisted = false;
     }
 
     long getBlockId() {
@@ -89,6 +97,12 @@ public abstract class RamDiskReplicaTracker {
       return savedMetaFile;
     }
 
+    long getNumReads() { return numReads.get(); }
+
+    long getCreationTime() { return creationTime; }
+
+    boolean getIsPersisted() {return isPersisted; }
+
     /**
      * Record the saved meta and block files on the given volume.
      *
@@ -243,7 +257,10 @@ public abstract class RamDiskReplicaTracker {
       final String bpid, final long blockId,
       boolean deleteSavedCopies);
 
-  void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
-    discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies);
-  }
+  /**
+   * Return RamDiskReplica info given block pool id and block id
+   * Return null if it does not exist in RamDisk
+   */
+  abstract RamDiskReplica getReplica(
+    final String bpid, final long blockId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index b536e7e..57f12db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -65,6 +65,26 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong writesFromRemoteClient;
   @Metric MutableCounterLong blocksGetLocalPathInfo;
 
+  // RamDisk metrics on read/write
+  @Metric MutableCounterLong ramDiskBlocksWrite;
+  @Metric MutableCounterLong ramDiskBlocksWriteFallback;
+  @Metric MutableCounterLong ramDiskBytesWrite;
+  @Metric MutableCounterLong ramDiskBlocksReadHits;
+
+  // RamDisk metrics on eviction
+  @Metric MutableCounterLong ramDiskBlocksEvicted;
+  @Metric MutableCounterLong ramDiskBlocksEvictedWithoutRead;
+  @Metric MutableRate        ramDiskBlocksEvictionWindowMs;
+  final MutableQuantiles[]   ramDiskBlocksEvictionWindowMsQuantiles;
+
+
+  // RamDisk metrics on lazy persist
+  @Metric MutableCounterLong ramDiskBlocksLazyPersisted;
+  @Metric MutableCounterLong ramDiskBlocksDeletedBeforeLazyPersisted;
+  @Metric MutableCounterLong ramDiskBytesLazyPersisted;
+  @Metric MutableRate        ramDiskBlocksLazyPersistWindowMs;
+  final MutableQuantiles[]   ramDiskBlocksLazyPersistWindowMsQuantiles;
+
   @Metric MutableCounterLong fsyncCount;
   
   @Metric MutableCounterLong volumeFailures;
@@ -107,6 +127,8 @@ public class DataNodeMetrics {
     fsyncNanosQuantiles = new MutableQuantiles[len];
     sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
     sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+    ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
+    ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
     
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -127,6 +149,14 @@ public class DataNodeMetrics {
           "sendDataPacketTransferNanos" + interval + "s", 
           "Time reading from disk and writing to network while sending " +
           "a packet in ns", "ops", "latency", interval);
+      ramDiskBlocksEvictionWindowMsQuantiles[i] = registry.newQuantiles(
+          "ramDiskBlocksEvictionWindows" + interval + "s",
+          "Time between the RamDisk block write and eviction in ms",
+          "ops", "latency", interval);
+      ramDiskBlocksLazyPersistWindowMsQuantiles[i] = registry.newQuantiles(
+          "ramDiskBlocksLazyPersistWindows" + interval + "s",
+          "Time between the RamDisk block write and disk persist in ms",
+          "ops", "latency", interval);
     }
   }
 
@@ -284,4 +314,54 @@ public class DataNodeMetrics {
       q.add(latencyNanos);
     }
   }
+
+  public void incrRamDiskBlocksWrite() {
+    ramDiskBlocksWrite.incr();
+  }
+
+  public void incrRamDiskBlocksWriteFallback() {
+    ramDiskBlocksWriteFallback.incr();
+  }
+
+  public void addRamDiskBytesWrite(long bytes) {
+    ramDiskBytesWrite.incr(bytes);
+  }
+
+  public void incrRamDiskBlocksReadHits() {
+    ramDiskBlocksReadHits.incr();
+  }
+
+  public void incrRamDiskBlocksEvicted() {
+    ramDiskBlocksEvicted.incr();
+  }
+
+  public void incrRamDiskBlocksEvictedWithoutRead() {
+    ramDiskBlocksEvictedWithoutRead.incr();
+  }
+
+  public void addRamDiskBlocksEvictionWindowMs(long latencyMs) {
+    ramDiskBlocksEvictionWindowMs.add(latencyMs);
+    for (MutableQuantiles q : ramDiskBlocksEvictionWindowMsQuantiles) {
+      q.add(latencyMs);
+    }
+  }
+
+  public void incrRamDiskBlocksLazyPersisted() {
+    ramDiskBlocksLazyPersisted.incr();
+  }
+
+  public void incrRamDiskBlocksDeletedBeforeLazyPersisted() {
+    ramDiskBlocksDeletedBeforeLazyPersisted.incr();
+  }
+
+  public void incrRamDiskBytesLazyPersisted(long bytes) {
+    ramDiskBytesLazyPersisted.incr(bytes);
+  }
+
+  public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) {
+    ramDiskBlocksLazyPersistWindowMs.add(latencyMs);
+    for (MutableQuantiles q : ramDiskBlocksLazyPersistWindowMsQuantiles) {
+      q.add(latencyMs);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
index bafef25..bbd545a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.regex.Pattern;
 
 import javax.management.AttributeNotFoundException;
 import javax.management.MBeanAttributeInfo;
@@ -109,6 +110,23 @@ public class JMXGet {
     }
   }
 
+  public void printAllMatchedAttributes(String attrRegExp) throws Exception {
+    err("List of the keys matching " + attrRegExp + " :");
+    Object val = null;
+    Pattern p = Pattern.compile(attrRegExp);
+    for (ObjectName oname : hadoopObjectNames) {
+      err(">>>>>>>>jmx name: " + oname.getCanonicalKeyPropertyListString());
+      MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
+      MBeanAttributeInfo[] mbinfos = mbinfo.getAttributes();
+      for (MBeanAttributeInfo mb : mbinfos) {
+        if (p.matcher(mb.getName()).lookingAt()) {
+          val = mbsc.getAttribute(oname, mb.getName());
+          System.out.format(format, mb.getName(), (val == null) ? "" : val.toString());
+        }
+      }
+    }
+  }
+
   /**
    * get single value by key
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 928d0d0..91deb55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -31,19 +31,18 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -58,6 +57,7 @@ import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class TestLazyPersistFiles {
@@ -81,14 +81,21 @@ public class TestLazyPersistFiles {
   private static final int LAZY_WRITER_INTERVAL_SEC = 1;
   private static final int BUFFER_LENGTH = 4096;
   private static final int EVICTION_LOW_WATERMARK = 1;
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   private DFSClient client;
   private Configuration conf;
+  private JMXGet jmx;
 
   @After
-  public void shutDownCluster() throws IOException {
+  public void shutDownCluster() throws Exception {
+
+    // Dump all RamDisk JMX metrics before shutdown the cluster
+    printRamDiskJMXMetrics();
+
     if (fs != null) {
       fs.close();
       fs = null;
@@ -100,6 +107,10 @@ public class TestLazyPersistFiles {
       cluster.shutdown();
       cluster = null;
     }
+
+    if (jmx != null) {
+      jmx = null;
+    }
   }
 
   @Test (timeout=300000)
@@ -203,13 +214,15 @@ public class TestLazyPersistFiles {
    * @throws IOException
    */
   @Test (timeout=300000)
-  public void testFallbackToDiskFull() throws IOException {
+  public void testFallbackToDiskFull() throws Exception {
     startUpCluster(false, 0);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
     makeTestFile(path, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path, DEFAULT);
+
+    verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
   }
 
   /**
@@ -384,11 +397,10 @@ public class TestLazyPersistFiles {
 
   /**
    * RamDisk eviction after lazy persist to disk.
-   * @throws IOException
-   * @throws InterruptedException
+   * @throws Exception
    */
   @Test (timeout=300000)
-  public void testRamDiskEviction() throws IOException, InterruptedException {
+  public void testRamDiskEviction() throws Exception {
     startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -411,6 +423,9 @@ public class TestLazyPersistFiles {
     // RAM_DISK.
     ensureFileReplicasOnStorageType(path2, RAM_DISK);
     ensureFileReplicasOnStorageType(path1, DEFAULT);
+
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
   }
 
   /**
@@ -454,7 +469,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testRamDiskEvictionIsLru()
-    throws IOException, InterruptedException {
+    throws Exception {
     final int NUM_PATHS = 5;
     startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -499,6 +514,14 @@ public class TestLazyPersistFiles {
         ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
       }
     }
+
+    verifyRamDiskJMXMetric("RamDiskBlocksWrite", NUM_PATHS * 2);
+    verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 0);
+    verifyRamDiskJMXMetric("RamDiskBytesWrite", BLOCK_SIZE * NUM_PATHS * 2);
+    verifyRamDiskJMXMetric("RamDiskBlocksReadHits", NUM_PATHS);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", NUM_PATHS);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 0);
+    verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 0);
   }
 
   /**
@@ -506,9 +529,9 @@ public class TestLazyPersistFiles {
    * Memory is freed up and file is gone.
    * @throws IOException
    */
-  @Test (timeout=300000)
+  @Test // (timeout=300000)
   public void testDeleteBeforePersist()
-    throws IOException, InterruptedException {
+    throws Exception {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
@@ -523,6 +546,8 @@ public class TestLazyPersistFiles {
     Assert.assertFalse(fs.exists(path));
 
     assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+    verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1);
   }
 
   /**
@@ -533,7 +558,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testDeleteAfterPersist()
-    throws IOException, InterruptedException {
+    throws Exception {
     startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -548,9 +573,10 @@ public class TestLazyPersistFiles {
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));
 
-    triggerBlockReport();
-
     assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+    verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
+    verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
   }
 
   /**
@@ -760,6 +786,11 @@ public class TestLazyPersistFiles {
         .build();
     fs = cluster.getFileSystem();
     client = fs.getClient();
+    try {
+      jmx = initJMX();
+    } catch (Exception e) {
+      fail("Failed initialize JMX for testing: " + e);
+    }
     LOG.info("Cluster startup complete");
   }
 
@@ -929,4 +960,25 @@ public class TestLazyPersistFiles {
       }
     }
   }
+
+  JMXGet initJMX() throws Exception
+  {
+    JMXGet jmx = new JMXGet();
+    jmx.setService(JMX_SERVICE_NAME);
+    jmx.init();
+    return jmx;
+  }
+
+  void printRamDiskJMXMetrics() {
+    try {
+      jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  void verifyRamDiskJMXMetric(String metricName, long expectedValue)
+      throws Exception {
+    assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+  }
 }