Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/06/20 23:14:25 UTC
hadoop git commit: HDFS-8457. Ozone: Refactor FsDatasetSpi to pull up
HDFS-agnostic functionality into parent interface. (Contributed by Arpit
Agarwal)
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 a229f69ac -> 197b8fb0f
HDFS-8457. Ozone: Refactor FsDatasetSpi to pull up HDFS-agnostic functionality into parent interface. (Contributed by Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/197b8fb0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/197b8fb0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/197b8fb0
Branch: refs/heads/HDFS-7240
Commit: 197b8fb0fb2688269cf17b7db4628f00e02091a0
Parents: a229f69
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Jun 20 14:11:09 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Jun 20 14:11:09 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-7240.txt | 3 +
.../hdfs/server/datanode/BPOfferService.java | 6 +-
.../hdfs/server/datanode/BPServiceActor.java | 19 +-
.../hdfs/server/datanode/BlockSender.java | 2 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 151 +++++++-----
.../hdfs/server/datanode/DataXceiver.java | 16 +-
.../hdfs/server/datanode/DirectoryScanner.java | 10 +-
.../hdfs/server/datanode/VolumeScanner.java | 3 +-
.../server/datanode/fsdataset/DatasetSpi.java | 232 +++++++++++++++++++
.../server/datanode/fsdataset/FsDatasetSpi.java | 219 ++---------------
.../server/datanode/fsdataset/FsVolumeSpi.java | 29 +--
.../server/datanode/fsdataset/VolumeSpi.java | 72 ++++++
.../fsdataset/impl/FsDatasetFactory.java | 24 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 5 +
.../hdfs/TestWriteBlockGetsBlockLengthHint.java | 2 +-
.../server/datanode/SimulatedFSDataset.java | 12 +-
.../server/datanode/TestBPOfferService.java | 6 +-
.../datanode/TestDataNodeInitStorage.java | 2 +-
.../server/datanode/TestSimulatedFSDataset.java | 4 +-
.../extdataset/ExternalDatasetImpl.java | 5 +
20 files changed, 488 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
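At a glance, the refactoring splits each storage interface into an HDFS-agnostic parent and an HDFS-specific child. A minimal sketch of the resulting hierarchy, with method lists abridged from the diffs below:

  // New HDFS-agnostic volume interface.
  public interface VolumeSpi {
    String getStorageID();          // pulled up from FsVolumeSpi
    StorageType getStorageType();   // pulled up from FsVolumeSpi
    DatasetSpi getDataset();
    // ...
  }

  // New HDFS-agnostic dataset interface.
  public interface DatasetSpi<V extends VolumeSpi> {
    V getVolume(ExtendedBlock b);   // pulled up from FsDatasetSpi
    boolean isCachingSupported();   // replaces getCacheCapacity() == 0 checks
    // ...
  }

  // The HDFS-specific interfaces now extend the agnostic parents.
  public interface FsVolumeSpi extends VolumeSpi { /* ... */ }
  public interface FsDatasetSpi<V extends FsVolumeSpi>
      extends FSDatasetMBean, DatasetSpi<FsVolumeSpi> { /* ... */ }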
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
index a259170..edd7637 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -10,3 +10,6 @@
HDFS-8641. OzoneHandler : Add Quota Support. (Anu Engineer via
Arpit Agarwal)
+ HDFS-8457. Ozone: Refactor FsDatasetSpi to pull up HDFS-agnostic
+ functionality into parent interface. (Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 092a8f8..0392a2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -71,7 +71,7 @@ class BPOfferService {
private final DataNode dn;
- private FsDatasetSpi<?> dataset = null;
+ private DatasetSpi<?> dataset = null;
/**
* A reference to the BPServiceActor associated with the currently
@@ -306,7 +306,7 @@ class BPOfferService {
* verifies that this namespace matches (e.g. to prevent a misconfiguration
* where a StandbyNode from a different cluster is specified)
*/
- FsDatasetSpi<?> verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
+ DatasetSpi<?> verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
throws IOException {
writeLock();
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c906636..361ba76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -45,7 +45,9 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -113,7 +115,7 @@ class BPServiceActor implements Runnable {
private volatile boolean sendImmediateIBR = false;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
- private FsDatasetSpi<?> dataset = null;
+ private DatasetSpi<? extends VolumeSpi> dataset = null;
private final DNConf dnConf;
private long prevBlockReportId;
@@ -225,7 +227,7 @@ class BPServiceActor implements Runnable {
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
dataset = bpos.verifyAndSetNamespaceInfo(nsInfo);
-
+
// Second phase of the handshake with the NN.
register(nsInfo);
}
@@ -508,8 +510,7 @@ class BPServiceActor implements Runnable {
}
DatanodeCommand cacheReport() throws IOException {
- // If caching is disabled, do not send a cache report
- if (dataset.getCacheCapacity() == 0) {
+ if (!dataset.isCachingSupported()) {
return null;
}
// send cache report if timer has expired.
@@ -552,10 +553,14 @@ class BPServiceActor implements Runnable {
dataset.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
+
+ FSDatasetMBean mbean = (dataset instanceof FSDatasetMBean) ?
+ ((FSDatasetMBean) dataset) : null;
+
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
- dataset.getCacheCapacity(),
- dataset.getCacheUsed(),
+ mbean != null ? mbean.getCacheCapacity() : 0,
+ mbean != null ? mbean.getCacheUsed() : 0,
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
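The hunks above decouple the heartbeat path from the HDFS dataset type: cache reports are gated on the new isCachingSupported() call, and cache statistics are read only when the dataset also implements FSDatasetMBean, since those counters are no longer part of the generic interface. The pattern, reduced to its essentials:

  // Cache stats come from FSDatasetMBean; other datasets report zero.
  long cacheCapacity = 0;
  long cacheUsed = 0;
  if (dataset instanceof FSDatasetMBean) {
    FSDatasetMBean mbean = (FSDatasetMBean) dataset;
    cacheCapacity = mbean.getCacheCapacity();
    cacheUsed = mbean.getCacheUsed();
  }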
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 5f4ea10..2c8f5d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -456,7 +456,7 @@ class BlockSender implements java.io.Closeable {
private static Replica getReplica(ExtendedBlock block, DataNode datanode)
throws ReplicaNotFoundException {
final FsDatasetSpi<?> dataset =
- datanode.getFSDataset(block.getBlockPoolId());
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
Replica replica =
dataset.getReplica(block.getBlockPoolId(), block.getBlockId());
if (replica == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7dba2af..35b97af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -152,8 +152,10 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -284,20 +286,18 @@ public class DataNode extends ReconfigurableBase
private boolean shutdownInProgress = false;
private BlockPoolManager blockPoolManager;
- private final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> datasetFactory;
+ private final DatasetSpi.Factory datasetFactory;
// This is a many-to-one mapping. Multiple block pool IDs may share
// the same dataset.
private volatile Map<String,
- FsDatasetSpi<? extends FsVolumeSpi>> datasetsMap =
+ DatasetSpi<? extends VolumeSpi>> datasetsMap =
new ConcurrentHashMap<>();
- // Hash set of datasets, used to avoid having to deduplicate the values of datasetsMap
- // every time we need to iterate over all datasets.
- private volatile Set<FsDatasetSpi<? extends FsVolumeSpi>> datasets =
- Collections.newSetFromMap(
- new ConcurrentHashMap<FsDatasetSpi<? extends FsVolumeSpi>,
- Boolean>());
+ // Map of datasets keyed by service type, used to avoid having to
+ // deduplicate the values of datasetsMap every time we iterate over all datasets.
+ private volatile Map<NodeType, DatasetSpi<? extends VolumeSpi>> datasets =
+ new ConcurrentHashMap<>();
private String clusterId = null;
@@ -340,7 +340,7 @@ public class DataNode extends ReconfigurableBase
private boolean hasAnyBlockPoolRegistered = false;
private final BlockScanner blockScanner;
- private Map<FsDatasetSpi<?>, DirectoryScanner> directoryScannersMap =
+ private Map<DatasetSpi<?>, DirectoryScanner> directoryScannersMap =
new ConcurrentHashMap<>();
/** Activated plug-ins. */
@@ -618,7 +618,7 @@ public class DataNode extends ReconfigurableBase
@Override
public IOException call() {
try {
- for (FsDatasetSpi<?> dataset : datasets) {
+ for (DatasetSpi<?> dataset : datasets.values()) {
dataset.addVolume(location, nsInfos);
}
} catch (IOException e) {
@@ -723,7 +723,7 @@ public class DataNode extends ReconfigurableBase
IOException ioe = null;
// Remove volumes and block infos from FsDataset.
- for (final FsDatasetSpi<?> dataset : datasets) {
+ for (final DatasetSpi<?> dataset : datasets.values()) {
dataset.removeVolumes(absoluteVolumePaths, clearFailure);
}
@@ -896,7 +896,7 @@ public class DataNode extends ReconfigurableBase
* See {@link DirectoryScanner}
*/
private synchronized void initDirectoryScanners(Configuration conf) {
- for (FsDatasetSpi<?> dataset : datasets) {
+ for (DatasetSpi<?> dataset : datasets.values()) {
if (directoryScannersMap.get(dataset) != null) {
continue;
}
@@ -1029,7 +1029,7 @@ public class DataNode extends ReconfigurableBase
*/
public void reportBadBlocks(ExtendedBlock block) throws IOException{
BPOfferService bpos = getBPOSForBlock(block);
- FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
+ VolumeSpi volume = getDataset(block.getBlockPoolId()).getVolume(block);
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
}
@@ -1344,7 +1344,7 @@ public class DataNode extends ReconfigurableBase
blockScanner.disableBlockPoolId(bpId);
- FsDatasetSpi<?> dataset = getFSDataset(bpId);
+ DatasetSpi<?> dataset = getDataset(bpId);
if (dataset != null) {
dataset.shutdownBlockPool(bpId);
}
@@ -1367,7 +1367,7 @@ public class DataNode extends ReconfigurableBase
* @param bpos Block pool offer service
* @throws IOException if the NN is inconsistent with the local storage.
*/
- FsDatasetSpi<?> initBlockPool(BPOfferService bpos) throws IOException {
+ DatasetSpi<?> initBlockPool(BPOfferService bpos) throws IOException {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
if (nsInfo == null) {
throw new IOException("NamespaceInfo not found: Block pool " + bpos
@@ -1381,14 +1381,15 @@ public class DataNode extends ReconfigurableBase
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
- FsDatasetSpi<?> dataset = initStorage(bpos.getBlockPoolId(), nsInfo);
+ DatasetSpi<?> dataset = initStorage(bpos.getBlockPoolId(), nsInfo);
// Exclude failed disks before initializing the block pools to avoid startup
// failures.
- checkDiskError(getFSDataset(nsInfo.getBlockPoolID()));
+ checkDiskError(getDataset(nsInfo.getBlockPoolID()));
initDirectoryScanners(conf);
- dataset.addBlockPool(nsInfo.getBlockPoolID(), conf);
+ getDataset(nsInfo.getBlockPoolID()).addBlockPool(
+ nsInfo.getBlockPoolID(), conf);
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
return dataset;
}
@@ -1405,7 +1406,7 @@ public class DataNode extends ReconfigurableBase
* Initializes the {@link #data}. The initialization is done only once, when
handshake with the first namenode is completed.
*/
- private FsDatasetSpi<?> initStorage(
+ private DatasetSpi<?> initStorage(
final String blockPoolId, final NamespaceInfo nsInfo) throws IOException {
if (!datasetFactory.isSimulated()) {
final StartupOption startOpt = getStartupOption(conf);
@@ -1562,14 +1563,18 @@ public class DataNode extends ReconfigurableBase
return maxNumberOfBlocksToLog;
}
+ /**
+ * Only valid for blocks stored by FsDatasetSpi instances.
+ */
@Override
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException {
checkBlockLocalPathAccess();
checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ);
- FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+ DatasetSpi<?> dataset = getDataset(block.getBlockPoolId());
Preconditions.checkNotNull(dataset, "Storage not yet initialized");
- BlockLocalPathInfo info = dataset.getBlockLocalPathInfo(block);
+ BlockLocalPathInfo info =
+ ((FsDatasetSpi<?>) dataset).getBlockLocalPathInfo(block);
if (LOG.isDebugEnabled()) {
if (info != null) {
if (LOG.isTraceEnabled()) {
@@ -1604,6 +1609,9 @@ public class DataNode extends ReconfigurableBase
}
}
+ /**
+ * Only valid for blocks stored by FsDatasetSpi instances.
+ */
FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token, int maxVersion)
throws ShortCircuitFdsUnsupportedException,
@@ -1624,7 +1632,8 @@ public class DataNode extends ReconfigurableBase
FileInputStream fis[] = new FileInputStream[2];
try {
- final FsDatasetSpi<?> dataset = getFSDataset(blk.getBlockPoolId());
+ final FsDatasetSpi<?> dataset =
+ (FsDatasetSpi<?>) getDataset(blk.getBlockPoolId());
Preconditions.checkNotNull(dataset, "Storage not yet initialized");
fis[0] = (FileInputStream) dataset.getBlockInputStream(blk, 0);
fis[1] = DatanodeUtil.getMetaDataInputStream(blk, dataset);
@@ -1636,6 +1645,9 @@ public class DataNode extends ReconfigurableBase
return fis;
}
+ /**
+ * Only valid for blocks stored by FsDatasetSpi instances.
+ */
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(
String bpId, long[] blockIds,
@@ -1656,7 +1668,7 @@ public class DataNode extends ReconfigurableBase
DataNodeFaultInjector.get().getHdfsBlocksMetadata();
- final FsDatasetSpi<?> dataset = getFSDataset(bpId);
+ final FsDatasetSpi<?> dataset = (FsDatasetSpi<?>) getDataset(bpId);
Preconditions.checkNotNull(dataset, "Storage not yet initialized");
return dataset.getHdfsBlocksMetadata(bpId, blockIds);
}
@@ -1813,7 +1825,7 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Exception when unlocking storage: " + ie, ie);
}
}
- for (FsDatasetSpi<?> dataset : datasets) {
+ for (DatasetSpi<?> dataset : datasets.values()) {
dataset.shutdown();
}
if (metrics != null) {
@@ -1851,7 +1863,7 @@ public class DataNode extends ReconfigurableBase
}
}
- private void handleDiskError(final FsDatasetSpi<?> dataset,
+ private void handleDiskError(final DatasetSpi<?> dataset,
final String errMsgr) {
final boolean hasEnoughResources = dataset.hasEnoughResource();
LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
@@ -1912,12 +1924,15 @@ public class DataNode extends ReconfigurableBase
private void reportBadBlock(final BPOfferService bpos,
final ExtendedBlock block, final String msg) {
- FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
+ VolumeSpi volume = getDataset(block.getBlockPoolId()).getVolume(block);
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
LOG.warn(msg);
}
+ /**
+ * Only valid for blocks stored by FsDatasetSpi instances.
+ */
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
@@ -1927,7 +1942,9 @@ public class DataNode extends ReconfigurableBase
boolean replicaStateNotFinalized = false;
boolean blockFileNotExist = false;
boolean lengthTooShort = false;
- final FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+
+ final FsDatasetSpi<?> dataset =
+ (FsDatasetSpi<?>) getDataset(block.getBlockPoolId());
Preconditions.checkNotNull(dataset, "Storage not yet initialized");
try {
@@ -1964,10 +1981,8 @@ public class DataNode extends ReconfigurableBase
// Shorter on-disk len indicates corruption so report NN
// the corrupt block
reportBadBlock(bpos, block, "Can't replicate block " + block
- + " because on-disk length "
- + getFSDataset(block.getBlockPoolId()).getLength(block)
- + " is shorter than NameNode recorded length "
- + block.getNumBytes());
+ + " because on-disk length " + dataset.getLength(block)
+ + " is shorter than NameNode recorded length " + block.getNumBytes());
return;
}
@@ -2174,7 +2189,7 @@ public class DataNode extends ReconfigurableBase
DFSUtil.getSmallBufferSize(conf)));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, true, DataNode.this, getFSDataset(b.getBlockPoolId()),
+ false, false, true, DataNode.this, data,
null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
@@ -2532,17 +2547,22 @@ public class DataNode extends ReconfigurableBase
* @return
* @throws IOException
*/
- private FsDatasetSpi<?> allocateFsDataset(
+ private synchronized DatasetSpi<?> allocateFsDataset(
final String bpid, final NodeType serviceType) throws IOException {
- FsDatasetSpi<?> dataset =
- datasetFactory.newInstance(this, storage, conf, serviceType);
- datasets.add(dataset);
+
+ DatasetSpi<?> dataset = datasets.get(serviceType);
+ if (dataset != null) {
+ datasetsMap.put(bpid, dataset);
+ return dataset;
+ }
+
+ dataset = datasetFactory.newInstance(this, storage, conf, serviceType);
+ datasets.put(serviceType, dataset);
datasetsMap.put(bpid, dataset);
if (serviceType == NodeType.NAME_NODE) {
// 'data' is retained for existing mock-based HDFS unit tests.
- Preconditions.checkState(data == null || data == dataset);
- data = dataset;
+ data = (FsDatasetSpi<? extends FsVolumeSpi>) dataset;
}
return dataset;
@@ -2555,22 +2575,17 @@ public class DataNode extends ReconfigurableBase
* @return the fsdataset that stores the blocks
*/
@VisibleForTesting
- public FsDatasetSpi<?> getFSDataset(final String bpid) {
+ public DatasetSpi<?> getDataset(final String bpid) {
return datasetsMap.get(bpid);
}
- @VisibleForTesting
- public Set<FsDatasetSpi<?>> getFSDatasets() {
- return datasets;
- }
-
/**
* Do NOT use this method outside of tests.
* Retained for compatibility with existing tests and subject to removal.
*
* @return the fsdataset that stores the blocks
*/
- @VisibleForTesting
+ @Deprecated
public FsDatasetSpi<?> getFSDataset() {
Preconditions.checkState(datasets.size() <= 1,
"Did not expect more than one Dataset here.");
@@ -2578,7 +2593,7 @@ public class DataNode extends ReconfigurableBase
if (datasets.size() == 0) {
return null;
}
- return (FsDatasetSpi<?>) datasets.iterator().next();
+ return (FsDatasetSpi<?>) datasets.values().iterator().next();
}
@VisibleForTesting
@@ -2593,7 +2608,7 @@ public class DataNode extends ReconfigurableBase
*
* @return
*/
- @VisibleForTesting
+ @Deprecated
DirectoryScanner getDirectoryScanner() {
return directoryScannersMap.get(getFSDataset());
}
@@ -2651,12 +2666,16 @@ public class DataNode extends ReconfigurableBase
return d;
}
- // InterDataNodeProtocol implementation
- @Override // InterDatanodeProtocol
+ /**
+ * InterDatanodeProtocol implementation.
+ *
+ * Only valid for blocks stored by FsDatasetSpi instances.
+ */
+ @Override
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
final FsDatasetSpi<?> dataset =
- getFSDataset(rBlock.getBlock().getBlockPoolId());
+ (FsDatasetSpi<?>) getDataset(rBlock.getBlock().getBlockPoolId());
if (dataset != null) {
return dataset.initReplicaRecovery(rBlock);
}
@@ -2684,10 +2703,10 @@ public class DataNode extends ReconfigurableBase
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
final long recoveryId, final long newBlockId, final long newLength)
throws IOException {
- final String storageID =
- getFSDataset(oldBlock.getBlockPoolId())
- .updateReplicaUnderRecovery(oldBlock, recoveryId,
- newBlockId, newLength);
+ final FsDatasetSpi<?> dataset =
+ (FsDatasetSpi<?>) getDataset(oldBlock.getBlockPoolId());
+ final String storageID = dataset.updateReplicaUnderRecovery(
+ oldBlock, recoveryId, newBlockId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
// block locations until the next block report.
@@ -2924,10 +2943,14 @@ public class DataNode extends ReconfigurableBase
+ ")");
}
+ /**
+ * Only valid for blocks stored by FsDatasetSpi instances.
+ */
@Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
checkReadAccess(block);
- return getFSDataset(block.getBlockPoolId()).getReplicaVisibleLength(block);
+ return ((FsDatasetSpi<?>) getDataset(block.getBlockPoolId()))
+ .getReplicaVisibleLength(block);
}
private void checkReadAccess(final ExtendedBlock block) throws IOException {
@@ -2951,6 +2974,7 @@ public class DataNode extends ReconfigurableBase
/**
* Transfer a replica to the datanode targets.
+ * Only valid for blocks stored by FsDatasetSpi instances.
* @param b the block to transfer.
* The corresponding replica must be an RBW or a Finalized.
* Its GS and numBytes will be set to
@@ -2964,7 +2988,8 @@ public class DataNode extends ReconfigurableBase
final long storedGS;
final long visible;
final BlockConstructionStage stage;
- final FsDatasetSpi<?> dataset = getFSDataset(b.getBlockPoolId());
+ final FsDatasetSpi<?> dataset =
+ (FsDatasetSpi<?>) getDataset(b.getBlockPoolId());
//get replica information
synchronized(dataset) {
@@ -3090,7 +3115,10 @@ public class DataNode extends ReconfigurableBase
conf = new Configuration();
refreshNamenodes(conf);
}
-
+
+ /**
+ * Only valid for blocks stored by FsDatasetSpi instances.
+ */
@Override // ClientDatanodeProtocol
public void deleteBlockPool(String blockPoolId, boolean force)
throws IOException {
@@ -3104,8 +3132,9 @@ public class DataNode extends ReconfigurableBase
"The block pool is still running. First do a refreshNamenodes to " +
"shutdown the block pool service");
}
-
- getFSDataset(blockPoolId).deleteBlockPool(blockPoolId, force);
+
+ ((FsDatasetSpi<?>) getDataset(blockPoolId))
+ .deleteBlockPool(blockPoolId, force);
}
@Override // ClientDatanodeProtocol
@@ -3260,7 +3289,7 @@ public class DataNode extends ReconfigurableBase
/**
* Check the disk error
*/
- private void checkDiskError(final FsDatasetSpi<?> dataset) {
+ private void checkDiskError(final DatasetSpi<?> dataset) {
Set<File> unhealthyDataDirs = dataset.checkDataDir();
if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
try {
@@ -3294,7 +3323,7 @@ public class DataNode extends ReconfigurableBase
}
if(tempFlag) {
try {
- for (final FsDatasetSpi<?> dataset : datasets) {
+ for (final DatasetSpi<?> dataset : datasets.values()) {
checkDiskError(dataset);
}
} catch (Exception e) {
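Two things to note in the DataNode changes above: the per-service-type dataset cache moves out of the factory (see FsDatasetFactory below) into the synchronized allocateFsDataset(), and call sites needing HDFS-specific behavior now look the dataset up by block pool ID and downcast. A sketch of the caller pattern, with bpid and blockId standing in for a registered block pool and block:

  // Generic lookup by block pool ID.
  DatasetSpi<?> dataset = datanode.getDataset(bpid);

  // HDFS-specific operations still require the FsDatasetSpi subtype.
  if (dataset instanceof FsDatasetSpi) {
    Replica replica = ((FsDatasetSpi<?>) dataset).getReplica(bpid, blockId);
  }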
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index fbb8897..9702691 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -314,8 +314,8 @@ class DataXceiver extends Receiver implements Runnable {
}
if (slotId != null) {
final String bpid = blk.getBlockPoolId();
- boolean isCached = datanode.getFSDataset(bpid).
- isCached(bpid, blk.getBlockId());
+ FsDatasetSpi<?> dataset = (FsDatasetSpi<?>) datanode.getDataset(bpid);
+ boolean isCached = dataset.isCached(bpid, blk.getBlockId());
datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
registeredSlotId = slotId;
@@ -527,7 +527,7 @@ class DataXceiver extends Receiver implements Runnable {
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
final FsDatasetSpi<?> dataset =
- datanode.getFSDataset(block.getBlockPoolId());
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
if (dataset == null) {
throw new IOException(
"Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -640,7 +640,7 @@ class DataXceiver extends Receiver implements Runnable {
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
long size = 0;
final FsDatasetSpi<?> dataset =
- datanode.getFSDataset(block.getBlockPoolId());
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
if (dataset == null) {
throw new IOException(
"Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -907,7 +907,7 @@ class DataXceiver extends Receiver implements Runnable {
final byte[] buffer = new byte[4*1024];
MessageDigest digester = MD5Hash.getDigester();
final FsDatasetSpi<?> dataset =
- datanode.getFSDataset(block.getBlockPoolId());
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
if (dataset == null) {
throw new IOException(
"Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -951,7 +951,7 @@ class DataXceiver extends Receiver implements Runnable {
Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
// client side now can specify a range of the block for checksum
final FsDatasetSpi<?> dataset =
- datanode.getFSDataset(block.getBlockPoolId());
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
if (dataset == null) {
throw new IOException(
"Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -1015,7 +1015,7 @@ class DataXceiver extends Receiver implements Runnable {
public void copyBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
final FsDatasetSpi<?> dataset =
- datanode.getFSDataset(block.getBlockPoolId());
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
if (dataset == null) {
throw new IOException(
"Unknown or unitialized blockpool " + block.getBlockPoolId());
@@ -1109,7 +1109,7 @@ class DataXceiver extends Receiver implements Runnable {
final String delHint,
final DatanodeInfo proxySource) throws IOException {
final FsDatasetSpi<?> dataset =
- datanode.getFSDataset(block.getBlockPoolId());
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId());
if (dataset == null) {
throw new IOException(
"Unknown or unitialized blockpool " + block.getBlockPoolId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 3383d0e..cb0a6ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Daemon;
@@ -309,9 +310,14 @@ public class DirectoryScanner implements Runnable {
}
}
- DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
+ DirectoryScanner(DataNode datanode, DatasetSpi<?> dataset,
+ Configuration conf) {
+ if (!(dataset instanceof FsDatasetSpi)) {
+ throw new IllegalArgumentException(
+ "DirectoryScanner not implemented for " + dataset.getClass());
+ }
this.datanode = datanode;
- this.dataset = dataset;
+ this.dataset = (FsDatasetSpi<?>) dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
scanPeriodMsecs = interval * 1000L; //msec
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 2cc3516..e7c6150 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -34,6 +34,7 @@ import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -434,7 +435,7 @@ public class VolumeScanner extends Thread {
try {
blockSender = new BlockSender(block, 0, -1,
false, true, true, datanode,
- datanode.getFSDataset(block.getBlockPoolId()),
+ (FsDatasetSpi<?>) datanode.getDataset(block.getBlockPoolId()),
null, CachingStrategy.newDropBehind());
throttler.setBandwidth(bytesPerSec);
long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java
new file mode 100644
index 0000000..5cd52db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/DatasetSpi.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This is a service provider interface for the underlying storage that
+ * stores replicas for a data node.
+ * The default implementation stores replicas on local drives.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface DatasetSpi<V extends VolumeSpi> {
+ /**
+ * A factory for creating {@link DatasetSpi} objects.
+ */
+ abstract class Factory {
+ /**
+ * @return the configured factory.
+ */
+ public static Factory getFactory(Configuration conf) {
+ @SuppressWarnings("rawtypes")
+ final Class<? extends Factory> clazz = conf.getClass(
+ DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+ FsDatasetFactory.class,
+ Factory.class);
+ return ReflectionUtils.newInstance(clazz, conf);
+ }
+
+ /**
+ * Create a new dataset object for a specific service type.
+ */
+ public abstract DatasetSpi<? extends VolumeSpi> newInstance(
+ DataNode datanode, DataStorage storage, Configuration conf,
+ HdfsServerConstants.NodeType serviceType) throws IOException;
+
+ /** Does the factory create simulated objects? */
+ public boolean isSimulated() {
+ return false;
+ }
+ }
+
+ /**
+ * @return the volume that contains a replica of the block.
+ */
+ V getVolume(ExtendedBlock b);
+
+ /**
+ * Does the dataset contain the block?
+ */
+ boolean contains(ExtendedBlock block);
+
+
+ /**
+ * Add a new volume to the FsDataset.<p/>
+ *
+ * If the FSDataset supports block scanning, this function registers
+ * the new volume with the block scanner.
+ *
+ * @param location The storage location for the new volume.
+ * @param nsInfos Namespace information for the new volume.
+ */
+ void addVolume(
+ final StorageLocation location,
+ final List<NamespaceInfo> nsInfos) throws IOException;
+
+ /**
+ * Removes a collection of volumes from FsDataset.
+ *
+ * If the FSDataset supports block scanning, this function removes
+ * the volumes from the block scanner.
+ *
+ * @param volumes The paths of the volumes to be removed.
+ * @param clearFailure set true to clear the failure information about the
+ * volumes.
+ */
+ void removeVolumes(Set<File> volumes, boolean clearFailure);
+
+ /** @return a storage with the given storage ID */
+ DatanodeStorage getStorage(final String storageUuid);
+
+ /** @return one or more storage reports for attached volumes. */
+ StorageReport[] getStorageReports(String bpid)
+ throws IOException;
+
+ /**
+ * Returns one block report per volume.
+ * @param bpid Block Pool Id
+ * @return - a map of DatanodeStorage to block report for the volume.
+ */
+ Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid);
+
+ /**
+ * Invalidates the specified blocks
+ * @param bpid Block pool Id
+ * @param invalidBlks - the blocks to be invalidated
+ * @throws IOException
+ */
+ void invalidate(String bpid, Block[] invalidBlks) throws IOException;
+
+ /**
+ * Returns info about volume failures.
+ *
+ * @return info about volume failures, possibly null
+ */
+ VolumeFailureSummary getVolumeFailureSummary();
+
+ /**
+ * Check if all the data directories are healthy
+ * @return A set of unhealthy data directories.
+ */
+ Set<File> checkDataDir();
+
+ /**
+ * Shutdown the FSDataset
+ */
+ void shutdown();
+
+ /**
+ * add new block pool ID
+ * @param bpid Block pool Id
+ * @param conf Configuration
+ */
+ void addBlockPool(String bpid, Configuration conf) throws IOException;
+
+ /**
+ * Shutdown and remove the block pool from underlying storage.
+ * @param bpid Block pool Id to be removed
+ */
+ void shutdownBlockPool(String bpid);
+
+ /**
+ * Checks how many valid storage volumes there are in the DataNode.
+ * @return true if more than the minimum number of valid volumes are left
+ * in the FSDataSet.
+ */
+ boolean hasEnoughResource();
+
+ /**
+ * Does the dataset support caching blocks?
+ *
+ * @return true if the dataset supports caching blocks.
+ */
+ boolean isCachingSupported();
+
+ /**
+ * Caches the specified blocks
+ * @param bpid Block pool id
+ * @param blockIds - block ids to cache
+ */
+ void cache(String bpid, long[] blockIds);
+
+ /**
+ * Uncaches the specified blocks
+ * @param bpid Block pool id
+ * @param blockIds - blocks ids to uncache
+ */
+ void uncache(String bpid, long[] blockIds);
+
+
+ /**
+ * Returns the cache report - the full list of cached block IDs of a
+ * block pool.
+ * @param bpid Block Pool Id
+ * @return the cache report - the full list of cached block IDs.
+ */
+ List<Long> getCacheReport(String bpid);
+
+ /**
+ * Enable 'trash' for the given dataset. When trash is enabled, files are
+ * moved to a separate trash directory instead of being deleted immediately.
+ * This can be useful for example during rolling upgrades.
+ */
+ void enableTrash(String bpid);
+
+ /**
+ * Clear trash.
+ */
+ void clearTrash(String bpid);
+
+ /**
+ * @return true when trash is enabled
+ */
+ boolean trashEnabled(String bpid);
+
+ /**
+ * Create a marker file indicating that a rolling upgrade is in progress.
+ */
+ void setRollingUpgradeMarker(String bpid) throws IOException;
+
+ /**
+ * Delete the rolling upgrade marker file if it exists.
+ * @param bpid Block pool Id
+ */
+ void clearRollingUpgradeMarker(String bpid) throws IOException;
+}
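For context, instances of the new interface are obtained through the Factory hook above; a minimal usage sketch, assuming datanode and storage objects are already in hand:

  // Resolve the configured factory (FsDatasetFactory unless overridden via
  // dfs.datanode.fsdataset.factory) and create a dataset for the HDFS service.
  DatasetSpi.Factory factory = DatasetSpi.Factory.getFactory(conf);
  DatasetSpi<? extends VolumeSpi> dataset = factory.newInstance(
      datanode, storage, conf, HdfsServerConstants.NodeType.NAME_NODE);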
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a4672b7..7052f54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -30,72 +30,33 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.util.ReflectionUtils;
/**
* This is a service provider interface for the underlying storage that
* stores replicas for a data node.
- * The default implementation stores replicas on local drives.
+ * The default implementation stores replicas on local drives.
*/
@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
- /**
- * A factory for creating {@link FsDatasetSpi} objects.
- */
- public static abstract class Factory<D extends FsDatasetSpi<?>> {
- /** @return the configured factory. */
- public static Factory<?> getFactory(Configuration conf) {
- @SuppressWarnings("rawtypes")
- final Class<? extends Factory> clazz = conf.getClass(
- DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
- FsDatasetFactory.class,
- Factory.class);
- return ReflectionUtils.newInstance(clazz, conf);
- }
-
- /** Create a new dataset object for a specific service type. */
- public abstract D newInstance(DataNode datanode,
- DataStorage storage, Configuration conf,
- NodeType serviceType) throws IOException;
-
- /** Does the factory create simulated objects? */
- public boolean isSimulated() {
- return false;
- }
- }
+public interface FsDatasetSpi<V extends FsVolumeSpi>
+ extends FSDatasetMBean, DatasetSpi<FsVolumeSpi> {
/**
* It behaves as an unmodifiable list of FsVolume. Individual FsVolume can
@@ -188,51 +149,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
public FsVolumeReferences getFsVolumeReferences();
- /**
- * Add a new volume to the FsDataset.<p/>
- *
- * If the FSDataset supports block scanning, this function registers
- * the new volume with the block scanner.
- *
- * @param location The storage location for the new volume.
- * @param nsInfos Namespace information for the new volume.
- */
- public void addVolume(
- final StorageLocation location,
- final List<NamespaceInfo> nsInfos) throws IOException;
-
- /**
- * Removes a collection of volumes from FsDataset.
- *
- * If the FSDataset supports block scanning, this function removes
- * the volumes from the block scanner.
- *
- * @param volumes The paths of the volumes to be removed.
- * @param clearFailure set true to clear the failure information about the
- * volumes.
- */
- public void removeVolumes(Set<File> volumes, boolean clearFailure);
-
- /** @return a storage with the given storage ID */
- public DatanodeStorage getStorage(final String storageUuid);
-
- /** @return one or more storage reports for attached volumes. */
- public StorageReport[] getStorageReports(String bpid)
- throws IOException;
-
- /** @return the volume that contains a replica of the block. */
- public V getVolume(ExtendedBlock b);
-
/** @return a volume information map (name => info). */
public Map<String, Object> getVolumeInfoMap();
- /**
- * Returns info about volume failures.
- *
- * @return info about volume failures, possibly null
- */
- VolumeFailureSummary getVolumeFailureSummary();
-
/** @return a list of finalized blocks for the given block pool. */
public List<FinalizedReplica> getFinalizedBlocks(String bpid);
@@ -400,24 +319,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public void unfinalizeBlock(ExtendedBlock b) throws IOException;
/**
- * Returns one block report per volume.
- * @param bpid Block Pool Id
- * @return - a map of DatanodeStorage to block report for the volume.
- */
- public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid);
-
- /**
- * Returns the cache report - the full list of cached block IDs of a
- * block pool.
- * @param bpid Block Pool Id
- * @return the cache report - the full list of cached block IDs.
- */
- public List<Long> getCacheReport(String bpid);
-
- /** Does the dataset contain the block? */
- public boolean contains(ExtendedBlock block);
-
- /**
* Check if a block is valid.
*
* @param b The block to check.
@@ -453,28 +354,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public boolean isValidRbw(ExtendedBlock b);
/**
- * Invalidates the specified blocks
- * @param bpid Block pool Id
- * @param invalidBlks - the blocks to be invalidated
- * @throws IOException
- */
- public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
-
- /**
- * Caches the specified blocks
- * @param bpid Block pool id
- * @param blockIds - block ids to cache
- */
- public void cache(String bpid, long[] blockIds);
-
- /**
- * Uncaches the specified blocks
- * @param bpid Block pool id
- * @param blockIds - blocks ids to uncache
- */
- public void uncache(String bpid, long[] blockIds);
-
- /**
* Determine if the specified block is cached.
* @param bpid Block pool id
* @param blockIds - block id
@@ -482,17 +361,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
public boolean isCached(String bpid, long blockId);
- /**
- * Check if all the data directories are healthy
- * @return A set of unhealthy data directories.
- */
- public Set<File> checkDataDir();
-
- /**
- * Shutdown the FSDataset
- */
- public void shutdown();
-
/**
* Sets the file pointer of the checksum stream so that the last checksum
* will be overwritten
@@ -505,46 +373,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
ReplicaOutputStreams outs, int checksumSize) throws IOException;
/**
- * Checks how many valid storage volumes there are in the DataNode.
- * @return true if more than the minimum number of valid volumes are left
- * in the FSDataSet.
- */
- public boolean hasEnoughResource();
-
- /**
* Get visible length of the specified replica.
*/
long getReplicaVisibleLength(final ExtendedBlock block) throws IOException;
/**
- * Initialize a replica recovery.
- * @return actual state of the replica on this data-node or
- * null if data-node does not have the replica.
- */
- public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock
- ) throws IOException;
-
- /**
- * Update replica's generation stamp and length and finalize it.
- * @return the ID of storage that stores the block
- */
- public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
- long recoveryId, long newBlockId, long newLength) throws IOException;
-
- /**
- * add new block pool ID
- * @param bpid Block pool Id
- * @param conf Configuration
- */
- public void addBlockPool(String bpid, Configuration conf) throws IOException;
-
- /**
- * Shutdown and remove the block pool from underlying storage.
- * @param bpid Block pool Id to be removed
- */
- public void shutdownBlockPool(String bpid) ;
-
- /**
* Deletes the block pool directories. If force is false, directories are
* deleted only if no block files exist for the block pool. If force
* is true entire directory for the blockpool is deleted along with its
@@ -576,34 +409,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
long[] blockIds) throws IOException;
/**
- * Enable 'trash' for the given dataset. When trash is enabled, files are
- * moved to a separate trash directory instead of being deleted immediately.
- * This can be useful for example during rolling upgrades.
- */
- public void enableTrash(String bpid);
-
- /**
- * Clear trash
- */
- public void clearTrash(String bpid);
-
- /**
- * @return true when trash is enabled
- */
- public boolean trashEnabled(String bpid);
-
- /**
- * Create a marker file indicating that a rolling upgrade is in progress.
- */
- public void setRollingUpgradeMarker(String bpid) throws IOException;
-
- /**
- * Delete the rolling upgrade marker file if it exists.
- * @param bpid
- */
- public void clearRollingUpgradeMarker(String bpid) throws IOException;
-
- /**
* submit a sync_file_range request to AsyncDiskService
*/
public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
@@ -644,4 +449,20 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Confirm whether the block is deleting
*/
public boolean isDeletingBlock(String bpid, long blockId);
+
+ /**
+ * Initialize a replica recovery.
+ * @return actual state of the replica on this data-node or
+ * null if data-node does not have the replica.
+ */
+ ReplicaRecoveryInfo initReplicaRecovery(
+ BlockRecoveryCommand.RecoveringBlock rBlock) throws IOException;
+
+ /**
+ * Update replica's generation stamp and length and finalize it.
+ * @return the ID of storage that stores the block
+ */
+ String updateReplicaUnderRecovery(
+ ExtendedBlock oldBlock, long recoveryId,
+ long newBlockId, long newLength) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 7c7b9a7..c083a95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -24,15 +24,14 @@ import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
/**
- * This is an interface for the underlying volume.
+ * This is an interface for the underlying volume used by DFS.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public interface FsVolumeSpi {
+public interface FsVolumeSpi extends VolumeSpi {
/**
* Obtain a reference object that had increased 1 reference count of the
* volume.
@@ -42,29 +41,9 @@ public interface FsVolumeSpi {
*/
FsVolumeReference obtainReference() throws ClosedChannelException;
- /** @return the StorageUuid of the volume */
- public String getStorageID();
-
- /** @return a list of block pools. */
- public String[] getBlockPoolList();
-
- /** @return the available storage space in bytes. */
- public long getAvailable() throws IOException;
-
- /** @return the base path to the volume */
- public String getBasePath();
-
- /** @return the path to the volume */
- public String getPath(String bpid) throws IOException;
-
/** @return the directory for the finalized blocks in the block pool. */
public File getFinalizedDir(String bpid) throws IOException;
- public StorageType getStorageType();
-
- /** Returns true if the volume is NOT backed by persistent storage. */
- public boolean isTransientStorage();
-
/**
* Reserve disk space for an RBW block so a writer does not run out of
* space before the block is full.
@@ -187,8 +166,6 @@ public interface FsVolumeSpi {
public BlockIterator loadBlockIterator(String bpid, String name)
throws IOException;
- /**
- * Get the FSDatasetSpi which this volume is a part of.
- */
+ @Override
public FsDatasetSpi getDataset();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java
new file mode 100644
index 0000000..e7fd741
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/VolumeSpi.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+
+/**
+ * This is an interface for the underlying volume.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface VolumeSpi {
+ /**
+ * @return the available storage space in bytes.
+ */
+ long getAvailable() throws IOException;
+
+ /**
+ * @return the base path to the volume
+ */
+ String getBasePath();
+
+ /**
+ * @return the StorageUuid of the volume
+ */
+ String getStorageID();
+
+ /**
+ * Returns true if the volume is NOT backed by persistent storage.
+ */
+ boolean isTransientStorage();
+
+ /**
+   * @return the IDs of the block pools hosted on this volume.
+ */
+ String[] getBlockPoolList();
+
+ /**
+   * @return the path to this volume's directory for the given block pool.
+ */
+ String getPath(String bpid) throws IOException;
+
+ /**
+   * Return the StorageType, i.e. the media type, of this volume.
+   * @return the StorageType of this volume.
+ */
+ StorageType getStorageType();
+
+ /**
+ * Get the DatasetSpi which this volume is a part of.
+ */
+ DatasetSpi getDataset();
+}
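
VolumeSpi deliberately carries only storage-level accessors, so an
implementation needs no knowledge of HDFS block semantics beyond block pool
IDs. A minimal, hypothetical implementation backed by a plain local
directory (illustration only; LocalDirVolume is not part of this patch, and
real volumes are created by their owning dataset):

    package org.apache.hadoop.hdfs.server.datanode.fsdataset;

    import java.io.File;
    import java.io.IOException;
    import java.util.UUID;

    import org.apache.hadoop.fs.StorageType;

    public class LocalDirVolume implements VolumeSpi {
      private final File baseDir;
      private final String storageId = UUID.randomUUID().toString();

      public LocalDirVolume(File baseDir) {
        this.baseDir = baseDir;
      }

      @Override
      public long getAvailable() throws IOException {
        return baseDir.getUsableSpace();  // free bytes on the backing disk
      }

      @Override
      public String getBasePath() {
        return baseDir.getAbsolutePath();
      }

      @Override
      public String getStorageID() {
        return storageId;
      }

      @Override
      public boolean isTransientStorage() {
        return false;  // a local directory is persistent storage
      }

      @Override
      public String[] getBlockPoolList() {
        // Assume one subdirectory per block pool; tolerate a missing dir.
        String[] bpids = baseDir.list();
        return bpids != null ? bpids : new String[0];
      }

      @Override
      public String getPath(String bpid) throws IOException {
        return new File(baseDir, bpid).getAbsolutePath();
      }

      @Override
      public StorageType getStorageType() {
        return StorageType.DISK;
      }

      @Override
      public DatasetSpi getDataset() {
        return null;  // standalone sketch; no owning dataset
      }
    }
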
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
index 01c3830..4efc0b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
@@ -18,38 +18,30 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeSpi;
/**
* A factory for creating {@link FsDatasetImpl} objects.
*/
-public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
+public class FsDatasetFactory extends DatasetSpi.Factory {
- private final Map<NodeType, FsDatasetImpl> datasetMap = new HashMap<>();
@Override
- public synchronized FsDatasetImpl newInstance(DataNode datanode,
- DataStorage storage, Configuration conf,
+ public synchronized DatasetSpi<? extends VolumeSpi> newInstance(
+ DataNode datanode, DataStorage storage, Configuration conf,
NodeType serviceType) throws IOException {
- FsDatasetImpl dataset = datasetMap.get(serviceType);
- if (dataset != null) {
- return dataset;
- }
switch (serviceType) {
case NAME_NODE:
- dataset = new FsDatasetImpl(datanode, storage, conf);
- break;
+ return new FsDatasetImpl(datanode, storage, conf);
default:
- throw new IllegalArgumentException("Unsupported node type " + serviceType);
+ throw new IllegalArgumentException(
+ "Unsupported node type " + serviceType);
}
- datasetMap.put(serviceType, dataset);
- return dataset;
}
}
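
Note the behavioral change here: the per-NodeType cache is gone, so every
newInstance() call now constructs a fresh dataset rather than returning a
memoized one. A hedged sketch of a call site (datanode, storage, and conf
are placeholders for objects wired up elsewhere in the DataNode; NodeType is
from HdfsServerConstants):

    // Hypothetical call site; getFactory() appears in the tests below.
    FsDatasetSpi.Factory factory = FsDatasetSpi.Factory.getFactory(conf);
    DatasetSpi<? extends VolumeSpi> dataset =
        factory.newInstance(datanode, storage, conf, NodeType.NAME_NODE);
    // A second newInstance() call now yields a distinct instance, not a
    // cached one.
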
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 999b827..20d1c06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2000,6 +2000,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
blockFileName, length, genstamp, volumeExecutor);
}
+ @Override
+ public boolean isCachingSupported() {
+ return true;
+ }
+
@Override // FsDatasetSpi
public void cache(String bpid, long[] blockIds) {
for (int i=0; i < blockIds.length; i++) {
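
The new isCachingSupported() flag gives callers a capability check up front,
instead of discovering at runtime that cache() throws
UnsupportedOperationException (as the simulated and external datasets below
do). A hedged sketch of the guard a caller might add; dataset, bpid,
blockIds, and LOG are placeholders, and the patch itself only adds the flag:

    if (dataset.isCachingSupported()) {
      dataset.cache(bpid, blockIds);  // safe: FsDatasetImpl returns true
    } else {
      LOG.warn("Dataset does not support caching; skipping " +
          blockIds.length + " block(s) in pool " + bpid);
    }
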
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index d925b93..12e6741 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -71,7 +71,7 @@ public class TestWriteBlockGetsBlockLengthHint {
}
static class FsDatasetChecker extends SimulatedFSDataset {
- static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+ static class Factory extends FsDatasetSpi.Factory {
@Override
public SimulatedFSDataset newInstance(DataNode datanode,
DataStorage storage, Configuration conf,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a48582b..9404d74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.DatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -81,7 +82,7 @@ import org.apache.hadoop.util.DataChecksum;
*/
public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public final static int BYTE_MASK = 0xff;
- static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+ static class Factory extends DatasetSpi.Factory {
@Override
public SimulatedFSDataset newInstance(DataNode datanode,
DataStorage storage, Configuration conf,
@@ -788,13 +789,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
}
- @Override // FSDatasetSpi
+ @Override // DatasetSpi
+ public boolean isCachingSupported() {
+ return false;
+ }
+
+ @Override // DatasetSpi
public void cache(String bpid, long[] cacheBlks) {
throw new UnsupportedOperationException(
"SimulatedFSDataset does not support cache operation!");
}
- @Override // FSDatasetSpi
+ @Override // DatasetSpi
public void uncache(String bpid, long[] uncacheBlks) {
throw new UnsupportedOperationException(
"SimulatedFSDataset does not support uncache operation!");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 6ba044d..aa148a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -121,7 +121,7 @@ public class TestBPOfferService {
// Wire the dataset to the DN.
Mockito.doReturn(mockFSDataset).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
- Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+ Mockito.doReturn(mockFSDataset).when(mockDn).getDataset(anyString());
}
/**
@@ -330,7 +330,7 @@ public class TestBPOfferService {
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
when(mockDn).getMetrics();
final AtomicInteger count = new AtomicInteger();
- Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+ Mockito.doReturn(mockFSDataset).when(mockDn).getDataset(anyString());
Mockito.doAnswer(new Answer<FsDatasetSpi<?>>() {
@Override
public FsDatasetSpi<?> answer(InvocationOnMock invocation) throws Throwable {
@@ -338,7 +338,7 @@ public class TestBPOfferService {
throw new IOException("faked initBlockPool exception");
}
// The initBlockPool is called again. Now mock init is done.
- Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+ Mockito.doReturn(mockFSDataset).when(mockDn).getDataset(anyString());
return mockFSDataset;
}
}).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
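
The Answer stub above encodes a fail-once pattern: the first initBlockPool()
call throws, and the stub rewires getDataset() so the DataNode's retry
succeeds. Reduced to its core, with hypothetical mock and method names, in
the Mockito anonymous-class style this test already uses:

    // Generic fail-once stub; mockService and connect() are placeholders.
    final AtomicInteger calls = new AtomicInteger();
    Mockito.doAnswer(new Answer<String>() {
      @Override
      public String answer(InvocationOnMock invocation) throws Throwable {
        if (calls.getAndIncrement() == 0) {
          throw new IOException("fail the first call only");
        }
        return "ok";  // later invocations succeed
      }
    }).when(mockService).connect();
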
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
index ad4135b..017223c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
@@ -42,7 +42,7 @@ public class TestDataNodeInitStorage {
public static final Log LOG = LogFactory.getLog(TestDataNodeInitStorage.class);
static private class SimulatedFsDatasetVerifier extends SimulatedFSDataset {
- static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+ static class Factory extends FsDatasetSpi.Factory {
@Override
public SimulatedFsDatasetVerifier newInstance(
DataNode datanode, DataStorage storage,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 8dc80d5..461d5ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -114,12 +114,12 @@ public class TestSimulatedFSDataset {
@Test
public void testFSDatasetFactory() {
final Configuration conf = new Configuration();
- FsDatasetSpi.Factory<?> f = FsDatasetSpi.Factory.getFactory(conf);
+ FsDatasetSpi.Factory f = FsDatasetSpi.Factory.getFactory(conf);
assertEquals(FsDatasetFactory.class, f.getClass());
assertFalse(f.isSimulated());
SimulatedFSDataset.setFactory(conf);
- FsDatasetSpi.Factory<?> s = FsDatasetSpi.Factory.getFactory(conf);
+ FsDatasetSpi.Factory s = FsDatasetSpi.Factory.getFactory(conf);
assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
assertTrue(s.isSimulated());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/197b8fb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 28666a0..0c2c610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -229,6 +229,11 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
+ public boolean isCachingSupported() {
+ return false;
+ }
+
+ @Override
public void cache(String bpid, long[] blockIds) {
}