You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/17 23:45:15 UTC
[28/34] git commit: HDFS-7129. Metrics to track usage of memory for
writes. (Contributed by Xiaoyu Yao)
HDFS-7129. Metrics to track usage of memory for writes. (Contributed by Xiaoyu Yao)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c865c93d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c865c93d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c865c93d
Branch: refs/heads/branch-2
Commit: c865c93dc14dc09e76346114c998e951f12eae33
Parents: 6916d41
Author: arp <ar...@apache.org>
Authored: Tue Sep 30 00:53:18 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:03 2014 -0700
----------------------------------------------------------------------
.../datanode/fsdataset/impl/FsDatasetImpl.java | 40 +++++++++-
.../impl/RamDiskReplicaLruTracker.java | 20 ++++-
.../fsdataset/impl/RamDiskReplicaTracker.java | 23 +++++-
.../datanode/metrics/DataNodeMetrics.java | 80 ++++++++++++++++++++
.../org/apache/hadoop/hdfs/tools/JMXGet.java | 18 +++++
.../fsdataset/impl/TestLazyPersistFiles.java | 80 ++++++++++++++++----
6 files changed, 238 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8ed81df..f2daf99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1013,11 +1013,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (allowLazyPersist) {
// First try to place the block on a transient volume.
v = volumes.getNextTransientVolume(b.getNumBytes());
+ datanode.getMetrics().incrRamDiskBlocksWrite();
} else {
v = volumes.getNextVolume(storageType, b.getNumBytes());
}
} catch (DiskOutOfSpaceException de) {
if (allowLazyPersist) {
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback();
allowLazyPersist = false;
continue;
}
@@ -1245,6 +1247,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (v.isTransientStorage()) {
ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+ datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
}
}
volumeMap.add(bpid, newReplicaInfo);
@@ -1500,7 +1503,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
if (v.isTransientStorage()) {
- ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+ RamDiskReplica replicaInfo =
+ ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
+ if (replicaInfo != null) {
+ if (replicaInfo.getIsPersisted() == false) {
+ datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+ }
+ discardRamDiskReplica(replicaInfo, true);
+ }
}
// If a DFSClient has the replica in its cache of short-circuit file
@@ -1646,11 +1656,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (info != null) {
if (touch && info.getVolume().isTransientStorage()) {
ramDiskReplicaTracker.touch(bpid, blockId);
+ datanode.getMetrics().incrRamDiskBlocksReadHits();
}
return info.getBlockFile();
}
return null;
}
+
/**
* check if a data directory is healthy
* if some volumes failed - make sure to remove all the blocks that belong
@@ -2304,6 +2316,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
nbytes, flags);
}
+ void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+ ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
+ replica.getBlockId(), deleteSavedCopies);
+ }
+
class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
@@ -2327,7 +2344,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
}
- private void moveReplicaToNewVolume(String bpid, long blockId)
+ private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
throws IOException {
FsVolumeImpl targetVolume;
@@ -2369,6 +2386,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
synchronized (FsDatasetImpl.this) {
ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
+ // Update metrics (ignore the metadata file size)
+ datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+ datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
+ datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+ Time.monotonicNow() - creationTime);
+
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
" to file " + savedFiles[1]);
@@ -2388,7 +2411,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
- moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
+ moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
+ block.getCreationTime());
}
succeeded = true;
} catch(IOException ioe) {
@@ -2456,7 +2480,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
metaFile = replicaInfo.getMetaFile();
blockFileUsed = blockFile.length();
metaFileUsed = metaFile.length();
- ramDiskReplicaTracker.discardReplica(replicaState, false);
+ discardRamDiskReplica(replicaState, false);
// Move the replica from lazyPersist/ to finalized/ on target volume
BlockPoolSlice bpSlice =
@@ -2474,6 +2498,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Update the volumeMap entry.
volumeMap.add(bpid, newReplicaInfo);
+
+ // Update metrics
+ datanode.getMetrics().incrRamDiskBlocksEvicted();
+ datanode.getMetrics().addRamDiskBlocksEvictionWindowMs(
+ Time.monotonicNow() - replicaState.getCreationTime());
+ if (replicaState.getNumReads() == 0) {
+ datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
+ }
}
// Before deleting the files from transient storage we must notify the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
index 7808003..a843d9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.TreeMultimap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Time;
import java.io.File;
import java.util.*;
@@ -97,9 +98,11 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
return;
}
+ ramDiskReplicaLru.numReads.getAndIncrement();
+
// Reinsert the replica with its new timestamp.
if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
- ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+ ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
}
}
@@ -132,8 +135,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
replicasNotPersisted.remove(ramDiskReplicaLru);
}
- ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+ ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+ ramDiskReplicaLru.isPersisted = true;
}
@Override
@@ -215,4 +219,16 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
// replicasNotPersisted will be lazily GC'ed.
}
+
+ @Override
+ synchronized RamDiskReplica getReplica(
+ final String bpid, final long blockId) {
+ Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+ if (map == null) {
+ return null;
+ }
+
+ return map.get(blockId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
index 2401424..7507925 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -44,6 +46,10 @@ public abstract class RamDiskReplicaTracker {
private File savedBlockFile;
private File savedMetaFile;
+ private long creationTime;
+ protected AtomicLong numReads = new AtomicLong(0);
+ protected boolean isPersisted;
+
/**
* RAM_DISK volume that holds the original replica.
*/
@@ -62,6 +68,8 @@ public abstract class RamDiskReplicaTracker {
lazyPersistVolume = null;
savedMetaFile = null;
savedBlockFile = null;
+ creationTime = Time.monotonicNow();
+ isPersisted = false;
}
long getBlockId() {
@@ -89,6 +97,12 @@ public abstract class RamDiskReplicaTracker {
return savedMetaFile;
}
+ long getNumReads() { return numReads.get(); }
+
+ long getCreationTime() { return creationTime; }
+
+ boolean getIsPersisted() {return isPersisted; }
+
/**
* Record the saved meta and block files on the given volume.
*
@@ -243,7 +257,10 @@ public abstract class RamDiskReplicaTracker {
final String bpid, final long blockId,
boolean deleteSavedCopies);
- void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
- discardReplica(replica.getBlockPoolId(), replica.getBlockId(), deleteSavedCopies);
- }
+ /**
+ * Return RamDiskReplica info given block pool id and block id
+ * Return null if it does not exist in RamDisk
+ */
+ abstract RamDiskReplica getReplica(
+ final String bpid, final long blockId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index b536e7e..57f12db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -65,6 +65,26 @@ public class DataNodeMetrics {
@Metric MutableCounterLong writesFromRemoteClient;
@Metric MutableCounterLong blocksGetLocalPathInfo;
+ // RamDisk metrics on read/write
+ @Metric MutableCounterLong ramDiskBlocksWrite;
+ @Metric MutableCounterLong ramDiskBlocksWriteFallback;
+ @Metric MutableCounterLong ramDiskBytesWrite;
+ @Metric MutableCounterLong ramDiskBlocksReadHits;
+
+ // RamDisk metrics on eviction
+ @Metric MutableCounterLong ramDiskBlocksEvicted;
+ @Metric MutableCounterLong ramDiskBlocksEvictedWithoutRead;
+ @Metric MutableRate ramDiskBlocksEvictionWindowMs;
+ final MutableQuantiles[] ramDiskBlocksEvictionWindowMsQuantiles;
+
+
+ // RamDisk metrics on lazy persist
+ @Metric MutableCounterLong ramDiskBlocksLazyPersisted;
+ @Metric MutableCounterLong ramDiskBlocksDeletedBeforeLazyPersisted;
+ @Metric MutableCounterLong ramDiskBytesLazyPersisted;
+ @Metric MutableRate ramDiskBlocksLazyPersistWindowMs;
+ final MutableQuantiles[] ramDiskBlocksLazyPersistWindowMsQuantiles;
+
@Metric MutableCounterLong fsyncCount;
@Metric MutableCounterLong volumeFailures;
@@ -107,6 +127,8 @@ public class DataNodeMetrics {
fsyncNanosQuantiles = new MutableQuantiles[len];
sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+ ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
+ ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -127,6 +149,14 @@ public class DataNodeMetrics {
"sendDataPacketTransferNanos" + interval + "s",
"Time reading from disk and writing to network while sending " +
"a packet in ns", "ops", "latency", interval);
+ ramDiskBlocksEvictionWindowMsQuantiles[i] = registry.newQuantiles(
+ "ramDiskBlocksEvictionWindows" + interval + "s",
+ "Time between the RamDisk block write and eviction in ms",
+ "ops", "latency", interval);
+ ramDiskBlocksLazyPersistWindowMsQuantiles[i] = registry.newQuantiles(
+ "ramDiskBlocksLazyPersistWindows" + interval + "s",
+ "Time between the RamDisk block write and disk persist in ms",
+ "ops", "latency", interval);
}
}
@@ -284,4 +314,54 @@ public class DataNodeMetrics {
q.add(latencyNanos);
}
}
+
+ public void incrRamDiskBlocksWrite() {
+ ramDiskBlocksWrite.incr();
+ }
+
+ public void incrRamDiskBlocksWriteFallback() {
+ ramDiskBlocksWriteFallback.incr();
+ }
+
+ public void addRamDiskBytesWrite(long bytes) {
+ ramDiskBytesWrite.incr(bytes);
+ }
+
+ public void incrRamDiskBlocksReadHits() {
+ ramDiskBlocksReadHits.incr();
+ }
+
+ public void incrRamDiskBlocksEvicted() {
+ ramDiskBlocksEvicted.incr();
+ }
+
+ public void incrRamDiskBlocksEvictedWithoutRead() {
+ ramDiskBlocksEvictedWithoutRead.incr();
+ }
+
+ public void addRamDiskBlocksEvictionWindowMs(long latencyMs) {
+ ramDiskBlocksEvictionWindowMs.add(latencyMs);
+ for (MutableQuantiles q : ramDiskBlocksEvictionWindowMsQuantiles) {
+ q.add(latencyMs);
+ }
+ }
+
+ public void incrRamDiskBlocksLazyPersisted() {
+ ramDiskBlocksLazyPersisted.incr();
+ }
+
+ public void incrRamDiskBlocksDeletedBeforeLazyPersisted() {
+ ramDiskBlocksDeletedBeforeLazyPersisted.incr();
+ }
+
+ public void incrRamDiskBytesLazyPersisted(long bytes) {
+ ramDiskBytesLazyPersisted.incr(bytes);
+ }
+
+ public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) {
+ ramDiskBlocksLazyPersistWindowMs.add(latencyMs);
+ for (MutableQuantiles q : ramDiskBlocksLazyPersistWindowMsQuantiles) {
+ q.add(latencyMs);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
index bafef25..bbd545a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
+import java.util.regex.Pattern;
import javax.management.AttributeNotFoundException;
import javax.management.MBeanAttributeInfo;
@@ -109,6 +110,23 @@ public class JMXGet {
}
}
+ public void printAllMatchedAttributes(String attrRegExp) throws Exception {
+ err("List of the keys matching " + attrRegExp + " :");
+ Object val = null;
+ Pattern p = Pattern.compile(attrRegExp);
+ for (ObjectName oname : hadoopObjectNames) {
+ err(">>>>>>>>jmx name: " + oname.getCanonicalKeyPropertyListString());
+ MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
+ MBeanAttributeInfo[] mbinfos = mbinfo.getAttributes();
+ for (MBeanAttributeInfo mb : mbinfos) {
+ if (p.matcher(mb.getName()).lookingAt()) {
+ val = mbsc.getAttribute(oname, mb.getName());
+ System.out.format(format, mb.getName(), (val == null) ? "" : val.toString());
+ }
+ }
+ }
+ }
+
/**
* get single value by key
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c865c93d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 928d0d0..91deb55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -31,19 +31,18 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -58,6 +57,7 @@ import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class TestLazyPersistFiles {
@@ -81,14 +81,21 @@ public class TestLazyPersistFiles {
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final int BUFFER_LENGTH = 4096;
private static final int EVICTION_LOW_WATERMARK = 1;
+ private static final String JMX_SERVICE_NAME = "DataNode";
+ private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private Configuration conf;
+ private JMXGet jmx;
@After
- public void shutDownCluster() throws IOException {
+ public void shutDownCluster() throws Exception {
+
+ // Dump all RamDisk JMX metrics before shutdown the cluster
+ printRamDiskJMXMetrics();
+
if (fs != null) {
fs.close();
fs = null;
@@ -100,6 +107,10 @@ public class TestLazyPersistFiles {
cluster.shutdown();
cluster = null;
}
+
+ if (jmx != null) {
+ jmx = null;
+ }
}
@Test (timeout=300000)
@@ -203,13 +214,15 @@ public class TestLazyPersistFiles {
* @throws IOException
*/
@Test (timeout=300000)
- public void testFallbackToDiskFull() throws IOException {
+ public void testFallbackToDiskFull() throws Exception {
startUpCluster(false, 0);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, DEFAULT);
+
+ verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
}
/**
@@ -384,11 +397,10 @@ public class TestLazyPersistFiles {
/**
* RamDisk eviction after lazy persist to disk.
- * @throws IOException
- * @throws InterruptedException
+ * @throws Exception
*/
@Test (timeout=300000)
- public void testRamDiskEviction() throws IOException, InterruptedException {
+ public void testRamDiskEviction() throws Exception {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -411,6 +423,9 @@ public class TestLazyPersistFiles {
// RAM_DISK.
ensureFileReplicasOnStorageType(path2, RAM_DISK);
ensureFileReplicasOnStorageType(path1, DEFAULT);
+
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
}
/**
@@ -454,7 +469,7 @@ public class TestLazyPersistFiles {
*/
@Test (timeout=300000)
public void testRamDiskEvictionIsLru()
- throws IOException, InterruptedException {
+ throws Exception {
final int NUM_PATHS = 5;
startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -499,6 +514,14 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
}
}
+
+ verifyRamDiskJMXMetric("RamDiskBlocksWrite", NUM_PATHS * 2);
+ verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 0);
+ verifyRamDiskJMXMetric("RamDiskBytesWrite", BLOCK_SIZE * NUM_PATHS * 2);
+ verifyRamDiskJMXMetric("RamDiskBlocksReadHits", NUM_PATHS);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", NUM_PATHS);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 0);
+ verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 0);
}
/**
@@ -506,9 +529,9 @@ public class TestLazyPersistFiles {
* Memory is freed up and file is gone.
* @throws IOException
*/
- @Test (timeout=300000)
+ @Test // (timeout=300000)
public void testDeleteBeforePersist()
- throws IOException, InterruptedException {
+ throws Exception {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
@@ -523,6 +546,8 @@ public class TestLazyPersistFiles {
Assert.assertFalse(fs.exists(path));
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+ verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1);
}
/**
@@ -533,7 +558,7 @@ public class TestLazyPersistFiles {
*/
@Test (timeout=300000)
public void testDeleteAfterPersist()
- throws IOException, InterruptedException {
+ throws Exception {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -548,9 +573,10 @@ public class TestLazyPersistFiles {
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
- triggerBlockReport();
-
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+ verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
+ verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
}
/**
@@ -760,6 +786,11 @@ public class TestLazyPersistFiles {
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
+ try {
+ jmx = initJMX();
+ } catch (Exception e) {
+ fail("Failed initialize JMX for testing: " + e);
+ }
LOG.info("Cluster startup complete");
}
@@ -929,4 +960,25 @@ public class TestLazyPersistFiles {
}
}
}
+
+ JMXGet initJMX() throws Exception
+ {
+ JMXGet jmx = new JMXGet();
+ jmx.setService(JMX_SERVICE_NAME);
+ jmx.init();
+ return jmx;
+ }
+
+ void printRamDiskJMXMetrics() {
+ try {
+ jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ void verifyRamDiskJMXMetric(String metricName, long expectedValue)
+ throws Exception {
+ assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+ }
}