You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/06/01 23:58:03 UTC
hadoop git commit: HDFS-8392. DataNode support for multiple datasets.
(Arpit Agarwal)
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 770ed9262 -> e721a39e2
HDFS-8392. DataNode support for multiple datasets. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e721a39e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e721a39e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e721a39e
Branch: refs/heads/HDFS-7240
Commit: e721a39e275ae4ee34af99b2c2450e1793b695ac
Parents: 770ed92
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Jun 1 14:57:07 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Jun 1 14:57:07 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-7240.txt | 6 +
.../hadoop/hdfs/server/common/StorageInfo.java | 9 +
.../hdfs/server/datanode/BPOfferService.java | 23 +-
.../hdfs/server/datanode/BPServiceActor.java | 24 +-
.../hdfs/server/datanode/BlockReceiver.java | 35 +--
.../hdfs/server/datanode/BlockSender.java | 20 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 269 +++++++++++++------
.../hdfs/server/datanode/DataXceiver.java | 70 ++++-
.../hdfs/server/datanode/VolumeScanner.java | 5 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 10 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 4 +
.../fsdataset/impl/FsDatasetFactory.java | 25 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 9 +-
.../impl/RamDiskAsyncLazyPersistService.java | 6 +-
.../hdfs/TestWriteBlockGetsBlockLengthHint.java | 4 +-
.../server/datanode/SimulatedFSDataset.java | 4 +-
.../server/datanode/TestBPOfferService.java | 19 +-
.../datanode/TestDataNodeInitStorage.java | 3 +-
18 files changed, 384 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
new file mode 100644
index 0000000..a98d407
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -0,0 +1,6 @@
+ Breakdown of HDFS-7240 sub-tasks:
+
+ HDFS-8210. Ozone: Implement storage container manager. (Jitendra Pandey)
+
+ HDFS-8392. Ozone: DataNode support for multiple datasets. (Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 50363c9..5a19dae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -220,6 +220,15 @@ public class StorageInfo {
this.layoutVersion = lv;
}
+ /**
+ * Return the type of node serviced by this storage.
+ *
+ * @return type of node serviced by this storage.
+ */
+ public NodeType getNodeType() {
+ return storageType;
+ }
+
public int getServiceLayoutVersion() {
return storageType == NodeType.DATA_NODE ? HdfsServerConstants.DATANODE_LAYOUT_VERSION
: HdfsServerConstants.NAMENODE_LAYOUT_VERSION;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 92323f1..092a8f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -70,6 +71,8 @@ class BPOfferService {
private final DataNode dn;
+ private FsDatasetSpi<?> dataset = null;
+
/**
* A reference to the BPServiceActor associated with the currently
* ACTIVE NN. In the case that all NameNodes are in STANDBY mode,
@@ -303,7 +306,8 @@ class BPOfferService {
* verifies that this namespace matches (eg to prevent a misconfiguration
* where a StandbyNode from a different cluster is specified)
*/
- void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+ FsDatasetSpi<?> verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
+ throws IOException {
writeLock();
try {
if (this.bpNSInfo == null) {
@@ -314,7 +318,7 @@ class BPOfferService {
// The DN can now initialize its local storage if we are the
// first BP to handshake, etc.
try {
- dn.initBlockPool(this);
+ dataset = dn.initBlockPool(this);
success = true;
} finally {
if (!success) {
@@ -335,6 +339,7 @@ class BPOfferService {
} finally {
writeUnlock();
}
+ return dataset;
}
/**
@@ -480,11 +485,11 @@ class BPOfferService {
}
String bpid = getBlockPoolId();
if (!rollingUpgradeStatus.isFinalized()) {
- dn.getFSDataset().enableTrash(bpid);
- dn.getFSDataset().setRollingUpgradeMarker(bpid);
+ dataset.enableTrash(bpid);
+ dataset.setRollingUpgradeMarker(bpid);
} else {
- dn.getFSDataset().clearTrash(bpid);
- dn.getFSDataset().clearRollingUpgradeMarker(bpid);
+ dataset.clearTrash(bpid);
+ dataset.clearRollingUpgradeMarker(bpid);
}
}
@@ -665,7 +670,7 @@ class BPOfferService {
Block toDelete[] = bcmd.getBlocks();
try {
// using global fsdataset
- dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
+ dataset.invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) {
// Exceptions caught here are not expected to be disk-related.
throw e;
@@ -676,13 +681,13 @@ class BPOfferService {
LOG.info("DatanodeCommand action: DNA_CACHE for " +
blockIdCmd.getBlockPoolId() + " of [" +
blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
- dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+ dataset.cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
break;
case DatanodeProtocol.DNA_UNCACHE:
LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
blockIdCmd.getBlockPoolId() + " of [" +
blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
- dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+ dataset.uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 63a0bb6..1fe1c9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -110,6 +111,7 @@ class BPServiceActor implements Runnable {
private volatile boolean sendImmediateIBR = false;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
+ private FsDatasetSpi<?> dataset = null;
private final DNConf dnConf;
private long prevBlockReportId;
@@ -220,7 +222,7 @@ class BPServiceActor implements Runnable {
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
- bpos.verifyAndSetNamespaceInfo(nsInfo);
+ dataset = bpos.verifyAndSetNamespaceInfo(nsInfo);
// Second phase of the handshake with the NN.
register(nsInfo);
@@ -330,7 +332,7 @@ class BPServiceActor implements Runnable {
String storageUuid, boolean now) {
synchronized (pendingIncrementalBRperStorage) {
addPendingReplicationBlockInfo(
- bInfo, dn.getFSDataset().getStorage(storageUuid));
+ bInfo, dataset.getStorage(storageUuid));
sendImmediateIBR = true;
// If now is true, the report is sent right away.
// Otherwise, it will be sent out in the next heartbeat.
@@ -344,7 +346,7 @@ class BPServiceActor implements Runnable {
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
synchronized (pendingIncrementalBRperStorage) {
addPendingReplicationBlockInfo(
- bInfo, dn.getFSDataset().getStorage(storageUuid));
+ bInfo, dataset.getStorage(storageUuid));
}
}
@@ -435,7 +437,7 @@ class BPServiceActor implements Runnable {
long brCreateStartTime = monotonicNow();
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
- dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
+ dataset.getBlockReports(bpos.getBlockPoolId());
// Convert the reports to the format expected by the NN.
int i = 0;
@@ -508,7 +510,7 @@ class BPServiceActor implements Runnable {
DatanodeCommand cacheReport() throws IOException {
// If caching is disabled, do not send a cache report
- if (dn.getFSDataset().getCacheCapacity() == 0) {
+ if (dataset.getCacheCapacity() == 0) {
return null;
}
// send cache report if timer has expired.
@@ -521,7 +523,7 @@ class BPServiceActor implements Runnable {
lastCacheReport = startTime;
String bpid = bpos.getBlockPoolId();
- List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
+ List<Long> blockIds = dataset.getCacheReport(bpid);
long createTime = monotonicNow();
cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
@@ -540,20 +542,20 @@ class BPServiceActor implements Runnable {
HeartbeatResponse sendHeartBeat() throws IOException {
StorageReport[] reports =
- dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
+ dataset.getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
}
- VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
- .getVolumeFailureSummary();
+ VolumeFailureSummary volumeFailureSummary =
+ dataset.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
- dn.getFSDataset().getCacheCapacity(),
- dn.getFSDataset().getCacheUsed(),
+ dataset.getCacheCapacity(),
+ dataset.getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 2e11600..686541b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -99,6 +100,7 @@ class BlockReceiver implements Closeable {
private ReplicaOutputStreams streams;
private DatanodeInfo srcDataNode = null;
private final DataNode datanode;
+ private final FsDatasetSpi<?> dataset;
volatile private boolean mirrorError;
// Cache management state
@@ -141,8 +143,8 @@ class BlockReceiver implements Closeable {
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
- final DataNode datanode, DataChecksum requestedChecksum,
- CachingStrategy cachingStrategy,
+ final DataNode datanode, final FsDatasetSpi<?> dataset,
+ DataChecksum requestedChecksum, CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
final boolean pinning) throws IOException {
try{
@@ -152,6 +154,7 @@ class BlockReceiver implements Closeable {
this.myAddr = myAddr;
this.srcDataNode = srcDataNode;
this.datanode = datanode;
+ this.dataset = dataset;
this.clientname = clientname;
this.isDatanode = clientname.length() == 0;
@@ -183,27 +186,27 @@ class BlockReceiver implements Closeable {
// Open local disk out
//
if (isDatanode) { //replication or move
- replicaHandler = datanode.data.createTemporary(storageType, block);
+ replicaHandler = dataset.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
+ replicaHandler = dataset.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
- replicaHandler = datanode.data.recoverRbw(
+ replicaHandler = dataset.recoverRbw(
block, newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
- replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
+ replicaHandler = dataset.append(block, newGs, minBytesRcvd);
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
- replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+ replicaHandler = dataset.recoverAppend(block, newGs, minBytesRcvd);
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
@@ -212,7 +215,7 @@ class BlockReceiver implements Closeable {
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaHandler =
- datanode.data.createTemporary(storageType, block);
+ dataset.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
@@ -717,7 +720,7 @@ class BlockReceiver implements Closeable {
//
if (syncBehindWrites) {
if (syncBehindWritesInBackground) {
- this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
+ dataset.submitBackgroundSyncFileRangeRequest(
block, outFd, lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
@@ -807,11 +810,11 @@ class BlockReceiver implements Closeable {
if (stage == BlockConstructionStage.TRANSFER_RBW) {
// for TRANSFER_RBW, convert temporary to RBW
- datanode.data.convertTemporaryToRbw(block);
+ dataset.convertTemporaryToRbw(block);
} else {
// for isDatnode or TRANSFER_FINALIZED
// Finalize the block.
- datanode.data.finalizeBlock(block);
+ dataset.finalizeBlock(block);
}
}
datanode.metrics.incrBlocksWritten();
@@ -904,7 +907,7 @@ class BlockReceiver implements Closeable {
*/
private void cleanupBlock() throws IOException {
if (isDatanode) {
- datanode.data.unfinalizeBlock(block);
+ dataset.unfinalizeBlock(block);
}
}
@@ -921,7 +924,7 @@ class BlockReceiver implements Closeable {
}
// rollback the position of the meta file
- datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
+ dataset.adjustCrcChannelPosition(block, streams, checksumSize);
}
/**
@@ -959,7 +962,7 @@ class BlockReceiver implements Closeable {
byte[] buf = new byte[sizePartialChunk];
byte[] crcbuf = new byte[checksumSize];
try (ReplicaInputStreams instr =
- datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
+ dataset.getTmpInputStreams(block, blkoff, ckoff)) {
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier
@@ -1298,11 +1301,11 @@ class BlockReceiver implements Closeable {
BlockReceiver.this.close();
endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
- datanode.data.finalizeBlock(block);
+ dataset.finalizeBlock(block);
}
if (pinning) {
- datanode.data.setPinning(block);
+ dataset.setPinning(block);
}
datanode.closeBlock(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 79f4dd7..5f4ea10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -147,6 +148,7 @@ class BlockSender implements java.io.Closeable {
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
private DataNode datanode;
+ private final FsDatasetSpi<?> dataset;
/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;
@@ -190,7 +192,8 @@ class BlockSender implements java.io.Closeable {
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
- boolean sendChecksum, DataNode datanode, String clientTraceFmt,
+ boolean sendChecksum, DataNode datanode,
+ final FsDatasetSpi<?> dataset, String clientTraceFmt,
CachingStrategy cachingStrategy)
throws IOException {
try {
@@ -227,6 +230,7 @@ class BlockSender implements java.io.Closeable {
this.readaheadLength = cachingStrategy.getReadahead().longValue();
}
this.datanode = datanode;
+ this.dataset = dataset;
if (verifyChecksum) {
// To simplify implementation, callers may not specify verification
@@ -237,7 +241,7 @@ class BlockSender implements java.io.Closeable {
final Replica replica;
final long replicaVisibleLength;
- synchronized(datanode.data) {
+ synchronized(dataset) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
@@ -274,7 +278,7 @@ class BlockSender implements java.io.Closeable {
(!is32Bit || length <= Integer.MAX_VALUE);
// Obtain a reference before reading data
- this.volumeRef = datanode.data.getVolume(block).obtainReference();
+ this.volumeRef = dataset.getVolume(block).obtainReference();
/*
* (corruptChecksumOK, meta_file_exist): operation
@@ -288,7 +292,7 @@ class BlockSender implements java.io.Closeable {
LengthInputStream metaIn = null;
boolean keepMetaInOpen = false;
try {
- metaIn = datanode.data.getMetaDataInputStream(block);
+ metaIn = dataset.getMetaDataInputStream(block);
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
//need checksum but meta-data not found
@@ -387,7 +391,7 @@ class BlockSender implements java.io.Closeable {
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("replica=" + replica);
}
- blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+ blockIn = dataset.getBlockInputStream(block, offset); // seek to offset
if (blockIn instanceof FileInputStream) {
blockInFd = ((FileInputStream)blockIn).getFD();
} else {
@@ -451,8 +455,10 @@ class BlockSender implements java.io.Closeable {
private static Replica getReplica(ExtendedBlock block, DataNode datanode)
throws ReplicaNotFoundException {
- Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
- block.getBlockId());
+ final FsDatasetSpi<?> dataset =
+ datanode.getFSDataset(block.getBlockPoolId());
+ Replica replica =
+ dataset.getReplica(block.getBlockPoolId(), block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d2b2939..70109a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -79,6 +79,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -285,9 +286,33 @@ public class DataNode extends ReconfigurableBase
volatile boolean shutdownForUpgrade = false;
private boolean shutdownInProgress = false;
private BlockPoolManager blockPoolManager;
- volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
+
+ private final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> datasetFactory;
+
+ // This is an onto (many-one) mapping. Multiple block pool IDs may share
+ // the same dataset.
+ private volatile Map<String,
+ FsDatasetSpi<? extends FsVolumeSpi>> datasetsMap =
+ new ConcurrentHashMap<>();
+
+ // Hash set of datasets, used to avoid having to deduplicate the values of datasetsMap
+ // every time we need to iterate over all datasets.
+ private volatile Set<FsDatasetSpi<? extends FsVolumeSpi>> datasets =
+ Collections.newSetFromMap(
+ new ConcurrentHashMap<FsDatasetSpi<? extends FsVolumeSpi>,
+ Boolean>());
+
private String clusterId = null;
+ /**
+ * Do NOT reference this field outside of tests.
+ * It is retained to avoid breaking existing tests and subject to removal.
+ * In existing HDFS unit tests we are guaranteed not to have more than one
+ * dataset instance.
+ */
+ @VisibleForTesting
+ volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
+
public final static String EMPTY_DEL_HINT = "";
final AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
@@ -319,7 +344,8 @@ public class DataNode extends ReconfigurableBase
private boolean hasAnyBlockPoolRegistered = false;
private final BlockScanner blockScanner;
- private DirectoryScanner directoryScanner = null;
+ private Map<FsDatasetSpi<?>, DirectoryScanner> directoryScannersMap =
+ new ConcurrentHashMap<>();
/** Activated plug-ins. */
private List<ServicePlugin> plugins;
@@ -373,6 +399,7 @@ public class DataNode extends ReconfigurableBase
this.getHdfsBlockLocationsEnabled = false;
this.blockScanner = new BlockScanner(this, conf);
this.pipelineSupportECN = false;
+ this.datasetFactory = null;
}
/**
@@ -387,7 +414,7 @@ public class DataNode extends ReconfigurableBase
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
-
+ datasetFactory = FsDatasetSpi.Factory.getFactory(conf);
this.usersWithLocalPathAccess = Arrays.asList(
conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
this.connectToDnViaHostname = conf.getBoolean(
@@ -595,7 +622,9 @@ public class DataNode extends ReconfigurableBase
@Override
public IOException call() {
try {
- data.addVolume(location, nsInfos);
+ for (FsDatasetSpi<?> dataset : datasets) {
+ dataset.addVolume(location, nsInfos);
+ }
} catch (IOException e) {
return e;
}
@@ -698,7 +727,9 @@ public class DataNode extends ReconfigurableBase
IOException ioe = null;
// Remove volumes and block infos from FsDataset.
- data.removeVolumes(absoluteVolumePaths, clearFailure);
+ for (final FsDatasetSpi<?> dataset : datasets) {
+ dataset.removeVolumes(absoluteVolumePaths, clearFailure);
+ }
// Remove volumes from DataStorage.
try {
@@ -878,36 +909,42 @@ public class DataNode extends ReconfigurableBase
}
private void shutdownPeriodicScanners() {
- shutdownDirectoryScanner();
+ shutdownDirectoryScanners();
blockScanner.removeAllVolumeScanners();
}
/**
* See {@link DirectoryScanner}
*/
- private synchronized void initDirectoryScanner(Configuration conf) {
- if (directoryScanner != null) {
- return;
- }
- String reason = null;
- if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
- DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
- reason = "verification is turned off by configuration";
- } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
- reason = "verifcation is not supported by SimulatedFSDataset";
- }
- if (reason == null) {
- directoryScanner = new DirectoryScanner(this, data, conf);
- directoryScanner.start();
- } else {
- LOG.info("Periodic Directory Tree Verification scan is disabled because " +
- reason);
+ private synchronized void initDirectoryScanners(Configuration conf) {
+ for (FsDatasetSpi<?> dataset : datasets) {
+ if (directoryScannersMap.get(dataset) != null) {
+ continue;
+ }
+
+ String reason = null;
+ if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
+ DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
+ reason = "verification is turned off by configuration";
+ } else if ("SimulatedFSDataset".equals(
+ dataset.getClass().getSimpleName())) {
+ reason = "verifcation is not supported by SimulatedFSDataset";
+ }
+ if (reason == null) {
+ DirectoryScanner scanner = new DirectoryScanner(this, dataset, conf);
+ directoryScannersMap.put(dataset, scanner);
+ scanner.start();
+ } else {
+ LOG.info(
+ "Periodic Directory Tree Verification scan is disabled because " +
+ reason);
+ }
}
}
- private synchronized void shutdownDirectoryScanner() {
- if (directoryScanner != null) {
- directoryScanner.shutdown();
+ private synchronized void shutdownDirectoryScanners() {
+ for (DirectoryScanner scanner : directoryScannersMap.values()) {
+ scanner.shutdown();
}
}
@@ -1013,7 +1050,7 @@ public class DataNode extends ReconfigurableBase
*/
public void reportBadBlocks(ExtendedBlock block) throws IOException{
BPOfferService bpos = getBPOSForBlock(block);
- FsVolumeSpi volume = getFSDataset().getVolume(block);
+ FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
}
@@ -1328,8 +1365,9 @@ public class DataNode extends ReconfigurableBase
blockScanner.disableBlockPoolId(bpId);
- if (data != null) {
- data.shutdownBlockPool(bpId);
+ FsDatasetSpi<?> dataset = getFSDataset(bpId);
+ if (dataset != null) {
+ dataset.shutdownBlockPool(bpId);
}
if (storage != null) {
@@ -1350,7 +1388,7 @@ public class DataNode extends ReconfigurableBase
* @param bpos Block pool offer service
* @throws IOException if the NN is inconsistent with the local storage.
*/
- void initBlockPool(BPOfferService bpos) throws IOException {
+ FsDatasetSpi<?> initBlockPool(BPOfferService bpos) throws IOException {
NamespaceInfo nsInfo = bpos.getNamespaceInfo();
if (nsInfo == null) {
throw new IOException("NamespaceInfo not found: Block pool " + bpos
@@ -1364,15 +1402,16 @@ public class DataNode extends ReconfigurableBase
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
- initStorage(nsInfo);
+ FsDatasetSpi<?> dataset = initStorage(bpos.getBlockPoolId(), nsInfo);
// Exclude failed disks before initializing the block pools to avoid startup
// failures.
- checkDiskError();
+ checkDiskError(getFSDataset(nsInfo.getBlockPoolID()));
- initDirectoryScanner(conf);
- data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+ initDirectoryScanners(conf);
+ dataset.addBlockPool(nsInfo.getBlockPoolID(), conf);
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
+ return dataset;
}
List<BPOfferService> getAllBpOs() {
@@ -1387,11 +1426,9 @@ public class DataNode extends ReconfigurableBase
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed.
*/
- private void initStorage(final NamespaceInfo nsInfo) throws IOException {
- final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
- = FsDatasetSpi.Factory.getFactory(conf);
-
- if (!factory.isSimulated()) {
+ private FsDatasetSpi<?> initStorage(
+ final String blockPoolId, final NamespaceInfo nsInfo) throws IOException {
+ if (!datasetFactory.isSimulated()) {
final StartupOption startOpt = getStartupOption(conf);
if (startOpt == null) {
throw new IOException("Startup option not set.");
@@ -1409,12 +1446,7 @@ public class DataNode extends ReconfigurableBase
// If this is a newly formatted DataNode then assign a new DatanodeUuid.
checkDatanodeUuid();
-
- synchronized(this) {
- if (data == null) {
- data = factory.newInstance(this, storage, conf);
- }
- }
+ return allocateFsDataset(blockPoolId, nsInfo.getNodeType());
}
/**
@@ -1556,8 +1588,9 @@ public class DataNode extends ReconfigurableBase
Token<BlockTokenIdentifier> token) throws IOException {
checkBlockLocalPathAccess();
checkBlockToken(block, token, BlockTokenIdentifier.AccessMode.READ);
- Preconditions.checkNotNull(data, "Storage not yet initialized");
- BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
+ FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+ Preconditions.checkNotNull(dataset, "Storage not yet initialized");
+ BlockLocalPathInfo info = dataset.getBlockLocalPathInfo(block);
if (LOG.isDebugEnabled()) {
if (info != null) {
if (LOG.isTraceEnabled()) {
@@ -1612,8 +1645,10 @@ public class DataNode extends ReconfigurableBase
FileInputStream fis[] = new FileInputStream[2];
try {
- fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0);
- fis[1] = DatanodeUtil.getMetaDataInputStream(blk, data);
+ final FsDatasetSpi<?> dataset = getFSDataset(blk.getBlockPoolId());
+ Preconditions.checkNotNull(dataset, "Storage not yet initialized");
+ fis[0] = (FileInputStream) dataset.getBlockInputStream(blk, 0);
+ fis[1] = DatanodeUtil.getMetaDataInputStream(blk, dataset);
} catch (ClassCastException e) {
LOG.debug("requestShortCircuitFdsForRead failed", e);
throw new ShortCircuitFdsUnsupportedException("This DataNode's " +
@@ -1642,7 +1677,9 @@ public class DataNode extends ReconfigurableBase
DataNodeFaultInjector.get().getHdfsBlocksMetadata();
- return data.getHdfsBlocksMetadata(bpId, blockIds);
+ final FsDatasetSpi<?> dataset = getFSDataset(bpId);
+ Preconditions.checkNotNull(dataset, "Storage not yet initialized");
+ return dataset.getHdfsBlocksMetadata(bpId, blockIds);
}
private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
@@ -1804,8 +1841,8 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Exception when unlocking storage: " + ie, ie);
}
}
- if (data != null) {
- data.shutdown();
+ for (FsDatasetSpi<?> dataset : datasets) {
+ dataset.shutdown();
}
if (metrics != null) {
metrics.shutdown();
@@ -1842,8 +1879,9 @@ public class DataNode extends ReconfigurableBase
}
}
- private void handleDiskError(String errMsgr) {
- final boolean hasEnoughResources = data.hasEnoughResource();
+ private void handleDiskError(final FsDatasetSpi<?> dataset,
+ final String errMsgr) {
+ final boolean hasEnoughResources = dataset.hasEnoughResource();
LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
// If we have enough active valid volumes then we do not want to
@@ -1902,7 +1940,7 @@ public class DataNode extends ReconfigurableBase
private void reportBadBlock(final BPOfferService bpos,
final ExtendedBlock block, final String msg) {
- FsVolumeSpi volume = getFSDataset().getVolume(block);
+ FsVolumeSpi volume = getFSDataset(block.getBlockPoolId()).getVolume(block);
bpos.reportBadBlocks(
block, volume.getStorageID(), volume.getStorageType());
LOG.warn(msg);
@@ -1917,9 +1955,11 @@ public class DataNode extends ReconfigurableBase
boolean replicaStateNotFinalized = false;
boolean blockFileNotExist = false;
boolean lengthTooShort = false;
+ final FsDatasetSpi<?> dataset = getFSDataset(block.getBlockPoolId());
+ Preconditions.checkNotNull(dataset, "Storage not yet initialized");
try {
- data.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED);
+ dataset.checkBlock(block, block.getNumBytes(), ReplicaState.FINALIZED);
} catch (ReplicaNotFoundException e) {
replicaNotExist = true;
} catch (UnexpectedReplicaStateException e) {
@@ -1949,10 +1989,13 @@ public class DataNode extends ReconfigurableBase
}
if (lengthTooShort) {
// Check if NN recorded length matches on-disk length
- // Shorter on-disk len indicates corruption so report NN the corrupt block
+ // Shorter on-disk len indicates corruption so report NN
+ // the corrupt block
reportBadBlock(bpos, block, "Can't replicate block " + block
- + " because on-disk length " + data.getLength(block)
- + " is shorter than NameNode recorded length " + block.getNumBytes());
+ + " because on-disk length "
+ + getFSDataset(block.getBlockPoolId()).getLength(block)
+ + " is shorter than NameNode recorded length "
+ + block.getNumBytes());
return;
}
@@ -2159,7 +2202,8 @@ public class DataNode extends ReconfigurableBase
DFSUtil.getSmallBufferSize(conf)));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, true, DataNode.this, null, cachingStrategy);
+ false, false, true, DataNode.this, getFSDataset(b.getBlockPoolId()),
+ null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
@@ -2447,8 +2491,10 @@ public class DataNode extends ReconfigurableBase
@Override
public String toString() {
- return "DataNode{data=" + data + ", localName='" + getDisplayName()
- + "', datanodeUuid='" + storage.getDatanodeUuid() + "', xmitsInProgress="
+ return "DataNode{datasets=" + datasets.toString()
+ + ", localName='" + getDisplayName()
+ + "', datanodeUuid='" + storage.getDatanodeUuid()
+ + "', xmitsInProgress="
+ xmitsInProgress.get() + "}";
}
@@ -2506,14 +2552,61 @@ public class DataNode extends ReconfigurableBase
}
/**
+ * Allocate a new dataset for the given serviceType. This may return a
+ * previously allocated dataset.
+ *
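+ * @param bpid block pool ID under which the dataset is registered
+ * @param serviceType node type passed to the dataset factory
+ * @return the dataset obtained from the dataset factory
+ * @throws IOException if thrown by the dataset factory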
+ */
+ private FsDatasetSpi<?> allocateFsDataset(
+ final String bpid, final NodeType serviceType) throws IOException {
+ FsDatasetSpi<?> dataset =
+ datasetFactory.newInstance(this, storage, conf, serviceType);
+ datasets.add(dataset);
+ datasetsMap.put(bpid, dataset);
+
+ if (serviceType == NodeType.NAME_NODE) {
+ // 'data' is retained for existing mock-based HDFS unit tests.
+ Preconditions.checkState(data == null || data == dataset);
+ data = dataset;
+ }
+
+ return dataset;
+ }
+
+ /**
* Examples are adding and deleting blocks directly.
* The most common usage will be when the data node's storage is simulated.
*
* @return the fsdataset that stores the blocks
*/
@VisibleForTesting
+ public FsDatasetSpi<?> getFSDataset(final String bpid) {
+ return datasetsMap.get(bpid);
+ }
+
+ @VisibleForTesting
+ public Set<FsDatasetSpi<?>> getFSDatasets() {
+ return datasets;
+ }
+
+ /**
+ * Do NOT use this method outside of tests.
+ * Retained for compatibility with existing tests and subject to removal.
+ *
+ * @return the fsdataset that stores the blocks
+ */
+ @VisibleForTesting
public FsDatasetSpi<?> getFSDataset() {
- return data;
+ Preconditions.checkState(datasets.size() <= 1,
+ "Did not expect more than one Dataset here.");
+
+ if (datasets.size() == 0) {
+ return null;
+ }
+ return (FsDatasetSpi<?>) datasets.iterator().next();
}
@VisibleForTesting
@@ -2522,9 +2615,15 @@ public class DataNode extends ReconfigurableBase
return blockScanner;
}
+ /**
+ * Do NOT use this method outside of tests.
+ * Retained for compatibility with existing tests and subject to removal.
+ *
+ * @return the directory scanner for the dataset from {@link #getFSDataset()}
+ */
@VisibleForTesting
DirectoryScanner getDirectoryScanner() {
- return directoryScanner;
+ return directoryScannersMap.get(getFSDataset());
}
public static void secureMain(String args[], SecureResources resources) {
@@ -2584,7 +2683,12 @@ public class DataNode extends ReconfigurableBase
@Override // InterDatanodeProtocol
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
- return data.initReplicaRecovery(rBlock);
+ final FsDatasetSpi<?> dataset =
+ getFSDataset(rBlock.getBlock().getBlockPoolId());
+ if (dataset != null) {
+ return dataset.initReplicaRecovery(rBlock);
+ }
+ return null;
}
/**
@@ -2608,8 +2712,10 @@ public class DataNode extends ReconfigurableBase
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
final long recoveryId, final long newBlockId, final long newLength)
throws IOException {
- final String storageID = data.updateReplicaUnderRecovery(oldBlock,
- recoveryId, newBlockId, newLength);
+ final String storageID =
+ getFSDataset(oldBlock.getBlockPoolId())
+ .updateReplicaUnderRecovery(oldBlock, recoveryId,
+ newBlockId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
// block locations until the next block report.
@@ -2849,7 +2955,7 @@ public class DataNode extends ReconfigurableBase
@Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
checkReadAccess(block);
- return data.getReplicaVisibleLength(block);
+ return getFSDataset(block.getBlockPoolId()).getReplicaVisibleLength(block);
}
private void checkReadAccess(final ExtendedBlock block) throws IOException {
@@ -2886,10 +2992,11 @@ public class DataNode extends ReconfigurableBase
final long storedGS;
final long visible;
final BlockConstructionStage stage;
+ final FsDatasetSpi<?> dataset = getFSDataset(b.getBlockPoolId());
//get replica information
- synchronized(data) {
- Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
+ synchronized(dataset) {
+ Block storedBlock = dataset.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
throw new IOException(b + " not found in datanode.");
@@ -2901,15 +3008,16 @@ public class DataNode extends ReconfigurableBase
}
// Update the genstamp with storedGS
b.setGenerationStamp(storedGS);
- if (data.isValidRbw(b)) {
+ if (dataset.isValidRbw(b)) {
stage = BlockConstructionStage.TRANSFER_RBW;
- } else if (data.isValidBlock(b)) {
+ } else if (dataset.isValidBlock(b)) {
stage = BlockConstructionStage.TRANSFER_FINALIZED;
} else {
- final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
+ final String r = dataset.getReplicaString(
+ b.getBlockPoolId(), b.getBlockId());
throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
}
- visible = data.getReplicaVisibleLength(b);
+ visible = dataset.getReplicaVisibleLength(b);
}
//set visible length
b.setNumBytes(visible);
@@ -2990,6 +3098,7 @@ public class DataNode extends ReconfigurableBase
*/
@Override // DataNodeMXBean
public String getVolumeInfo() {
+ // Default implementation for backwards compatibility.
Preconditions.checkNotNull(data, "Storage not yet initialized");
return JSON.toString(data.getVolumeInfoMap());
}
@@ -3024,7 +3133,7 @@ public class DataNode extends ReconfigurableBase
"shutdown the block pool service");
}
- data.deleteBlockPool(blockPoolId, force);
+ getFSDataset(blockPoolId).deleteBlockPool(blockPoolId, force);
}
@Override // ClientDatanodeProtocol
@@ -3179,8 +3288,8 @@ public class DataNode extends ReconfigurableBase
/**
* Check the disk error
*/
- private void checkDiskError() {
- Set<File> unhealthyDataDirs = data.checkDataDir();
+ private void checkDiskError(final FsDatasetSpi<?> dataset) {
+ Set<File> unhealthyDataDirs = dataset.checkDataDir();
if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
try {
// Remove all unhealthy volumes from DataNode.
@@ -3193,7 +3302,7 @@ public class DataNode extends ReconfigurableBase
for (File dataDir : unhealthyDataDirs) {
sb.append(dataDir.getAbsolutePath() + ";");
}
- handleDiskError(sb.toString());
+ handleDiskError(dataset, sb.toString());
}
}
@@ -3213,7 +3322,9 @@ public class DataNode extends ReconfigurableBase
}
if(tempFlag) {
try {
- checkDiskError();
+ for (final FsDatasetSpi<?> dataset : datasets) {
+ checkDiskError(dataset);
+ }
} catch (Exception e) {
LOG.warn("Unexpected exception occurred while checking disk error " + e);
checkDiskErrorThread = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 26d669c..fbb8897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
@@ -312,8 +313,9 @@ class DataXceiver extends Receiver implements Runnable {
"anything but a UNIX domain socket.");
}
if (slotId != null) {
- boolean isCached = datanode.data.
- isCached(blk.getBlockPoolId(), blk.getBlockId());
+ final String bpid = blk.getBlockPoolId();
+ boolean isCached = datanode.getFSDataset(bpid).
+ isCached(bpid, blk.getBlockId());
datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
registeredSlotId = slotId;
@@ -523,7 +525,14 @@ class DataXceiver extends Receiver implements Runnable {
baseStream, smallBufferSize));
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenIdentifier.AccessMode.READ);
-
+
+ final FsDatasetSpi<?> dataset =
+ datanode.getFSDataset(block.getBlockPoolId());
+ if (dataset == null) {
+ throw new IOException(
+ "Unknown or unitialized blockpool " + block.getBlockPoolId());
+ }
+
// send the block
BlockSender blockSender = null;
DatanodeRegistration dnR =
@@ -540,7 +549,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
- true, false, sendChecksum, datanode, clientTraceFmt,
+ true, false, sendChecksum, datanode, dataset, clientTraceFmt,
cachingStrategy);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
@@ -630,6 +639,13 @@ class DataXceiver extends Receiver implements Runnable {
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
long size = 0;
+ final FsDatasetSpi<?> dataset =
+ datanode.getFSDataset(block.getBlockPoolId());
+ if (dataset == null) {
+ throw new IOException(
+ "Unknown or unitialized blockpool " + block.getBlockPoolId());
+ }
+
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
throw new IOException(stage + " does not support multiple targets "
@@ -683,12 +699,12 @@ class DataXceiver extends Receiver implements Runnable {
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
- clientname, srcDataNode, datanode, requestedChecksum,
+ clientname, srcDataNode, datanode, dataset, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning);
storageUuid = blockReceiver.getStorageUuid();
} else {
- storageUuid = datanode.data.recoverClose(
+ storageUuid = dataset.recoverClose(
block, latestGenerationStamp, minBytesRcvd);
}
@@ -890,6 +906,12 @@ class DataXceiver extends Receiver implements Runnable {
final int csize = checksum.getChecksumSize();
final byte[] buffer = new byte[4*1024];
MessageDigest digester = MD5Hash.getDigester();
+ final FsDatasetSpi<?> dataset =
+ datanode.getFSDataset(block.getBlockPoolId());
+ if (dataset == null) {
+ throw new IOException(
+ "Unknown or unitialized blockpool " + block.getBlockPoolId());
+ }
long remaining = requestLength / bytesPerCRC * csize;
for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
@@ -904,7 +926,7 @@ class DataXceiver extends Receiver implements Runnable {
int partialLength = (int) (requestLength % bytesPerCRC);
if (partialLength > 0) {
byte[] buf = new byte[partialLength];
- final InputStream blockIn = datanode.data.getBlockInputStream(block,
+ final InputStream blockIn = dataset.getBlockInputStream(block,
requestLength - partialLength);
try {
// Get the CRC of the partialLength.
@@ -928,14 +950,20 @@ class DataXceiver extends Receiver implements Runnable {
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
// client side now can specify a range of the block for checksum
+ final FsDatasetSpi<?> dataset =
+ datanode.getFSDataset(block.getBlockPoolId());
+ if (dataset == null) {
+ throw new IOException(
+ "Unknown or unitialized blockpool " + block.getBlockPoolId());
+ }
+
long requestLength = block.getNumBytes();
Preconditions.checkArgument(requestLength >= 0);
- long visibleLength = datanode.data.getReplicaVisibleLength(block);
+ long visibleLength = dataset.getReplicaVisibleLength(block);
boolean partialBlk = requestLength < visibleLength;
updateCurrentThreadName("Reading metadata for block " + block);
- final LengthInputStream metadataIn = datanode.data
- .getMetaDataInputStream(block);
+ final LengthInputStream metadataIn = dataset.getMetaDataInputStream(block);
final DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(metadataIn, ioFileBufferSize));
@@ -986,6 +1014,13 @@ class DataXceiver extends Receiver implements Runnable {
@Override
public void copyBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
+ final FsDatasetSpi<?> dataset =
+ datanode.getFSDataset(block.getBlockPoolId());
+ if (dataset == null) {
+ throw new IOException(
+ "Unknown or unitialized blockpool " + block.getBlockPoolId());
+ }
+
updateCurrentThreadName("Copying block " + block);
// Read in the header
if (datanode.isBlockTokenEnabled) {
@@ -1002,7 +1037,7 @@ class DataXceiver extends Receiver implements Runnable {
}
- if (datanode.data.getPinning(block)) {
+ if (dataset.getPinning(block)) {
String msg = "Not able to copy block " + block.getBlockId() + " " +
"to " + peer.getRemoteAddressString() + " because it's pinned ";
LOG.info(msg);
@@ -1025,7 +1060,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
- null, CachingStrategy.newDropBehind());
+ dataset, null, CachingStrategy.newDropBehind());
// set up response stream
OutputStream baseStream = getOutputStream();
@@ -1073,6 +1108,13 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo proxySource) throws IOException {
+ final FsDatasetSpi<?> dataset =
+ datanode.getFSDataset(block.getBlockPoolId());
+ if (dataset == null) {
+ throw new IOException(
+ "Unknown or unitialized blockpool " + block.getBlockPoolId());
+ }
+
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
/* read header */
@@ -1109,7 +1151,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
// Move the block to different storage in the same datanode
if (proxySource.equals(datanode.getDatanodeId())) {
- ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
+ ReplicaInfo oldReplica = dataset.moveBlockAcrossStorage(block,
storageType);
if (oldReplica != null) {
LOG.info("Moved " + block + " from StorageType "
@@ -1164,7 +1206,7 @@ class DataXceiver extends Receiver implements Runnable {
blockReceiver = new BlockReceiver(block, storageType,
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
- null, 0, 0, 0, "", null, datanode, remoteChecksum,
+ null, 0, 0, 0, "", null, datanode, dataset, remoteChecksum,
CachingStrategy.newDropBehind(), false, false);
// receive a block
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 615abe9..2cc3516 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -433,8 +433,9 @@ public class VolumeScanner extends Thread {
BlockSender blockSender = null;
try {
blockSender = new BlockSender(block, 0, -1,
- false, true, true, datanode, null,
- CachingStrategy.newDropBehind());
+ false, true, true, datanode,
+ datanode.getFSDataset(block.getBlockPoolId()),
+ null, CachingStrategy.newDropBehind());
throttler.setBandwidth(bytesPerSec);
long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
resultHandler.handle(block, null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 76c4f02..a4672b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -68,6 +70,7 @@ import org.apache.hadoop.util.ReflectionUtils;
* The default implementation stores replicas on local drives.
*/
@InterfaceAudience.Private
+@InterfaceStability.Unstable
public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/**
* A factory for creating {@link FsDatasetSpi} objects.
@@ -83,9 +86,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
return ReflectionUtils.newInstance(clazz, conf);
}
- /** Create a new object. */
- public abstract D newInstance(DataNode datanode, DataStorage storage,
- Configuration conf) throws IOException;
+ /** Create a new dataset object for a specific service type. */
+ public abstract D newInstance(DataNode datanode,
+ DataStorage storage, Configuration conf,
+ NodeType serviceType) throws IOException;
/** Does the factory create simulated objects? */
public boolean isSimulated() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 8d1bb2a..7c7b9a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -22,12 +22,16 @@ import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
/**
* This is an interface for the underlying volume.
*/
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
public interface FsVolumeSpi {
/**
* Obtain a reference object that had increased 1 reference count of the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
index 52e385b..01c3830 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetFactory.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -28,9 +31,25 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
* A factory for creating {@link FsDatasetImpl} objects.
*/
public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
+
+ private final Map<NodeType, FsDatasetImpl> datasetMap = new HashMap<>();
+
@Override
- public FsDatasetImpl newInstance(DataNode datanode,
- DataStorage storage, Configuration conf) throws IOException {
- return new FsDatasetImpl(datanode, storage, conf);
+ public synchronized FsDatasetImpl newInstance(DataNode datanode,
+ DataStorage storage, Configuration conf,
+ NodeType serviceType) throws IOException {
+ FsDatasetImpl dataset = datasetMap.get(serviceType);
+ if (dataset != null) {
+ return dataset;
+ }
+ switch (serviceType) {
+ case NAME_NODE:
+ dataset = new FsDatasetImpl(datanode, storage, conf);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported node type " + serviceType);
+ }
+ datasetMap.put(serviceType, dataset);
+ return dataset;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8ebd214..999b827 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -308,7 +308,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
- asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+ asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, this);
deletingBlock = new HashMap<String, Set<Long>>();
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@@ -347,9 +347,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/**
* Gets initial volume failure information for all volumes that failed
- * immediately at startup. The method works by determining the set difference
- * between all configured storage locations and the actual storage locations in
- * use after attempting to put all of them into service.
+ * immediately at startup. The method works by determining the set
+ * difference between all configured storage locations and the actual
+ * storage locations in use after attempting to put all of them into
+ * service.
*
* @return each storage location that has failed
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 884df2e..effbd4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -53,6 +53,7 @@ class RamDiskAsyncLazyPersistService {
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode;
+ private final FsDatasetImpl dataset;
private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
@@ -65,8 +66,10 @@ class RamDiskAsyncLazyPersistService {
* The RamDiskAsyncLazyPersistService uses one ThreadPool per volume to do the async
* disk operations.
*/
- RamDiskAsyncLazyPersistService(DataNode datanode) {
+ RamDiskAsyncLazyPersistService(DataNode datanode,
+ final FsDatasetImpl dataset) {
this.datanode = datanode;
+ this.dataset = dataset;
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
}
@@ -234,7 +237,6 @@ class RamDiskAsyncLazyPersistService {
@Override
public void run() {
boolean succeeded = false;
- final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
try (FsVolumeReference ref = this.targetVolume) {
int smallBufferSize = DFSUtil.getSmallBufferSize(EMPTY_HDFS_CONF);
// No FsDatasetImpl lock for the file copy
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 5c1b38f..d925b93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.test.GenericTestUtils;
@@ -73,7 +74,8 @@ public class TestWriteBlockGetsBlockLengthHint {
static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
@Override
public SimulatedFSDataset newInstance(DataNode datanode,
- DataStorage storage, Configuration conf) throws IOException {
+ DataStorage storage, Configuration conf,
+ HdfsServerConstants.NodeType serviceType) throws IOException {
return new FsDatasetChecker(storage, conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 778dd28..d957767 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -83,7 +84,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
@Override
public SimulatedFSDataset newInstance(DataNode datanode,
- DataStorage storage, Configuration conf) throws IOException {
+ DataStorage storage, Configuration conf,
+ HdfsServerConstants.NodeType serviceType) throws IOException {
return new SimulatedFSDataset(datanode, storage, conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 64cc78b..2059f1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
import java.io.File;
import java.io.IOException;
@@ -118,7 +119,8 @@ public class TestBPOfferService {
mockFSDataset.addBlockPool(FAKE_BPID, conf);
// Wire the dataset to the DN.
- Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
+ Mockito.doReturn(mockFSDataset).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
+ Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
}
/**
@@ -325,15 +327,16 @@ public class TestBPOfferService {
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
when(mockDn).getMetrics();
final AtomicInteger count = new AtomicInteger();
- Mockito.doAnswer(new Answer<Void>() {
+ Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+ Mockito.doAnswer(new Answer<FsDatasetSpi<?>>() {
@Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
+ public FsDatasetSpi<?> answer(InvocationOnMock invocation) throws Throwable {
if (count.getAndIncrement() == 0) {
throw new IOException("faked initBlockPool exception");
}
// The initBlockPool is called again. Now mock init is done.
- Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
- return null;
+ Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(anyString());
+ return mockFSDataset;
}
}).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
@@ -563,10 +566,10 @@ public class TestBPOfferService {
assertSame(mockNN1, bpos.getActiveNN());
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
.when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
- Mockito.anyInt(), Mockito.anyString());
+ Mockito.anyInt(), anyString());
Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
.when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
- Mockito.anyInt(), Mockito.anyString());
+ Mockito.anyInt(), anyString());
String errorString = "Can't send invalid block " + FAKE_BLOCK;
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
@@ -614,7 +617,7 @@ public class TestBPOfferService {
}
}
}).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
- Mockito.anyInt(), Mockito.anyString());
+ Mockito.anyInt(), anyString());
String errorString = "Can't send invalid block " + FAKE_BLOCK;
bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
Thread.sleep(10000);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e721a39e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
index 07a26cc..ad4135b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeInitStorage.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.junit.Test;
@@ -45,7 +46,7 @@ public class TestDataNodeInitStorage {
@Override
public SimulatedFsDatasetVerifier newInstance(
DataNode datanode, DataStorage storage,
- Configuration conf) throws IOException {
+ Configuration conf, HdfsServerConstants.NodeType serviceType) throws IOException {
return new SimulatedFsDatasetVerifier(storage, conf);
}