Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/18 01:31:20 UTC
[33/34] git commit: HDFS-7112. LazyWriter should use either async IO or one thread per physical disk. Contributed by Xiaoyu Yao.
HDFS-7112. LazyWriter should use either async IO or one thread per physical disk. Contributed by Xiaoyu Yao.
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1b6d115c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1b6d115c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1b6d115c
Branch: refs/heads/branch-2.6
Commit: 1b6d115ccf9b8eef1989a3aca73c1ba3165c95f6
Parents: 137f0b8
Author: cnauroth <cn...@apache.org>
Authored: Tue Oct 7 20:25:19 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:53 2014 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 95 +++++++
.../server/datanode/fsdataset/FsDatasetSpi.java | 13 +-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 21 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 179 ++++++++-----
.../datanode/fsdataset/impl/FsVolumeImpl.java | 15 +-
.../impl/RamDiskAsyncLazyPersistService.java | 252 +++++++++++++++++++
.../server/datanode/SimulatedFSDataset.java | 12 +
.../fsdataset/impl/TestLazyPersistFiles.java | 4 +-
8 files changed, 506 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
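For orientation before the per-file diffs: the patch replaces LazyWriter's inline, single-threaded block copy with a RamDiskAsyncLazyPersistService that keeps one single-thread executor per non-RAM_DISK volume, and reports results back to the dataset through two new FsDatasetSpi callbacks (onCompleteLazyPersist / onFailLazyPersist). The per-volume executor pattern, reduced to a minimal self-contained sketch (class and field names here are illustrative, not taken from the patch):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Hypothetical standalone illustration of "one worker thread per physical disk".
    class PerVolumeExecutorSketch {
      private final Map<File, ThreadPoolExecutor> executors =
          new HashMap<File, ThreadPoolExecutor>();

      // One core thread, one max thread: copies to a given disk stay serialized.
      void addVolume(File volumeRoot) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        executor.allowCoreThreadTimeOut(true); // idle workers may exit
        executors.put(volumeRoot, executor);
      }

      // Tasks for different volumes run in parallel; tasks for one volume queue up.
      void execute(File volumeRoot, Runnable task) {
        executors.get(volumeRoot).execute(task);
      }
    }
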
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8f4d6ee..afb8d35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -570,6 +570,101 @@ Release 2.6.0 - UNRELEASED
HDFS-5089. When a LayoutVersion support SNAPSHOT, it must support
FSIMAGE_NAME_OPTIMIZATION. (szetszwo)
+ BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
+
+ HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)
+
+ HDFS-6924. Add new RAM_DISK storage type. (Arpit Agarwal)
+
+ HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
+ edit logs. (Arpit Agarwal)
+
+ HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
+ (Arpit Agarwal)
+
+ HDFS-6925. DataNode should attempt to place replicas on transient storage
+ first if lazyPersist flag is received. (Arpit Agarwal)
+
+ HDFS-6926. DN support for saving replicas to persistent storage and
+ evicting in-memory replicas. (Arpit Agarwal)
+
+ HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
+
+ HDFS-6929. NN periodically unlinks lazy persist files with missing
+ replicas from namespace. (Arpit Agarwal)
+
+ HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
+ (Arpit Agarwal)
+
+ HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+ (Arpit Agarwal)
+
+ HDFS-6931. Move lazily persisted replicas to finalized directory on DN
+ startup. (Arpit Agarwal)
+
+ HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via
+ Arpit Agarwal)
+
+ HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
+
+ HDFS-6977. Delete all copies when a block is deleted from the block space.
+ (Arpit Agarwal)
+
+ HDFS-6991. Notify NN of evicted block before deleting it from RAM disk.
+ (Arpit Agarwal)
+
+ HDFS-6978. Directory scanner should correctly reconcile blocks on RAM
+ disk. (Arpit Agarwal)
+
+ HDFS-7066. LazyWriter#evictBlocks misses a null check for replicaState.
+ (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7064. Fix unit test failures in HDFS-6581 branch. (Xiaoyu Yao via
+ Arpit Agarwal)
+
+ HDFS-6581. Few more unit test fixes for HDFS-6581. (Arpit Agarwal)
+
+ HDFS-7080. Fix finalize and upgrade unit test failures. (Arpit Agarwal)
+
+ HDFS-7084. FsDatasetImpl#copyBlockFiles debug log can be improved.
+ (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7091. Add forwarding constructor for INodeFile for existing callers.
+ (Arpit Agarwal)
+
+ HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
+
+ HDFS-7108. Fix unit test failures in SimulatedFsDataset. (Arpit Agarwal)
+
+ HDFS-6990. Add unit test for evict/delete RAM_DISK block with open
+ handle. (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7143. Fix findbugs warnings in HDFS-6581 branch. (szetszwo via
+ Arpit Agarwal)
+
+ HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK.
+ (Xiaoyu Yao via Arpit Agarwal)
+
+ HDFS-7144. Fix findbugs warnings in RamDiskReplicaTracker. (szetszwo via
+ Arpit Agarwal)
+
+ HDFS-7155. Bugfix in createLocatedFileStatus caused by bad merge.
+ (Arpit Agarwal)
+
+ HDFS-7153. Add storagePolicy to NN edit log during file creation.
+ (Arpit Agarwal)
+
+ HDFS-7159. Use block storage policy to set lazy persist preference.
+ (Arpit Agarwal)
+
+ HDFS-7129. Metrics to track usage of memory for writes. (Xiaoyu Yao
+ via Arpit Agarwal)
+
+ HDFS-7171. Fix Jenkins failures in HDFS-6581 branch. (Arpit Agarwal)
+
+ HDFS-7112. LazyWriter should use either async IO or one thread per physical
+ disk. (Xiaoyu Yao via cnauroth)
+
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 2bb2e7f..3f1400d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -463,5 +464,15 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
final FileDescriptor fd, final long offset, final long nbytes,
final int flags);
-}
+ /**
+ * Callback from RamDiskAsyncLazyPersistService when an async lazy persist task completes.
+ */
+ public void onCompleteLazyPersist(String bpId, long blockId,
+ long creationTime, File[] savedFiles, FsVolumeImpl targetVolume);
+
+ /**
+ * Callback from RamDiskAsyncLazyPersistService when an async lazy persist task fails.
+ */
+ public void onFailLazyPersist(String bpId, long blockId);
+}
\ No newline at end of file
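The two callbacks added above define the contract with the async service: success hands back the saved meta/block files and the chosen target volume so that space usage and metrics can be updated under the dataset lock; failure lets the dataset re-enqueue the replica. How the async task is expected to drive them, sketched from ReplicaLazyPersistTask further down (copyReplicaToDisk is a hypothetical placeholder for the actual file copy):

    // Illustrative only; mirrors the try/finally in ReplicaLazyPersistTask#run().
    boolean succeeded = false;
    try {
      File[] savedFiles = copyReplicaToDisk();   // placeholder for the real copy
      dataset.onCompleteLazyPersist(bpId, blockId, creationTime, savedFiles, targetVolume);
      succeeded = true;
    } finally {
      if (!succeeded) {
        dataset.onFailLazyPersist(bpId, blockId); // replica gets re-enqueued
      }
    }
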
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 3f58d38..dce2ff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -171,6 +171,10 @@ class BlockPoolSlice {
long getDfsUsed() throws IOException {
return dfsUsage.getUsed();
}
+
+ void incDfsUsed(long value) {
+ dfsUsage.incDfsUsed(value);
+ }
/**
* Read in the cached DU value and return it if it is less than 600 seconds
@@ -277,23 +281,6 @@ class BlockPoolSlice {
}
/**
- * Save the given replica to persistent storage.
- *
- * @return The saved meta and block files, in that order.
- * @throws IOException
- */
- File[] lazyPersistReplica(long blockId, long genStamp,
- File srcMeta, File srcFile) throws IOException {
- if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
- FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
- }
- File targetFiles[] = FsDatasetImpl.copyBlockFiles(
- blockId, genStamp, srcMeta, srcFile, lazypersistDir);
- dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
- return targetFiles;
- }
-
- /**
* Move a persisted replica from lazypersist directory to a subdirectory
* under finalized.
*/
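The synchronous lazyPersistReplica helper removed above is not lost: the file copy now runs off the dataset lock inside the new async task, and the space accounting moves to the completion callback via the new incDfsUsed hook. Roughly, using the names this patch introduces elsewhere:

    // Sketch of what replaces lazyPersistReplica's dfsUsage update.
    // 1) On the per-volume worker thread, with no dataset lock held:
    File[] savedFiles = FsDatasetImpl.copyBlockFiles(
        blockId, genStamp, srcMeta, srcFile, lazyPersistDir);
    // 2) Then, under the dataset lock in onCompleteLazyPersist:
    targetVolume.incDfsUsed(bpId, savedFiles[0].length() + savedFiles[1].length());
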
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f2daf99..07e19cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -220,6 +220,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final ReplicaMap volumeMap;
final RamDiskReplicaTracker ramDiskReplicaTracker;
+ final RamDiskAsyncLazyPersistService asyncLazyPersistService;
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
@@ -273,10 +274,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
VolumeChoosingPolicy.class), conf);
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+ asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx));
}
+ setupAsyncLazyPersistThreads();
cacheManager = new FsDatasetCache(this);
@@ -409,6 +412,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
+ setupAsyncLazyPersistThreads();
+
for (int i = 0; i < volumes.size(); i++) {
if (successFlags[i]) {
succeedVolumes.add(volumes.get(i));
@@ -462,6 +467,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
storageMap.remove(sd.getStorageUuid());
}
}
+ setupAsyncLazyPersistThreads();
}
private StorageType getStorageTypeFromLocations(
@@ -1506,10 +1512,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
RamDiskReplica replicaInfo =
ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
if (replicaInfo != null) {
- if (replicaInfo.getIsPersisted() == false) {
+ if (!replicaInfo.getIsPersisted()) {
datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
}
- discardRamDiskReplica(replicaInfo, true);
+ ramDiskReplicaTracker.discardReplica(replicaInfo.getBlockPoolId(),
+ replicaInfo.getBlockId(), true);
}
}
@@ -1750,6 +1757,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (asyncDiskService != null) {
asyncDiskService.shutdown();
}
+
+ if (asyncLazyPersistService != null) {
+ asyncLazyPersistService.shutdown();
+ }
if(volumes != null) {
volumes.shutdown();
@@ -2309,6 +2320,40 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override
+ public void onCompleteLazyPersist(String bpId, long blockId,
+ long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+ synchronized (FsDatasetImpl.this) {
+ ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);
+
+ targetVolume.incDfsUsed(bpId,
+ savedFiles[0].length() + savedFiles[1].length());
+
+ // Update metrics (ignore the metadata file size)
+ datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+ datanode.getMetrics().incrRamDiskBytesLazyPersisted(savedFiles[1].length());
+ datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+ Time.monotonicNow() - creationTime);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter: Finish persisting RamDisk block: "
+ + " block pool Id: " + bpId + " block id: " + blockId
+ + " to block file " + savedFiles[1] + " and meta file " + savedFiles[0]
+ + " on target volume " + targetVolume);
+ }
+ }
+ }
+
+ @Override
+ public void onFailLazyPersist(String bpId, long blockId) {
+ RamDiskReplica block = null;
+ block = ramDiskReplicaTracker.getReplica(bpId, blockId);
+ if (block != null) {
+ LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
+ ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+ }
+ }
+
+ @Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) {
FsVolumeImpl fsVolumeImpl = this.getVolume(block);
@@ -2316,9 +2361,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
nbytes, flags);
}
- void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
- ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
- replica.getBlockId(), deleteSavedCopies);
+ private boolean ramDiskConfigured() {
+ for (FsVolumeImpl v: getVolumes()){
+ if (v.isTransientStorage()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Add/Remove per DISK volume async lazy persist thread when RamDisk volume is
+ // added or removed.
+ // This should only be called when the FsDataSetImpl#volumes list is finalized.
+ private void setupAsyncLazyPersistThreads() {
+ boolean ramDiskConfigured = ramDiskConfigured();
+ for (FsVolumeImpl v: getVolumes()){
+ // Skip transient volumes
+ if (v.isTransientStorage()) {
+ continue;
+ }
+
+ // Add thread for DISK volume if RamDisk is configured
+ if (ramDiskConfigured &&
+ !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+ asyncLazyPersistService.addVolume(v.getCurrentDir());
+ }
+
+ // Remove thread for DISK volume if RamDisk is not configured
+ if (!ramDiskConfigured &&
+ asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+ asyncLazyPersistService.removeVolume(v.getCurrentDir());
+ }
+ }
}
class LazyWriter implements Runnable {
@@ -2344,61 +2418,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
}
- private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
- throws IOException {
-
- FsVolumeImpl targetVolume;
- ReplicaInfo replicaInfo;
- BlockPoolSlice bpSlice;
- File srcFile, srcMeta;
- long genStamp;
-
- synchronized (FsDatasetImpl.this) {
- replicaInfo = volumeMap.get(bpid, blockId);
-
- if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
- // The block was either deleted before it could be checkpointed or
- // it is already on persistent storage. This can occur if a second
- // replica on persistent storage was found after the lazy write was
- // scheduled.
- return;
- }
-
- // Pick a target volume for the block.
- targetVolume = volumes.getNextVolume(
- StorageType.DEFAULT, replicaInfo.getNumBytes());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
- }
-
- ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
- bpSlice = targetVolume.getBlockPoolSlice(bpid);
- srcMeta = replicaInfo.getMetaFile();
- srcFile = replicaInfo.getBlockFile();
- genStamp = replicaInfo.getGenerationStamp();
- }
-
- // Drop the FsDatasetImpl lock for the file copy.
- File[] savedFiles =
- bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
-
- synchronized (FsDatasetImpl.this) {
- ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
-
- // Update metrics (ignore the metadata file size)
- datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
- datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
- datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
- Time.monotonicNow() - creationTime);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
- " to file " + savedFiles[1]);
- }
- }
- }
-
/**
* Checkpoint a pending replica to persistent storage now.
* If we fail then move the replica to the end of the queue.
@@ -2406,13 +2425,43 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
private boolean saveNextReplica() {
RamDiskReplica block = null;
+ FsVolumeImpl targetVolume;
+ ReplicaInfo replicaInfo;
boolean succeeded = false;
try {
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
- moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
- block.getCreationTime());
+ synchronized (FsDatasetImpl.this) {
+ replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());
+
+ // If replicaInfo is null, the block was either deleted before
+ // it could be checkpointed or it is already on persistent storage.
+ // This can occur if a second replica on persistent storage was found
+ // after the lazy write was scheduled.
+ if (replicaInfo != null &&
+ replicaInfo.getVolume().isTransientStorage()) {
+ // Pick a target volume to persist the block.
+ targetVolume = volumes.getNextVolume(
+ StorageType.DEFAULT, replicaInfo.getNumBytes());
+
+ ramDiskReplicaTracker.recordStartLazyPersist(
+ block.getBlockPoolId(), block.getBlockId(), targetVolume);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter: Start persisting RamDisk block:"
+ + " block pool Id: " + block.getBlockPoolId()
+ + " block id: " + block.getBlockId()
+ + " on target volume " + targetVolume);
+ }
+
+ asyncLazyPersistService.submitLazyPersistTask(
+ block.getBlockPoolId(), block.getBlockId(),
+ replicaInfo.getGenerationStamp(), block.getCreationTime(),
+ replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
+ targetVolume);
+ }
+ }
}
succeeded = true;
} catch(IOException ioe) {
@@ -2420,10 +2469,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} finally {
if (!succeeded && block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
- ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+ onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
}
}
-
return succeeded;
}
@@ -2480,7 +2528,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
metaFile = replicaInfo.getMetaFile();
blockFileUsed = blockFile.length();
metaFileUsed = metaFile.length();
- discardRamDiskReplica(replicaState, false);
+ ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
+ replicaState.getBlockId(), false);
// Move the replica from lazyPersist/ to finalized/ on target volume
BlockPoolSlice bpSlice =
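A consequence of the one-thread-per-DISK-volume design above: lazy persist copies aimed at the same volume are serialized, copies for different volumes proceed in parallel, and LazyWriter itself only schedules work. A toy demonstration of that property with plain JDK executors (the class name and output strings are illustrative only):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical demo: one single-thread queue per disk serializes writes to that disk.
    class OneThreadPerDiskDemo {
      public static void main(String[] args) {
        ExecutorService disk1 = Executors.newSingleThreadExecutor();
        ExecutorService disk2 = Executors.newSingleThreadExecutor();

        disk1.execute(new Runnable() {
          public void run() { System.out.println("persist block A to disk1"); }
        });
        disk1.execute(new Runnable() {
          public void run() { System.out.println("persist block B to disk1 (always after A)"); }
        });
        disk2.execute(new Runnable() {
          public void run() { System.out.println("persist block C to disk2 (may overlap A/B)"); }
        });

        disk1.shutdown();
        disk2.shutdown();
      }
    }
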
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 60ea125..32709be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -124,7 +124,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
File getRbwDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getRbwDir();
}
-
+
+ File getLazyPersistDir(String bpid) throws IOException {
+ return getBlockPoolSlice(bpid).getLazypersistDir();
+ }
+
void decDfsUsed(String bpid, long value) {
synchronized(dataset) {
BlockPoolSlice bp = bpSlices.get(bpid);
@@ -134,6 +138,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
}
+ void incDfsUsed(String bpid, long value) {
+ synchronized(dataset) {
+ BlockPoolSlice bp = bpSlices.get(bpid);
+ if (bp != null) {
+ bp.incDfsUsed(value);
+ }
+ }
+ }
+
long getDfsUsed() throws IOException {
long dfsUsed = 0;
synchronized(dataset) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
new file mode 100644
index 0000000..76acbea
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is a container of multiple thread pools, one for each non-RamDisk
+ * volume, each with a maximum thread count of 1, so that async lazy persist
+ * operations can be scheduled easily and volume arrival/departure is handled.
+ *
+ * This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
+ * They should be combined.
+ */
+class RamDiskAsyncLazyPersistService {
+ public static final Log LOG = LogFactory.getLog(RamDiskAsyncLazyPersistService.class);
+
+ // ThreadPool core pool size
+ private static final int CORE_THREADS_PER_VOLUME = 1;
+ // ThreadPool maximum pool size
+ private static final int MAXIMUM_THREADS_PER_VOLUME = 1;
+ // ThreadPool keep-alive time for threads over core pool size
+ private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+
+ private final DataNode datanode;
+ private final ThreadGroup threadGroup;
+ private Map<File, ThreadPoolExecutor> executors
+ = new HashMap<File, ThreadPoolExecutor>();
+
+ /**
+ * Create a RamDiskAsyncLazyPersistService with a set of volumes (specified by their
+ * root directories).
+ *
+ * The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
+ * disk operations.
+ */
+ RamDiskAsyncLazyPersistService(DataNode datanode) {
+ this.datanode = datanode;
+ this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+ }
+
+ private void addExecutorForVolume(final File volume) {
+ ThreadFactory threadFactory = new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(threadGroup, r);
+ t.setName("Async RamDisk lazy persist worker for volume " + volume);
+ return t;
+ }
+ };
+
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+ THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+ // This can reduce the number of running threads
+ executor.allowCoreThreadTimeOut(true);
+ executors.put(volume, executor);
+ }
+
+ /**
+ * Starts AsyncLazyPersistService for a new volume
+ * @param volume the root of the new data volume.
+ */
+ synchronized void addVolume(File volume) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+ }
+ ThreadPoolExecutor executor = executors.get(volume);
+ if (executor != null) {
+ throw new RuntimeException("Volume " + volume + " is already existed.");
+ }
+ addExecutorForVolume(volume);
+ }
+
+ /**
+ * Stops AsyncLazyPersistService for a volume.
+ * @param volume the root of the volume.
+ */
+ synchronized void removeVolume(File volume) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncDiskService is already shutdown");
+ }
+ ThreadPoolExecutor executor = executors.get(volume);
+ if (executor == null) {
+ throw new RuntimeException("Can not find volume " + volume
+ + " to remove.");
+ } else {
+ executor.shutdown();
+ executors.remove(volume);
+ }
+ }
+
+ /**
+ * Query whether a thread pool exists for the given volume.
+ * @param volume the root of a volume
+ * @return true if there is a thread pool for the volume,
+ * false otherwise
+ */
+ synchronized boolean queryVolume(File volume) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+ }
+ ThreadPoolExecutor executor = executors.get(volume);
+ return (executor != null);
+ }
+
+ /**
+ * Execute the task sometime in the future, using ThreadPools.
+ */
+ synchronized void execute(File root, Runnable task) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+ }
+ ThreadPoolExecutor executor = executors.get(root);
+ if (executor == null) {
+ throw new RuntimeException("Cannot find root " + root
+ + " for execution of task " + task);
+ } else {
+ executor.execute(task);
+ }
+ }
+
+ /**
+ * Gracefully shut down all thread pools. Previously submitted lazy persist
+ * tasks are allowed to finish.
+ */
+ synchronized void shutdown() {
+ if (executors == null) {
+ LOG.warn("AsyncLazyPersistService has already shut down.");
+ } else {
+ LOG.info("Shutting down all async lazy persist service threads");
+
+ for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
+ e.getValue().shutdown();
+ }
+ // clear the executor map so that calling execute again will fail.
+ executors = null;
+ LOG.info("All async lazy persist service threads have been shut down");
+ }
+ }
+
+ /**
+ * Asynchronously lazy persist the block from the RamDisk to Disk.
+ */
+ void submitLazyPersistTask(String bpId, long blockId,
+ long genStamp, long creationTime,
+ File metaFile, File blockFile,
+ FsVolumeImpl targetVolume) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
+ + bpId + " block id: " + blockId);
+ }
+
+ File lazyPersistDir = targetVolume.getLazyPersistDir(bpId);
+ if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
+ FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
+ throw new IOException("LazyWriter fail to find or create lazy persist dir: "
+ + lazyPersistDir.toString());
+ }
+
+ ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
+ bpId, blockId, genStamp, creationTime, blockFile, metaFile,
+ targetVolume, lazyPersistDir);
+ execute(targetVolume.getCurrentDir(), lazyPersistTask);
+ }
+
+ class ReplicaLazyPersistTask implements Runnable {
+ final String bpId;
+ final long blockId;
+ final long genStamp;
+ final long creationTime;
+ final File blockFile;
+ final File metaFile;
+ final FsVolumeImpl targetVolume;
+ final File lazyPersistDir;
+
+ ReplicaLazyPersistTask(String bpId, long blockId,
+ long genStamp, long creationTime,
+ File blockFile, File metaFile,
+ FsVolumeImpl targetVolume, File lazyPersistDir) {
+ this.bpId = bpId;
+ this.blockId = blockId;
+ this.genStamp = genStamp;
+ this.creationTime = creationTime;
+ this.blockFile = blockFile;
+ this.metaFile = metaFile;
+ this.targetVolume = targetVolume;
+ this.lazyPersistDir = lazyPersistDir;
+ }
+
+ @Override
+ public String toString() {
+ // Called in AsyncLazyPersistService.execute for displaying error messages.
+ return "LazyWriter async task of persist RamDisk block pool id:"
+ + bpId + " block pool id: "
+ + blockId + " with block file " + blockFile
+ + " and meta file " + metaFile + " to target volume " + targetVolume;}
+
+ @Override
+ public void run() {
+ boolean succeeded = false;
+ try {
+ // No FsDatasetImpl lock for the file copy
+ File targetFiles[] = FsDatasetImpl.copyBlockFiles(
+ blockId, genStamp, metaFile, blockFile, lazyPersistDir);
+
+ // Lock FsDataSetImpl during onCompleteLazyPersist callback
+ datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,
+ creationTime, targetFiles, targetVolume);
+ succeeded = true;
+ } catch (Exception e){
+ FsDatasetImpl.LOG.warn(
+ "LazyWriter failed to async persist RamDisk block pool id: "
+ + bpId + " block id: " + blockId);
+ } finally {
+ if (!succeeded) {
+ datanode.getFSDataset().onFailLazyPersist(bpId, blockId);
+ }
+ }
+ }
+ }
+}
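Putting the new service together, its lifecycle as driven by FsDatasetImpl above, condensed into a usage sketch (the DataNode reference, directory paths and block identifiers are placeholders):

    // Constructed once in the FsDatasetImpl constructor.
    RamDiskAsyncLazyPersistService service = new RamDiskAsyncLazyPersistService(datanode);

    // setupAsyncLazyPersistThreads(): one worker per DISK volume while RAM_DISK is configured.
    service.addVolume(new File("/data/disk1/current"));   // illustrative path
    service.addVolume(new File("/data/disk2/current"));

    // LazyWriter#saveNextReplica(): schedule the copy; targetVolume's current dir
    // must be one of the volumes added above. Throws IOException if the
    // lazypersist directory cannot be created.
    service.submitLazyPersistTask(bpId, blockId, genStamp, creationTime,
        metaFile, blockFile, targetVolume);

    // FsDatasetImpl#shutdown(): stop all per-volume workers.
    service.shutdown();
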
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index d1284fe..0786bc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -1209,5 +1210,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
FileDescriptor fd, long offset, long nbytes, int flags) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void onCompleteLazyPersist(String bpId, long blockId,
+ long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void onFailLazyPersist(String bpId, long blockId) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1b6d115c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 91deb55..9f1d50a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -971,7 +971,9 @@ public class TestLazyPersistFiles {
void printRamDiskJMXMetrics() {
try {
- jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+ if (jmx != null) {
+ jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+ }
} catch (Exception e) {
e.printStackTrace();
}