You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2014/08/28 00:51:45 UTC
[2/3] git commit: HDFS-6926. DN support for saving replicas to
persistent storage and evicting in-memory replicas. (Arpit Agarwal)
HDFS-6926. DN support for saving replicas to persistent storage and evicting in-memory replicas. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c569fd20
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c569fd20
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c569fd20
Branch: refs/heads/HDFS-6581
Commit: c569fd203ea7f55ddd6050fa4a437bc8142a1155
Parents: 184a6fe
Author: arp <ar...@apache.org>
Authored: Wed Aug 27 15:36:48 2014 -0700
Committer: arp <ar...@apache.org>
Committed: Wed Aug 27 15:36:48 2014 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-6581.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
.../server/datanode/BlockPoolSliceScanner.java | 2 +-
.../hdfs/server/datanode/DataStorage.java | 1 +
.../hdfs/server/datanode/ReplicaInPipeline.java | 2 +-
.../AvailableSpaceVolumeChoosingPolicy.java | 6 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 3 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 45 ++-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 318 ++++++++++++++++++-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 6 +
.../fsdataset/impl/LazyWriteReplicaTracker.java | 177 +++++++++++
.../server/datanode/SimulatedFSDataset.java | 5 +
.../hdfs/server/namenode/TestAddBlockRetry.java | 1 +
13 files changed, 551 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index 961b421..335c55d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -13,3 +13,6 @@
HDFS-6925. DataNode should attempt to place replicas on transient storage
first if lazyPersist flag is received. (Arpit Agarwal)
+ HDFS-6926. DN support for saving replicas to persistent storage and
+ evicting in-memory replicas. (Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 71a530b..580b7a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -123,6 +123,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+ public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
+ public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
"dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
@@ -227,6 +229,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
+
+ public static final String DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = "dfs.namenode.lazypersist.file.scrub.interval.sec";
+ public static final int DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60;
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index bbb67fc..61f1e7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -191,7 +191,7 @@ class BlockPoolSliceScanner {
+ hours + " hours for block pool " + bpid);
// get the list of blocks and arrange them in random order
- List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
+ List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
Collections.shuffle(arr);
long scanTime = -1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 4b9656e..693dcab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -81,6 +81,7 @@ public class DataStorage extends Storage {
final static String STORAGE_DIR_DETACHED = "detach";
public final static String STORAGE_DIR_RBW = "rbw";
public final static String STORAGE_DIR_FINALIZED = "finalized";
+ public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist";
public final static String STORAGE_DIR_TMP = "tmp";
// Set of bpids for which 'trash' is currently enabled.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index f808e01..dc57688 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -225,7 +225,7 @@ public class ReplicaInPipeline extends ReplicaInfo
}
}
} else {
- // for create, we can use the requested checksum
+ // for create, we can use the requested checksum
checksum = requestedChecksum;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index 235cd7b..d0d36ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -138,8 +138,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
if (mostAvailableAmongLowVolumes < replicaSize ||
random.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume(
- highAvailableVolumes,
- replicaSize);
+ highAvailableVolumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from high available space volumes for write of block size "
@@ -147,8 +146,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
}
} else {
volume = roundRobinPolicyLowAvailable.chooseVolume(
- lowAvailableVolumes,
- replicaSize);
+ lowAvailableVolumes, replicaSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Volumes are imbalanced. Selecting " + volume +
" from low available space volumes for write of block size "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 5f39a3d..94de48b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -113,6 +113,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** @return a list of finalized blocks for the given block pool. */
public List<FinalizedReplica> getFinalizedBlocks(String bpid);
+ /** @return a list of finalized blocks on persistent (non-transient) storage for the given block pool. */
+ public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
+
/**
* Check whether the in-memory block record matches the block on the disk,
* and, in case that they are not matched, update the record or mark it
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index af467b9..31a254b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
import java.io.RandomAccessFile;
import java.util.Scanner;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
@@ -61,6 +62,7 @@ class BlockPoolSlice {
private final File currentDir; // StorageDirectory/current/bpid/current
// directory where finalized replicas are stored
private final File finalizedDir;
+ private final File lazypersistDir;
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
private static final String DU_CACHE_FILE = "dfsUsed";
@@ -85,12 +87,24 @@ class BlockPoolSlice {
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+ this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
if (!this.finalizedDir.exists()) {
if (!this.finalizedDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + this.finalizedDir);
}
}
+ // Delete all checkpointed replicas on startup.
+ // TODO: We can move checkpointed replicas to the finalized dir and delete
+ // the copy on RAM_DISK. For now we take the simpler approach.
+
+ FileUtil.fullyDelete(lazypersistDir);
+ if (!this.lazypersistDir.exists()) {
+ if (!this.lazypersistDir.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + this.lazypersistDir);
+ }
+ }
+
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
// in the future, we might want to do some sort of datanode-local
@@ -136,6 +150,10 @@ class BlockPoolSlice {
return finalizedDir;
}
+ File getLazypersistDir() {
+ return lazypersistDir;
+ }
+
File getRbwDir() {
return rbwDir;
}
@@ -252,12 +270,37 @@ class BlockPoolSlice {
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile;
}
-
+
+ File lazyPersistReplica(Block b, File f) throws IOException {
+ File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
+ File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
+ dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
+ return blockFile;
+ }
+
+ /**
+ * Move a persisted replica from lazypersist directory to a subdirectory
+ * under finalized.
+ */
+ File activateSavedReplica(Block b, File blockFile) throws IOException {
+ final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+ final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
+ final File targetBlockFile = new File(blockDir, blockFile.getName());
+ final File targetMetaFile = new File(blockDir, metaFile.getName());
+ FileUtils.moveFile(blockFile, targetBlockFile);
+ FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
+ FileUtils.moveFile(metaFile, targetMetaFile);
+ FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
+ return targetBlockFile;
+ }
+
void checkDirs() throws DiskErrorException {
DiskChecker.checkDirs(finalizedDir);
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir);
}
+
+
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
// add finalized replicas
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index b875497..8643d6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -27,12 +27,7 @@ import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException;
@@ -88,6 +83,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -111,7 +107,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
-
@Override // FsDatasetSpi
public List<FsVolumeImpl> getVolumes() {
return volumes.volumes;
@@ -204,11 +199,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final FsVolumeList volumes;
final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService;
+ final Daemon lazyWriter;
final FsDatasetCache cacheManager;
private final Configuration conf;
private final int validVolsRequired;
+ private volatile boolean fsRunning;
final ReplicaMap volumeMap;
+ final LazyWriteReplicaTracker lazyWriteReplicaTracker;
+
+ private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
+
// Used for synchronizing access to usage stats
private final Object statsLock = new Object();
@@ -218,6 +219,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
+ this.fsRunning = true;
this.datanode = datanode;
this.dataStorage = storage;
this.conf = conf;
@@ -248,6 +250,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
storageMap = new HashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
+ lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
+
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
@@ -257,11 +261,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+ // TODO: Initialize lazyWriteReplicaTracker from blocks on disk.
+
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx));
}
cacheManager = new FsDatasetCache(this);
+ lazyWriter = new Daemon(new LazyWriter(
+ conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
+ lazyWriter.start();
registerMBean(datanode.getDatanodeUuid());
}
@@ -531,8 +541,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
}
- static File moveBlockFiles(Block b, File srcfile, File destdir
- ) throws IOException {
+ static File moveBlockFiles(Block b, File srcfile, File destdir)
+ throws IOException {
final File dstfile = new File(destdir, b.getBlockName());
final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
@@ -555,6 +565,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return dstfile;
}
+ static File copyBlockFiles(Block b, File srcfile, File destdir)
+ throws IOException {
+ final File dstfile = new File(destdir, b.getBlockName());
+ final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
+ final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+ try {
+ FileUtils.copyFile(srcmeta, dstmeta);
+ } catch (IOException e) {
+ throw new IOException("Failed to copy meta file for " + b
+ + " from " + srcmeta + " to " + dstmeta, e);
+ }
+ try {
+ FileUtils.copyFile(srcfile, dstfile);
+ } catch (IOException e) {
+ throw new IOException("Failed to copy block file for " + b
+ + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
+ + " and " + srcfile + " to " + dstfile);
+ }
+ return dstfile;
+ }
+
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
@@ -817,6 +851,83 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
+ /**
+ * Attempt to evict one or more transient block replicas until we have at
+ * least spaceNeeded bytes free.
+ *
+ * @return true if we were able to free up at least spaceNeeded bytes, false
+ * otherwise.
+ */
+ private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
+ throws IOException {
+
+ boolean isAvailable = false;
+
+ LOG.info("Attempting to evict blocks from transient storage");
+
+ // Reverse the map so we can iterate in order of replica creation times,
+ // evicting oldest replicas one at a time until we have sufficient space.
+ TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
+ lazyWriteReplicaTracker.getLruMap();
+ int blocksEvicted = 0;
+
+ // TODO: It is really inefficient to do this with the Object lock held!
+ // TODO: This logic is here just for prototyping.
+ // TODO: We should replace it with proactive discard when ram_disk free space
+ // TODO: falls below a low watermark. That way we avoid fs operations on the
+ // TODO: hot path with the lock held.
+ synchronized (this) {
+ long currentTime = System.currentTimeMillis() / 1000;
+ for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
+ LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
+ LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
+ "; block LMT=" + entry.getKey() +
+ "; currentTime=" + currentTime);
+ ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
+ Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+ File blockFile = replicaInfo.getBlockFile();
+ File metaFile = replicaInfo.getMetaFile();
+ long used = blockFile.length() + metaFile.length();
+ lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
+
+ // Move the persisted replica to the finalized directory of
+ // the target volume.
+ BlockPoolSlice bpSlice =
+ lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
+ File newBlockFile = bpSlice.activateSavedReplica(
+ replicaInfo, lazyWriteReplica.savedBlockFile);
+
+ ReplicaInfo newReplicaInfo =
+ new FinalizedReplica(replicaInfo.getBlockId(),
+ replicaInfo.getBytesOnDisk(),
+ replicaInfo.getGenerationStamp(),
+ lazyWriteReplica.lazyPersistVolume,
+ newBlockFile.getParentFile());
+
+ // Update the volumeMap entry. This removes the old entry.
+ volumeMap.add(bpid, newReplicaInfo);
+
+ // Remove the old replicas.
+ blockFile.delete();
+ metaFile.delete();
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
+ ++blocksEvicted;
+
+ if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
+ LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
+ isAvailable = true;
+ break;
+ }
+
+ if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
+ break;
+ }
+ }
+ }
+
+ return isAvailable;
+ }
+
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
@@ -839,7 +950,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
} catch (DiskOutOfSpaceException de) {
if (allowLazyPersist) {
- allowLazyPersist = false;
+ if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
+ // Eviction did not work, we'll just fallback to DEFAULT storage.
+ LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
+ " bytes for new block. Will fallback to DEFAULT " +
+ "storage");
+ allowLazyPersist = false;
+ }
continue;
}
throw de;
@@ -851,6 +968,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+
return newReplicaInfo;
}
@@ -988,7 +1106,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-
return newReplicaInfo;
}
@@ -1054,8 +1171,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
File dest = v.addBlock(bpid, replicaInfo, f);
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+
+ if (v.isTransientStorage()) {
+ lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+
+ // Schedule a checkpoint.
+ ((LazyWriter) lazyWriter.getRunnable())
+ .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
+ }
}
volumeMap.add(bpid, newReplicaInfo);
+
return newReplicaInfo;
}
@@ -1075,6 +1201,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
replicaInfo.getMetaFile(), b.getLocalBlock())) {
LOG.warn("Block " + b + " unfinalized and removed. " );
}
+ if (replicaInfo.getVolume().isTransientStorage()) {
+ lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+ }
}
}
@@ -1171,6 +1300,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
+ * Get the list of finalized blocks on persistent (non-transient) storage from the in-memory blockmap for a block pool.
+ */
+ @Override
+ public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+ ArrayList<FinalizedReplica> finalized =
+ new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+ for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+ if(!b.getVolume().isTransientStorage() &&
+ b.getState() == ReplicaState.FINALIZED) {
+ finalized.add(new FinalizedReplica((FinalizedReplica)b));
+ }
+ }
+ return finalized;
+ }
+
+ /**
* Check whether the given block is a valid one.
* valid means finalized
*/
@@ -1287,6 +1432,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumeMap.remove(bpid, invalidBlks[i]);
}
+ if (v.isTransientStorage()) {
+ lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+ }
+
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
datanode.getShortCircuitRegistry().processBlockInvalidation(
@@ -1482,8 +1631,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public void shutdown() {
- if (mbeanName != null)
+ fsRunning = false;
+
+ ((LazyWriter) lazyWriter.getRunnable()).stop();
+ lazyWriter.interrupt();
+
+ if (mbeanName != null) {
MBeans.unregister(mbeanName);
+ }
if (asyncDiskService != null) {
asyncDiskService.shutdown();
@@ -1492,6 +1647,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if(volumes != null) {
volumes.shutdown();
}
+
+ try {
+ lazyWriter.join();
+ } catch (InterruptedException ie) {
+ LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
+ "from LazyWriter.join");
+ }
}
@Override // FSDatasetMBean
@@ -1524,7 +1686,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) {
+ File diskMetaFile, FsVolumeSpi vol) throws IOException {
Block corruptBlock = null;
ReplicaInfo memBlockInfo;
synchronized (this) {
@@ -1557,6 +1719,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (blockScanner != null) {
blockScanner.deleteBlock(bpid, new Block(blockId));
}
+ if (vol.isTransientStorage()) {
+ lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
+ }
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
@@ -1580,6 +1745,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (blockScanner != null) {
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
}
+ if (vol.isTransientStorage()) {
+ lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+ }
LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
}
@@ -1757,9 +1925,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
LOG.info("updateReplica: " + oldBlock
- + ", recoveryId=" + recoveryId
- + ", length=" + newlength
- + ", replica=" + replica);
+ + ", recoveryId=" + recoveryId
+ + ", length=" + newlength
+ + ", replica=" + replica);
//check replica
if (replica == null) {
@@ -2019,5 +2187,123 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
nbytes, flags);
}
+
+ private static class BlockIdPair {
+ final String bpid;
+ final long blockId;
+
+ BlockIdPair(final String bpid, final long blockId) {
+ this.bpid = bpid;
+ this.blockId = blockId;
+ }
+ }
+
+ private class LazyWriter implements Runnable {
+ private volatile boolean shouldRun = true;
+ final int checkpointerInterval;
+
+ final private Queue<BlockIdPair> blocksPendingCheckpoint;
+
+ public LazyWriter(final int checkpointerInterval) {
+ this.checkpointerInterval = checkpointerInterval;
+ blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
+ }
+
+ // Schedule a replica for writing to persistent storage.
+ public synchronized void addReplicaToLazyWriteQueue(
+ String bpid, long blockId) {
+ LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
+ blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
+ }
+
+ private void moveReplicaToNewVolume(String bpid, long blockId)
+ throws IOException {
+
+ LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
+
+ FsVolumeImpl targetVolume = null;
+ Block block = null;
+ File blockFile = null;
+
+ synchronized (this) {
+ block = getStoredBlock(bpid, blockId);
+ blockFile = getFile(bpid, blockId);
+
+ if (block == null) {
+ // The block was deleted before it could be checkpointed.
+ return;
+ }
+
+ // Pick a target volume for the block.
+ targetVolume = volumes.getNextVolume(
+ StorageType.DEFAULT, block.getNumBytes());
+ }
+
+ LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
+ lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+ File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
+ .lazyPersistReplica(block, blockFile);
+ lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
+ LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+ " to file " + savedBlockFile);
+ }
+
+ /**
+ * Checkpoint a pending replica to persistent storage now.
+ * @return true if there is more work to be done, false otherwise.
+ */
+ private boolean saveNextReplica() {
+ BlockIdPair blockIdPair = null;
+ int moreWorkThreshold = 0;
+
+ try {
+ synchronized (this) {
+ // Dequeue the next replica waiting to be checkpointed.
+ blockIdPair = blocksPendingCheckpoint.poll();
+ if (blockIdPair == null) {
+ LOG.info("LazyWriter has no blocks to persist. " +
+ "Thread going to sleep.");
+ return false;
+ }
+ }
+
+ // Move the replica outside the lock.
+ moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
+
+ } catch(IOException ioe) {
+ // If we failed, put the block on the queue and let a retry
+ // interval elapse before we try again so we don't try to keep
+ // checkpointing the same block in a tight loop.
+ synchronized (this) {
+ blocksPendingCheckpoint.add(blockIdPair);
+ ++moreWorkThreshold;
+ }
+ }
+
+ synchronized (this) {
+ return blocksPendingCheckpoint.size() > moreWorkThreshold;
+ }
+ }
+
+ @Override
+ public void run() {
+ while (fsRunning && shouldRun) {
+ try {
+ if (!saveNextReplica()) {
+ Thread.sleep(checkpointerInterval * 1000);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("LazyWriter was interrupted, exiting");
+ break;
+ } catch (Exception e) {
+ LOG.error("Ignoring exception in LazyWriter:", e);
+ }
+ }
+ }
+
+ public void stop() {
+ shouldRun = false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e4b0f2b..d3c585d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -328,6 +328,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED);
+ File lazypersistDir = new File(bpCurrentDir,
+ DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
FileUtil.fullyDelete(bpDir);
@@ -339,6 +341,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
!FileUtil.fullyDelete(finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
}
+ if (!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
+ !FileUtil.fullyDelete(lazypersistDir)) {
+ throw new IOException("Failed to delete " + lazypersistDir);
+ }
FileUtil.fullyDelete(tmpDir);
for (File f : FileUtil.listFiles(bpCurrentDir)) {
if (!f.delete()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
new file mode 100644
index 0000000..ae28f09
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.TreeMultimap;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+class LazyWriteReplicaTracker {
+
+ enum State {
+ IN_MEMORY,
+ LAZY_PERSIST_IN_PROGRESS,
+ LAZY_PERSIST_COMPLETE,
+ }
+
+ static class ReplicaState implements Comparable<ReplicaState> {
+
+ final String bpid;
+ final long blockId;
+ State state;
+
+ /**
+ * transient storage volume that holds the original replica.
+ */
+ final FsVolumeImpl transientVolume;
+
+ /**
+ * Persistent volume that holds or will hold the saved replica.
+ */
+ FsVolumeImpl lazyPersistVolume;
+ File savedBlockFile;
+
+ ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) {
+ this.bpid = bpid;
+ this.blockId = blockId;
+ this.transientVolume = transientVolume;
+ state = State.IN_MEMORY;
+ lazyPersistVolume = null;
+ savedBlockFile = null;
+ }
+
+ @Override
+ public int hashCode() {
+ return bpid.hashCode() ^ (int) blockId;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ ReplicaState otherState = (ReplicaState) other;
+ return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
+ }
+
+ @Override
+ public int compareTo(ReplicaState other) {
+ if (blockId == other.blockId) {
+ return 0;
+ } else if (blockId < other.blockId) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }
+ }
+
+ final FsDatasetImpl fsDataset;
+
+ /**
+ * Map of blockpool ID to map of blockID to ReplicaInfo.
+ */
+ final Map<String, Map<Long, ReplicaState>> replicaMaps;
+
+ /**
+ * A map of blockId to persist complete time for transient blocks. This allows
+ * us to evict LRU blocks from transient storage. Protected by 'this'
+ * Object lock.
+ */
+ final Map<ReplicaState, Long> persistTimeMap;
+
+ LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
+ this.fsDataset = fsDataset;
+ replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
+ persistTimeMap = new HashMap<ReplicaState, Long>();
+ }
+
+ TreeMultimap<Long, ReplicaState> getLruMap() {
+ // TODO: This can be made more efficient.
+ TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
+ for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
+ reversedMap.put(entry.getValue(), entry.getKey());
+ }
+ return reversedMap;
+ }
+
+ synchronized void addReplica(String bpid, long blockId,
+ final FsVolumeImpl transientVolume) {
+ Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+ if (map == null) {
+ map = new HashMap<Long, ReplicaState>();
+ replicaMaps.put(bpid, map);
+ }
+ map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
+ }
+
+ synchronized void recordStartLazyPersist(
+ final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
+ Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+ ReplicaState replicaState = map.get(blockId);
+ replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
+ replicaState.lazyPersistVolume = checkpointVolume;
+ }
+
+ synchronized void recordEndLazyPersist(
+ final String bpid, final long blockId, File savedBlockFile) {
+ Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+ ReplicaState replicaState = map.get(blockId);
+
+ if (replicaState == null) {
+ throw new IllegalStateException("Unknown replica bpid=" +
+ bpid + "; blockId=" + blockId);
+ }
+ replicaState.state = State.LAZY_PERSIST_COMPLETE;
+ replicaState.savedBlockFile = savedBlockFile;
+ persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
+ }
+
+ synchronized void discardReplica(
+ final String bpid, final long blockId, boolean force) {
+ Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+ ReplicaState replicaState = map.get(blockId);
+
+ if (replicaState == null) {
+ if (force) {
+ return;
+ }
+ throw new IllegalStateException("Unknown replica bpid=" +
+ bpid + "; blockId=" + blockId);
+ }
+
+ if (replicaState.state != State.LAZY_PERSIST_COMPLETE && !force) {
+ throw new IllegalStateException("Discarding replica without " +
+ "saving it to disk bpid=" + bpid + "; blockId=" + blockId);
+
+ }
+
+ map.remove(blockId);
+ persistTimeMap.remove(replicaState);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index f1b570d..6183ba1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1112,6 +1112,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+ throw new UnsupportedOperationException(); // Not implemented by this dataset: every call throws, bpid is ignored.
+ }
+
+ @Override
public Map<String, Object> getVolumeInfoMap() {
throw new UnsupportedOperationException(); // Not implemented by this dataset: every call throws.
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c569fd20/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
index 5153e76..db13791 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;