You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2014/11/26 19:00:53 UTC
hadoop git commit: HDFS-7310. Mover can give first priority to local
DN if it has target storage type available in local DN. (Vinayakumar B via
umamahesh)
Repository: hadoop
Updated Branches:
refs/heads/trunk aa7dac335 -> 058af60c5
HDFS-7310. Mover can give first priority to local DN if it has target storage type available in local DN. (Vinayakumar B via umamahesh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/058af60c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/058af60c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/058af60c
Branch: refs/heads/trunk
Commit: 058af60c56207907f2bedf76df4284e86d923e0c
Parents: aa7dac3
Author: Uma Maheswara Rao G <um...@apache.org>
Authored: Wed Nov 26 23:27:25 2014 +0530
Committer: Uma Maheswara Rao G <um...@apache.org>
Committed: Wed Nov 26 23:27:25 2014 +0530
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/server/balancer/Dispatcher.java | 17 +--
.../server/blockmanagement/BlockManager.java | 18 ++-
.../hdfs/server/blockmanagement/BlocksMap.java | 6 +-
.../blockmanagement/DatanodeStorageInfo.java | 14 +-
.../hdfs/server/datanode/DataXceiver.java | 141 ++++++++++---------
.../server/datanode/fsdataset/FsDatasetSpi.java | 7 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 4 +
.../datanode/fsdataset/impl/FsDatasetImpl.java | 134 ++++++++++++++----
.../datanode/fsdataset/impl/FsVolumeImpl.java | 4 +
.../impl/RamDiskAsyncLazyPersistService.java | 2 +-
.../apache/hadoop/hdfs/server/mover/Mover.java | 29 +++-
.../server/blockmanagement/TestBlockInfo.java | 4 +-
.../blockmanagement/TestDatanodeDescriptor.java | 8 +-
.../server/datanode/SimulatedFSDataset.java | 7 +
.../server/datanode/TestBlockReplacement.java | 101 ++++++++++---
.../hadoop/hdfs/server/mover/TestMover.java | 48 ++++++-
17 files changed, 397 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 89e67eb..12219a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -401,6 +401,9 @@ Release 2.7.0 - UNRELEASED
HDFS-6803 Document DFSClient#DFSInputStream expectations reading and preading
in concurrent context. (stack via stevel)
+ HDFS-7310. Mover can give first priority to local DN if it has target storage type
+ available in local DN. (Vinayakumar B via umamahesh)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 6ede40a..63e151c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -243,6 +243,10 @@ public class Dispatcher {
*/
private boolean chooseProxySource() {
final DatanodeInfo targetDN = target.getDatanodeInfo();
+ // if source and target are same nodes then no need of proxy
+ if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
+ return true;
+ }
// if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) {
for (StorageGroup loc : block.getLocations()) {
@@ -375,19 +379,6 @@ public class Dispatcher {
public DBlock(Block block) {
super(block);
}
-
- @Override
- public synchronized boolean isLocatedOn(StorageGroup loc) {
- // currently we only check if replicas are located on the same DataNodes
- // since we do not have the capability to store two replicas in the same
- // DataNode even though they are on two different storage types
- for (StorageGroup existing : locations) {
- if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) {
- return true;
- }
- }
- return false;
- }
}
/** The class represents a desired move. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 254643c..2676696 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.fs.FileEncryptionInfo;
-
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.Acces
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -2000,8 +2000,9 @@ public class BlockManager {
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), (short) 1);
- boolean added = storageInfo.addBlock(delimiter);
- assert added : "Delimiting block cannot be present in the node";
+ AddBlockResult result = storageInfo.addBlock(delimiter);
+ assert result == AddBlockResult.ADDED
+ : "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
int curIndex;
@@ -2394,14 +2395,19 @@ public class BlockManager {
assert bc != null : "Block must belong to a file";
// add block to the datanode
- boolean added = storageInfo.addBlock(storedBlock);
+ AddBlockResult result = storageInfo.addBlock(storedBlock);
int curReplicaDelta;
- if (added) {
+ if (result == AddBlockResult.ADDED) {
curReplicaDelta = 1;
if (logEveryBlock) {
logAddStoredBlock(storedBlock, node);
}
+ } else if (result == AddBlockResult.REPLACED) {
+ curReplicaDelta = 0;
+ blockLog.warn("BLOCK* addStoredBlock: " + "block " + storedBlock
+ + " moved to storageType " + storageInfo.getStorageType()
+ + " on node " + node);
} else {
// if the same block is added again and the replica was corrupt
// previously because of a wrong gen stamp, remove it from the
@@ -2423,7 +2429,7 @@ public class BlockManager {
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
numLiveReplicas >= minReplication) {
storedBlock = completeBlock(bc, storedBlock, false);
- } else if (storedBlock.isComplete() && added) {
+ } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
// Is no-op if not in safe mode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index a675635..6664034 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
@@ -223,8 +224,9 @@ class BlocksMap {
final boolean removed = storage.removeBlock(currentBlock);
Preconditions.checkState(removed, "currentBlock not found.");
- final boolean added = storage.addBlock(newBlock);
- Preconditions.checkState(added, "newBlock already exists.");
+ final AddBlockResult result = storage.addBlock(newBlock);
+ Preconditions.checkState(result == AddBlockResult.ADDED,
+ "newBlock already exists.");
}
// replace block in the map itself
blocks.put(newBlock);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 8c44b30..a3198e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -215,10 +215,10 @@ public class DatanodeStorageInfo {
return blockPoolUsed;
}
- public boolean addBlock(BlockInfo b) {
+ public AddBlockResult addBlock(BlockInfo b) {
// First check whether the block belongs to a different storage
// on the same DN.
- boolean replaced = false;
+ AddBlockResult result = AddBlockResult.ADDED;
DatanodeStorageInfo otherStorage =
b.findStorageInfo(getDatanodeDescriptor());
@@ -226,10 +226,10 @@ public class DatanodeStorageInfo {
if (otherStorage != this) {
// The block belongs to a different storage. Remove it first.
otherStorage.removeBlock(b);
- replaced = true;
+ result = AddBlockResult.REPLACED;
} else {
// The block is already associated with this storage.
- return false;
+ return AddBlockResult.ALREADY_EXIST;
}
}
@@ -237,7 +237,7 @@ public class DatanodeStorageInfo {
b.addStorage(this);
blockList = b.listInsert(blockList, this);
numBlocks++;
- return !replaced;
+ return result;
}
boolean removeBlock(BlockInfo b) {
@@ -358,4 +358,8 @@ public class DatanodeStorageInfo {
}
return null;
}
+
+ static enum AddBlockResult {
+ ADDED, REPLACED, ALREADY_EXIST;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 61b9c67..bbc23e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -1007,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
updateCurrentThreadName("Replacing block " + block + " from " + delHint);
/* read header */
- block.setNumBytes(dataXceiverServer.estimateBlockSize);
if (datanode.isBlockTokenEnabled) {
try {
datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
@@ -1039,73 +1038,83 @@ class DataXceiver extends Receiver implements Runnable {
DataOutputStream replyOut = new DataOutputStream(getOutputStream());
boolean IoeDuringCopyBlockOperation = false;
try {
- // get the output stream to the proxy
- final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to datanode " + dnAddr);
- }
- InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
- proxySock = datanode.newSocket();
- NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
- proxySock.setSoTimeout(dnConf.socketTimeout);
-
- OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
- dnConf.socketWriteTimeout);
- InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
- DataEncryptionKeyFactory keyFactory =
- datanode.getDataEncryptionKeyFactoryForBlock(block);
- IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
- unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
- unbufProxyOut = saslStreams.out;
- unbufProxyIn = saslStreams.in;
-
- proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
- HdfsConstants.SMALL_BUFFER_SIZE));
- proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
- HdfsConstants.IO_FILE_BUFFER_SIZE));
-
- /* send request to the proxy */
- IoeDuringCopyBlockOperation = true;
- new Sender(proxyOut).copyBlock(block, blockToken);
- IoeDuringCopyBlockOperation = false;
-
- // receive the response from the proxy
-
- BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
- PBHelper.vintPrefixed(proxyReply));
-
- if (copyResponse.getStatus() != SUCCESS) {
- if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+ // Move the block to different storage in the same datanode
+ if (proxySource.equals(datanode.getDatanodeId())) {
+ ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
+ storageType);
+ if (oldReplica != null) {
+ LOG.info("Moved " + block + " from StorageType "
+ + oldReplica.getVolume().getStorageType() + " to " + storageType);
+ }
+ } else {
+ block.setNumBytes(dataXceiverServer.estimateBlockSize);
+ // get the output stream to the proxy
+ final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr);
+ }
+ InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
+ proxySock = datanode.newSocket();
+ NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
+ proxySock.setSoTimeout(dnConf.socketTimeout);
+
+ OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
+ dnConf.socketWriteTimeout);
+ InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+ unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+ unbufProxyOut = saslStreams.out;
+ unbufProxyIn = saslStreams.in;
+
+ proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
+ HdfsConstants.IO_FILE_BUFFER_SIZE));
+
+ /* send request to the proxy */
+ IoeDuringCopyBlockOperation = true;
+ new Sender(proxyOut).copyBlock(block, blockToken);
+ IoeDuringCopyBlockOperation = false;
+
+ // receive the response from the proxy
+
+ BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+ PBHelper.vintPrefixed(proxyReply));
+
+ if (copyResponse.getStatus() != SUCCESS) {
+ if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+ throw new IOException("Copy block " + block + " from "
+ + proxySock.getRemoteSocketAddress()
+ + " failed due to access token error");
+ }
throw new IOException("Copy block " + block + " from "
- + proxySock.getRemoteSocketAddress()
- + " failed due to access token error");
+ + proxySock.getRemoteSocketAddress() + " failed");
}
- throw new IOException("Copy block " + block + " from "
- + proxySock.getRemoteSocketAddress() + " failed");
+
+ // get checksum info about the block we're copying
+ ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+ DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
+ // open a block receiver and check if the block does not exist
+ blockReceiver = new BlockReceiver(block, storageType,
+ proxyReply, proxySock.getRemoteSocketAddress().toString(),
+ proxySock.getLocalSocketAddress().toString(),
+ null, 0, 0, 0, "", null, datanode, remoteChecksum,
+ CachingStrategy.newDropBehind(), false);
+
+ // receive a block
+ blockReceiver.receiveBlock(null, null, replyOut, null,
+ dataXceiverServer.balanceThrottler, null, true);
+
+ // notify name node
+ datanode.notifyNamenodeReceivedBlock(
+ block, delHint, blockReceiver.getStorageUuid());
+
+ LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+ + ", delHint=" + delHint);
}
-
- // get checksum info about the block we're copying
- ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
- DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
- checksumInfo.getChecksum());
- // open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(block, storageType,
- proxyReply, proxySock.getRemoteSocketAddress().toString(),
- proxySock.getLocalSocketAddress().toString(),
- null, 0, 0, 0, "", null, datanode, remoteChecksum,
- CachingStrategy.newDropBehind(), false);
-
- // receive a block
- blockReceiver.receiveBlock(null, null, replyOut, null,
- dataXceiverServer.balanceThrottler, null, true);
-
- // notify name node
- datanode.notifyNamenodeReceivedBlock(
- block, delHint, blockReceiver.getStorageUuid());
-
- LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
- + ", delHint=" + delHint);
-
} catch (IOException ioe) {
opStatus = ERROR;
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
@@ -1117,7 +1126,7 @@ class DataXceiver extends Receiver implements Runnable {
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
- if (opStatus == SUCCESS) {
+ if (opStatus == SUCCESS && proxyReply != null) {
try {
proxyReply.readChar();
} catch (IOException ignored) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index a02ee0a..462ad31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
@@ -508,4 +509,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* Callback from RamDiskAsyncLazyPersistService upon async lazy persist task fail
*/
public void onFailLazyPersist(String bpId, long blockId);
+
+ /**
+ * Move block from one storage to another storage
+ */
+ public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
+ StorageType targetStorageType) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5c8709c..77cdb91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -157,6 +157,10 @@ class BlockPoolSlice {
return rbwDir;
}
+ File getTmpDir() {
+ return tmpDir;
+ }
+
/** Run DU on local drives. It must be synchronized from caller. */
void decDfsUsed(long value) {
dfsUsage.decDfsUsed(value);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 4a89778..2c6f409 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -663,13 +663,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @return the new meta and block files.
* @throws IOException
*/
- static File[] copyBlockFiles(long blockId, long genStamp,
- File srcMeta, File srcFile, File destRoot)
+ static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
+ File srcFile, File destRoot, boolean calculateChecksum)
throws IOException {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
- computeChecksum(srcMeta, dstMeta, srcFile);
+ if (calculateChecksum) {
+ computeChecksum(srcMeta, dstMeta, srcFile);
+ } else {
+ try {
+ Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
+ } catch (IOException e) {
+ throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
+ }
+ }
try {
Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
@@ -677,14 +685,73 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Copied " + srcMeta + " to " + dstMeta +
- " and calculated checksum");
- LOG.debug("Copied " + srcFile + " to " + dstFile);
+ if (calculateChecksum) {
+ LOG.debug("Copied " + srcMeta + " to " + dstMeta
+ + " and calculated checksum");
+ } else {
+ LOG.debug("Copied " + srcFile + " to " + dstFile);
+ }
}
return new File[] {dstMeta, dstFile};
}
/**
+ * Move block files from one storage to another storage.
+ * @return Returns the Old replicaInfo
+ * @throws IOException
+ */
+ @Override
+ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+ StorageType targetStorageType) throws IOException {
+ ReplicaInfo replicaInfo = getReplicaInfo(block);
+ if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
+ }
+ if (replicaInfo.getNumBytes() != block.getNumBytes()) {
+ throw new IOException("Corrupted replica " + replicaInfo
+ + " with a length of " + replicaInfo.getNumBytes()
+ + " expected length is " + block.getNumBytes());
+ }
+ if (replicaInfo.getVolume().getStorageType() == targetStorageType) {
+ throw new ReplicaAlreadyExistsException("Replica " + replicaInfo
+ + " already exists on storage " + targetStorageType);
+ }
+
+ if (replicaInfo.isOnTransientStorage()) {
+ // Block movement from RAM_DISK will be done by LazyPersist mechanism
+ throw new IOException("Replica " + replicaInfo
+ + " cannot be moved from storageType : "
+ + replicaInfo.getVolume().getStorageType());
+ }
+
+ FsVolumeImpl targetVolume = volumes.getNextVolume(targetStorageType,
+ block.getNumBytes());
+ File oldBlockFile = replicaInfo.getBlockFile();
+ File oldMetaFile = replicaInfo.getMetaFile();
+
+ // Copy files to temp dir first
+ File[] blockFiles = copyBlockFiles(block.getBlockId(),
+ block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+ targetVolume.getTmpDir(block.getBlockPoolId()),
+ replicaInfo.isOnTransientStorage());
+
+ ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
+ replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
+ targetVolume, blockFiles[0].getParentFile(), 0);
+ newReplicaInfo.setNumBytes(blockFiles[1].length());
+ // Finalize the copied files
+ newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+
+ removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
+ oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+
+ // Replace the old block if any to reschedule the scanning.
+ datanode.getBlockScanner().addBlock(block);
+ return replicaInfo;
+ }
+
+ /**
* Compute and store the checksum for a block file that does not already have
* its checksum computed.
*
@@ -2442,6 +2509,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
+ private void removeOldReplica(ReplicaInfo replicaInfo,
+ ReplicaInfo newReplicaInfo, File blockFile, File metaFile,
+ long blockFileUsed, long metaFileUsed, final String bpid) {
+ // Before deleting the files from old storage we must notify the
+ // NN that the files are on the new storage. Else a blockReport from
+ // the transient storage might cause the NN to think the blocks are lost.
+ // Replicas must be evicted from client short-circuit caches, because the
+ // storage will no longer be same, and thus will require validating
+ // checksum. This also stops a client from holding file descriptors,
+ // which would prevent the OS from reclaiming the memory.
+ ExtendedBlock extendedBlock =
+ new ExtendedBlock(bpid, newReplicaInfo);
+ datanode.getShortCircuitRegistry().processBlockInvalidation(
+ ExtendedBlockId.fromExtendedBlock(extendedBlock));
+ datanode.notifyNamenodeReceivedBlock(
+ extendedBlock, null, newReplicaInfo.getStorageUuid());
+
+ // Remove the old replicas
+ if (blockFile.delete() || !blockFile.exists()) {
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+ if (metaFile.delete() || !metaFile.exists()) {
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+ }
+ }
+
+ // If deletion failed then the directory scanner will cleanup the blocks
+ // eventually.
+ }
+
class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
@@ -2601,30 +2697,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
- // Before deleting the files from transient storage we must notify the
- // NN that the files are on the new storage. Else a blockReport from
- // the transient storage might cause the NN to think the blocks are lost.
- // Replicas must be evicted from client short-circuit caches, because the
- // storage will no longer be transient, and thus will require validating
- // checksum. This also stops a client from holding file descriptors,
- // which would prevent the OS from reclaiming the memory.
- ExtendedBlock extendedBlock =
- new ExtendedBlock(bpid, newReplicaInfo);
- datanode.getShortCircuitRegistry().processBlockInvalidation(
- ExtendedBlockId.fromExtendedBlock(extendedBlock));
- datanode.notifyNamenodeReceivedBlock(
- extendedBlock, null, newReplicaInfo.getStorageUuid());
-
- // Remove the old replicas from transient storage.
- if (blockFile.delete() || !blockFile.exists()) {
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
- if (metaFile.delete() || !metaFile.exists()) {
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
- }
- }
-
- // If deletion failed then the directory scanner will cleanup the blocks
- // eventually.
+ removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
+ blockFileUsed, metaFileUsed, bpid);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 1d7540c..48427ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -129,6 +129,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
return getBlockPoolSlice(bpid).getLazypersistDir();
}
+ File getTmpDir(String bpid) throws IOException {
+ return getBlockPoolSlice(bpid).getTmpDir();
+ }
+
void decDfsUsed(String bpid, long value) {
synchronized(dataset) {
BlockPoolSlice bp = bpSlices.get(bpid);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 5fdcc2f..c9aba8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -232,7 +232,7 @@ class RamDiskAsyncLazyPersistService {
try {
// No FsDatasetImpl lock for the file copy
File targetFiles[] = FsDatasetImpl.copyBlockFiles(
- blockId, genStamp, metaFile, blockFile, lazyPersistDir);
+ blockId, genStamp, metaFile, blockFile, lazyPersistDir, true);
// Lock FsDataSetImpl during onCompleteLazyPersist callback
datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 59814af..108eb38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -86,8 +86,8 @@ public class Mover {
return get(sources, ml);
}
- private StorageGroup getTarget(MLocation ml) {
- return get(targets, ml);
+ private StorageGroup getTarget(String uuid, StorageType storageType) {
+ return targets.get(uuid, storageType);
}
private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
@@ -387,6 +387,11 @@ public class Mover {
boolean scheduleMoveReplica(DBlock db, Source source,
List<StorageType> targetTypes) {
+ // Match storage on the same node
+ if (chooseTargetInSameNode(db, source, targetTypes)) {
+ return true;
+ }
+
if (dispatcher.getCluster().isNodeGroupAware()) {
if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
return true;
@@ -401,6 +406,26 @@ public class Mover {
return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
}
+ /**
+ * Choose the target storage within same Datanode if possible.
+ */
+ boolean chooseTargetInSameNode(DBlock db, Source source,
+ List<StorageType> targetTypes) {
+ for (StorageType t : targetTypes) {
+ StorageGroup target = storages.getTarget(source.getDatanodeInfo()
+ .getDatanodeUuid(), t);
+ if (target == null) {
+ continue;
+ }
+ final PendingMove pm = source.addPendingMove(db, target);
+ if (pm != null) {
+ dispatcher.executePendingMove(pm);
+ return true;
+ }
+ }
+ return false;
+ }
+
boolean chooseTarget(DBlock db, Source source,
List<StorageType> targetTypes, Matcher matcher) {
final NetworkTopology cluster = dispatcher.getCluster();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index f8c583a..41c8f8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Assert;
@@ -75,7 +76,8 @@ public class TestBlockInfo {
}
// Try to move one of the blocks to a different storage.
- boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]);
+ boolean added =
+ storage2.addBlock(blockInfos[NUM_BLOCKS / 2]) == AddBlockResult.ADDED;
Assert.assertThat(added, is(false));
Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
index e00a4c3..fe639e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.junit.Test;
@@ -61,18 +62,17 @@ public class TestDatanodeDescriptor {
BlockInfo blk1 = new BlockInfo(new Block(2L), (short) 2);
DatanodeStorageInfo[] storages = dd.getStorageInfos();
assertTrue(storages.length > 0);
- final String storageID = storages[0].getStorageID();
// add first block
- assertTrue(storages[0].addBlock(blk));
+ assertTrue(storages[0].addBlock(blk) == AddBlockResult.ADDED);
assertEquals(1, dd.numBlocks());
// remove a non-existent block
assertFalse(dd.removeBlock(blk1));
assertEquals(1, dd.numBlocks());
// add an existent block
- assertFalse(storages[0].addBlock(blk));
+ assertFalse(storages[0].addBlock(blk) == AddBlockResult.ADDED);
assertEquals(1, dd.numBlocks());
// add second block
- assertTrue(storages[0].addBlock(blk1));
+ assertTrue(storages[0].addBlock(blk1) == AddBlockResult.ADDED);
assertEquals(2, dd.numBlocks());
// remove first block
assertTrue(dd.removeBlock(blk));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 3e5034a..e03b756 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1260,5 +1260,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public void onFailLazyPersist(String bpId, long blockId) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+ StorageType targetStorageType) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index e0d7964..fbdfebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -17,15 +17,14 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -200,7 +201,51 @@ public class TestBlockReplacement {
cluster.shutdown();
}
}
-
+
+ @Test
+ public void testBlockMoveAcrossStorageInSameNode() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ // create only one datanode in the cluster to verify movement within
+ // datanode.
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).storageTypes(
+ new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+ .build();
+ try {
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final Path file = new Path("/testBlockMoveAcrossStorageInSameNode/file");
+ DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
+ LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+ // get the current
+ LocatedBlock locatedBlock = locatedBlocks.get(0);
+ ExtendedBlock block = locatedBlock.getBlock();
+ DatanodeInfo[] locations = locatedBlock.getLocations();
+ assertEquals(1, locations.length);
+ StorageType[] storageTypes = locatedBlock.getStorageTypes();
+ // current block should be written to DISK
+ assertTrue(storageTypes[0] == StorageType.DISK);
+
+ DatanodeInfo source = locations[0];
+ // move block to ARCHIVE by using same DataNodeInfo for source, proxy and
+ // destination so that movement happens within datanode
+ assertTrue(replaceBlock(block, source, source, source,
+ StorageType.ARCHIVE));
+
+ // wait till namenode notified
+ Thread.sleep(3000);
+ locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+ // get the current
+ locatedBlock = locatedBlocks.get(0);
+ assertEquals("Storage should be only one", 1,
+ locatedBlock.getLocations().length);
+ assertTrue("Block should be moved to ARCHIVE", locatedBlock
+ .getStorageTypes()[0] == StorageType.ARCHIVE);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
/* check if file's blocks have expected number of replicas,
* and exist at all of includeNodes
*/
@@ -259,24 +304,42 @@ public class TestBlockReplacement {
*/
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+ return replaceBlock(block, source, sourceProxy, destination,
+ StorageType.DEFAULT);
+ }
+
+ /*
+ * Replace block
+ */
+ private boolean replaceBlock(
+ ExtendedBlock block,
+ DatanodeInfo source,
+ DatanodeInfo sourceProxy,
+ DatanodeInfo destination,
+ StorageType targetStorageType) throws IOException, SocketException {
Socket sock = new Socket();
- sock.connect(NetUtils.createSocketAddr(
- destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT);
- sock.setKeepAlive(true);
- // sendRequest
- DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- new Sender(out).replaceBlock(block, StorageType.DEFAULT,
- BlockTokenSecretManager.DUMMY_TOKEN,
- source.getDatanodeUuid(), sourceProxy);
- out.flush();
- // receiveResponse
- DataInputStream reply = new DataInputStream(sock.getInputStream());
+ try {
+ sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
+ HdfsServerConstants.READ_TIMEOUT);
+ sock.setKeepAlive(true);
+ // sendRequest
+ DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+ new Sender(out).replaceBlock(block, targetStorageType,
+ BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
+ sourceProxy);
+ out.flush();
+ // receiveResponse
+ DataInputStream reply = new DataInputStream(sock.getInputStream());
- BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
- while (proto.getStatus() == Status.IN_PROGRESS) {
- proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+ BlockOpResponseProto proto =
+ BlockOpResponseProto.parseDelimitedFrom(reply);
+ while (proto.getStatus() == Status.IN_PROGRESS) {
+ proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+ }
+ return proto.getStatus() == Status.SUCCESS;
+ } finally {
+ sock.close();
}
- return proto.getStatus() == Status.SUCCESS;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/058af60c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 5866c7f..c9fc5ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
@@ -79,6 +79,52 @@ public class TestMover {
}
}
+ @Test
+ public void testScheduleBlockWithinSameNode() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3)
+ .storageTypes(
+ new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+ .build();
+ try {
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String file = "/testScheduleWithinSameNode/file";
+ Path dir = new Path("/testScheduleWithinSameNode");
+ dfs.mkdirs(dir);
+ // write to DISK
+ dfs.setStoragePolicy(dir, "HOT");
+ {
+ final FSDataOutputStream out = dfs.create(new Path(file));
+ out.writeChars("testScheduleWithinSameNode");
+ out.close();
+ }
+
+ //verify before movement
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ StorageType[] storageTypes = lb.getStorageTypes();
+ for (StorageType storageType : storageTypes) {
+ Assert.assertTrue(StorageType.DISK == storageType);
+ }
+ // move to ARCHIVE
+ dfs.setStoragePolicy(dir, "COLD");
+ int rc = ToolRunner.run(conf, new Mover.Cli(),
+ new String[] { "-p", dir.toString() });
+ Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+
+ // Wait till namenode notified
+ Thread.sleep(3000);
+ lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ storageTypes = lb.getStorageTypes();
+ for (StorageType storageType : storageTypes) {
+ Assert.assertTrue(StorageType.ARCHIVE == storageType);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
private void checkMovePaths(List<Path> actual, Path... expected) {
Assert.assertEquals(expected.length, actual.size());
for (Path p : expected) {