You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2015/02/12 00:13:59 UTC
hadoop git commit: HDFS-6133. Add a feature for replica pinning so
that a pinned replica will not be moved by Balancer/Mover. Contributed by
zhaoyunjiong
Repository: hadoop
Updated Branches:
refs/heads/trunk 50625e660 -> 085b1e293
HDFS-6133. Add a feature for replica pinning so that a pinned replica will not be moved by Balancer/Mover. Contributed by zhaoyunjiong
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/085b1e29
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/085b1e29
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/085b1e29
Branch: refs/heads/trunk
Commit: 085b1e293ff53f7a86aa21406cfd4bfa0f3bf33b
Parents: 50625e6
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Feb 11 15:09:29 2015 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Feb 11 15:12:12 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 22 +++++-
.../hadoop/hdfs/DistributedFileSystem.java | 7 +-
.../datatransfer/DataTransferProtocol.java | 6 +-
.../hdfs/protocol/datatransfer/Receiver.java | 4 +-
.../hdfs/protocol/datatransfer/Sender.java | 8 ++-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 21 ++++++
.../hdfs/server/datanode/BlockReceiver.java | 12 +++-
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../hdfs/server/datanode/DataXceiver.java | 34 +++++++---
.../server/datanode/fsdataset/FsDatasetSpi.java | 13 ++++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 40 +++++++++++
.../src/main/proto/datatransfer.proto | 3 +
.../src/main/resources/hdfs-default.xml | 6 ++
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 27 ++++++--
.../hadoop/hdfs/TestDataTransferProtocol.java | 2 +-
.../hdfs/server/balancer/TestBalancer.java | 71 +++++++++++++++++---
.../server/datanode/SimulatedFSDataset.java | 11 +++
.../hdfs/server/datanode/TestDiskError.java | 2 +-
.../extdataset/ExternalDatasetImpl.java | 9 +++
21 files changed, 271 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9a6c551..5d0a6f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -332,6 +332,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
changes. (Xiaoyu Yao via Arpit Agarwal)
+ HDFS-6133. Add a feature for replica pinning so that a pinned replica
+ will not be moved by Balancer/Mover. (zhaoyunjiong via szetszwo)
+
IMPROVEMENTS
HDFS-7055. Add tracing to DFSInputStream (cmccabe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e4343bb..975f023 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -778,4 +778,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// 10 days
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10);
+ public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED =
+ "dfs.datanode.block-pinning.enabled";
+ public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
+ false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 9560e01..2bf5c40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1443,11 +1443,13 @@ public class DFSOutputStream extends FSOutputSummer
ExtendedBlock blockCopy = new ExtendedBlock(block);
blockCopy.setNumBytes(blockSize);
+ boolean[] targetPinnings = getPinnings(nodes);
// send the request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
- checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
+ checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+ (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1535,6 +1537,24 @@ public class DFSOutputStream extends FSOutputSummer
}
}
+ private boolean[] getPinnings(DatanodeInfo[] nodes) { // Builds one pin flag per entry of nodes: true when that node matches an entry of favoredNodes.
+ if (favoredNodes == null) { // favoredNodes is unset, so there is nothing to match against
+ return null; // null (not an all-false array) signals "no pinning information"; callers must handle it
+ } else {
+ boolean[] pinnings = new boolean[nodes.length]; // index i corresponds to nodes[i]
+ for (int i = 0; i < nodes.length; i++) {
+ pinnings[i] = false; // default: not pinned unless a favored node matches below
+ for (int j = 0; j < favoredNodes.length; j++) {
+ if (nodes[i].getXferAddrWithHostname().equals(favoredNodes[j])) { // compares against the favoredNodes entry as-is; presumably both are "host:port" strings - TODO confirm
+ pinnings[i] = true;
+ break; // one match is enough for this node
+ }
+ }
+ }
+ return pinnings;
+ }
+ }
+
private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes) throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 63d48bc..f777817 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -352,9 +352,10 @@ public class DistributedFileSystem extends FileSystem {
* Progressable)} with the addition of favoredNodes that is a hint to
* where the namenode should place the file blocks.
* The favored nodes hint is not persisted in HDFS. Hence it may be honored
- * at the creation time only. HDFS could move the blocks during balancing or
- * replication, to move the blocks from favored nodes. A value of null means
- * no favored nodes for this create
+ * at the creation time only. And with favored nodes, blocks will be pinned
+ * on the datanodes to prevent balancing move the block. HDFS could move the
+ * blocks during replication, to move the blocks from favored nodes. A value
+ * of null means no favored nodes for this create
*/
public HdfsDataOutputStream create(final Path f,
final FsPermission permission, final boolean overwrite,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index f6b99e6..5fc263e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -92,6 +92,8 @@ public interface DataTransferProtocol {
* @param minBytesRcvd minimum number of bytes received.
* @param maxBytesRcvd maximum number of bytes received.
* @param latestGenerationStamp the latest generation stamp of the block.
+ * @param pinning whether to pin the block, so Balancer won't move it.
+ * @param targetPinnings whether to pin the block on target datanode
*/
public void writeBlock(final ExtendedBlock blk,
final StorageType storageType,
@@ -107,7 +109,9 @@ public interface DataTransferProtocol {
final long latestGenerationStamp,
final DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy,
- final boolean allowLazyPersist) throws IOException;
+ final boolean allowLazyPersist,
+ final boolean pinning,
+ final boolean[] targetPinnings) throws IOException;
/**
* Transfer a block to another datanode.
* The block stage must be
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index 2ea1787..7994027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -149,7 +149,9 @@ public abstract class Receiver implements DataTransferProtocol {
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()),
- (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
+ (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
+ (proto.hasPinning() ? proto.getPinning(): false),
+ (PBHelper.convertBooleanList(proto.getTargetPinningsList())));
} finally {
if (traceScope != null) traceScope.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 844c270..eb30afb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -129,7 +129,9 @@ public class Sender implements DataTransferProtocol {
final long latestGenerationStamp,
DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy,
- final boolean allowLazyPersist) throws IOException {
+ final boolean allowLazyPersist,
+ final boolean pinning,
+ final boolean[] targetPinnings) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@@ -148,7 +150,9 @@ public class Sender implements DataTransferProtocol {
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
- .setAllowLazyPersist(allowLazyPersist);
+ .setAllowLazyPersist(allowLazyPersist)
+ .setPinning(pinning)
+ .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index e4746cc..8ecd767 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -2960,4 +2960,25 @@ public class PBHelper {
ezKeyVersionName);
}
+ public static List<Boolean> convert(boolean[] targetPinnings, int idx) { // Boxes targetPinnings[idx..end) into a new List<Boolean>.
+ List<Boolean> pinnings = new ArrayList<Boolean>();
+ if (targetPinnings == null) {
+ pinnings.add(Boolean.FALSE); // a null array yields a single FALSE entry rather than an empty list
+ } else {
+ for (; idx < targetPinnings.length; ++idx) { // starts at the caller-supplied idx; yields an empty list when idx >= length
+ pinnings.add(Boolean.valueOf(targetPinnings[idx]));
+ }
+ }
+ return pinnings;
+ }
+
+ public static boolean[] convertBooleanList( // Unboxes a List<Boolean> into a boolean[] of the same length and order.
+ List<Boolean> targetPinningsList) { // must be non-null: size() is called on it unconditionally
+ final boolean[] targetPinnings = new boolean[targetPinningsList.size()];
+ for (int i = 0; i < targetPinningsList.size(); i++) {
+ targetPinnings[i] = targetPinningsList.get(i); // auto-unboxing; a null element would throw NullPointerException here
+ }
+ return targetPinnings; // an empty list produces an empty array, never null
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 65b4f2d..368d80d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -132,6 +132,8 @@ class BlockReceiver implements Closeable {
private long lastResponseTime = 0;
private boolean isReplaceBlock = false;
private DataOutputStream replyOut = null;
+
+ private boolean pinning;
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
@@ -141,7 +143,8 @@ class BlockReceiver implements Closeable {
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
- final boolean allowLazyPersist) throws IOException {
+ final boolean allowLazyPersist,
+ final boolean pinning) throws IOException {
try{
this.block = block;
this.in = in;
@@ -165,12 +168,14 @@ class BlockReceiver implements Closeable {
this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
+ this.pinning = pinning;
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": " + block
+ "\n isClient =" + isClient + ", clientname=" + clientname
+ "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+ "\n inAddr=" + inAddr + ", myAddr=" + myAddr
+ "\n cachingStrategy = " + cachingStrategy
+ + "\n pinning=" + pinning
);
}
@@ -1279,6 +1284,11 @@ class BlockReceiver implements Closeable {
: 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
+
+ if (pinning) {
+ datanode.data.setPinning(block);
+ }
+
datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
if (ClientTraceLog.isInfoEnabled() && isClient) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index ed3ec7d..8d3b3a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2068,7 +2068,7 @@ public class DataNode extends ReconfigurableBase
new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
- false);
+ false, false, null);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 3a2723f..bb5323a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -581,7 +581,9 @@ class DataXceiver extends Receiver implements Runnable {
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
- final boolean allowLazyPersist) throws IOException {
+ final boolean allowLazyPersist,
+ final boolean pinning,
+ final boolean[] targetPinnings) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@@ -594,14 +596,14 @@ class DataXceiver extends Receiver implements Runnable {
throw new IOException(stage + " does not support multiple targets "
+ Arrays.asList(targets));
}
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
+ "\n block =" + block + ", newGs=" + latestGenerationStamp
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ "\n targets=" + Arrays.asList(targets)
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
- );
+ + ", pinning=" + pinning);
LOG.debug("isDatanode=" + isDatanode
+ ", isClient=" + isClient
+ ", isTransfer=" + isTransfer);
@@ -643,7 +645,7 @@ class DataXceiver extends Receiver implements Runnable {
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
- cachingStrategy, allowLazyPersist);
+ cachingStrategy, allowLazyPersist, pinning);
storageUuid = blockReceiver.getStorageUuid();
} else {
@@ -686,10 +688,19 @@ class DataXceiver extends Receiver implements Runnable {
mirrorIn = new DataInputStream(unbufMirrorIn);
// Do not propagate allowLazyPersist to downstream DataNodes.
- new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+ if (targetPinnings != null && targetPinnings.length > 0) {
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- latestGenerationStamp, requestedChecksum, cachingStrategy, false);
+ latestGenerationStamp, requestedChecksum, cachingStrategy,
+ false, targetPinnings[0], targetPinnings);
+ } else {
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+ blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+ stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ latestGenerationStamp, requestedChecksum, cachingStrategy,
+ false, false, targetPinnings);
+ }
mirrorOut.flush();
@@ -949,7 +960,14 @@ class DataXceiver extends Receiver implements Runnable {
}
}
-
+
+ if (datanode.data.getPinning(block)) {
+ String msg = "Not able to copy block " + block.getBlockId() + " " +
+ "to " + peer.getRemoteAddressString() + " because it's pinned ";
+ LOG.info(msg);
+ sendResponse(ERROR, msg);
+ }
+
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
String msg = "Not able to copy block " + block.getBlockId() + " " +
"to " + peer.getRemoteAddressString() + " because threads " +
@@ -1109,7 +1127,7 @@ class DataXceiver extends Receiver implements Runnable {
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
- CachingStrategy.newDropBehind(), false);
+ CachingStrategy.newDropBehind(), false, false);
// receive a block
blockReceiver.receiveBlock(null, null, replyOut, null,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 162e306..7d880d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -522,4 +522,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
StorageType targetStorageType) throws IOException;
+
+ /**
+ * Set a block to be pinned on this datanode so that it cannot be moved
+ * by Balancer/Mover.
+ *
+ * It is a no-op when dfs.datanode.block-pinning.enabled is set to false.
+ */
+ public void setPinning(ExtendedBlock block) throws IOException;
+
+ /**
+ * Check whether the block was pinned
+ */
+ public boolean getPinning(ExtendedBlock block) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 9a981fe..8c8d744 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -50,6 +50,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.ExtendedBlockId;
@@ -239,6 +243,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Used for synchronizing access to usage stats
private final Object statsLock = new Object();
+ final LocalFileSystem localFS;
+
+ private boolean blockPinningEnabled;
+
/**
* An FSDataset has a directory where it loads its data files.
*/
@@ -299,6 +307,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
lazyWriter = new Daemon(new LazyWriter(conf));
lazyWriter.start();
registerMBean(datanode.getDatanodeUuid());
+ localFS = FileSystem.getLocal(conf);
+ blockPinningEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
+ DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
}
private void addVolume(Collection<StorageLocation> dataLocations,
@@ -2842,5 +2854,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
shouldRun = false;
}
}
+
+ @Override
+ public void setPinning(ExtendedBlock block) throws IOException { // Marks the block's file as pinned by setting the sticky bit on it.
+ if (!blockPinningEnabled) { // feature switch read from dfs.datanode.block-pinning.enabled in the constructor
+ return; // pinning disabled: leave the file's permissions untouched
+ }
+
+ File f = getBlockFile(block); // NOTE(review): presumably resolves the on-disk file of the replica - confirm against getBlockFile
+ Path p = new Path(f.getAbsolutePath());
+
+ FsPermission oldPermission = localFS.getFileStatus(
+ new Path(f.getAbsolutePath())).getPermission(); // same path as p, constructed a second time
+ // The sticky bit is the pin marker; the user/group/other actions are carried over unchanged.
+ FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+ oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+ localFS.setPermission(p, permission);
+ }
+
+ @Override
+ public boolean getPinning(ExtendedBlock block) throws IOException { // Reports whether the block's file currently has the sticky bit set.
+ if (!blockPinningEnabled) { // with the feature off, every block is reported as not pinned without touching the file system
+ return false;
+ }
+ File f = getBlockFile(block); // NOTE(review): presumably resolves the on-disk file of the replica - confirm against getBlockFile
+
+ FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
+ return fss.getPermission().getStickyBit(); // the sticky bit doubles as the "pinned" flag written by setPinning
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 9512688..d72bb5e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -123,6 +123,9 @@ message OpWriteBlockProto {
* to ignore this hint.
*/
optional bool allowLazyPersist = 13 [default = false];
+ //whether to pin the block, so Balancer won't move it.
+ optional bool pinning = 14 [default = false];
+ repeated bool targetPinnings = 15;
}
message OpTransferBlockProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 9697543..9299ea3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2264,4 +2264,10 @@
</description>
</property>
+ <property>
+ <name>dfs.datanode.block-pinning.enabled</name>
+ <value>false</value>
+ <description>Whether pin blocks on favored DataNode.</description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index f47bd24..c7fd2f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -293,13 +293,21 @@ public class DFSTestUtil {
public static void createFile(FileSystem fs, Path fileName, int bufferLen,
long fileLen, long blockSize, short replFactor, long seed)
throws IOException {
- createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
- replFactor, seed, false);
+ createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor,
+ seed, false);
}
public static void createFile(FileSystem fs, Path fileName,
boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
short replFactor, long seed, boolean flush) throws IOException {
+ createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize,
+ replFactor, seed, flush, null);
+ }
+
+ public static void createFile(FileSystem fs, Path fileName,
+ boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+ short replFactor, long seed, boolean flush,
+ InetSocketAddress[] favoredNodes) throws IOException {
assert bufferLen > 0;
if (!fs.mkdirs(fileName.getParent())) {
throw new IOException("Mkdirs failed to create " +
@@ -312,10 +320,19 @@ public class DFSTestUtil {
createFlags.add(LAZY_PERSIST);
}
try {
- out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
- fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+ if (favoredNodes == null) {
+ out = fs.create(
+ fileName,
+ FsPermission.getFileDefault(),
+ createFlags,
+ fs.getConf().getInt(
+ CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
replFactor, blockSize, null);
-
+ } else {
+ out = ((DistributedFileSystem) fs).create(fileName,
+ FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
+ null, favoredNodes);
+ }
if (fileLen > 0) {
byte[] toWrite = new byte[bufferLen];
Random rb = new Random(seed);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 55cd8e2..519c511 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -528,6 +528,6 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], new StorageType[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
- checksum, CachingStrategy.newDefaultStrategy(), false);
+ checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 153baeb..03d2812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -17,12 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.junit.Assert.assertEquals;
@@ -33,6 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -59,12 +55,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@@ -310,6 +302,63 @@ public class TestBalancer {
}
/**
+ * Make sure that balancer can't move pinned blocks.
+ * If favoredNodes are specified when a file is created, its blocks will be
+ * pinned using the sticky bit.
+ * @throws Exception
+ */
+ @Test(timeout=100000)
+ public void testBalancerWithPinnedBlocks() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+
+ long[] capacities = new long[] { CAPACITY, CAPACITY };
+ String[] racks = { RACK0, RACK1 };
+ int numOfDatanodes = capacities.length;
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+ .hosts(new String[]{"localhost", "localhost"})
+ .racks(racks).simulatedCapacities(capacities).build();
+
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+ // fill up the cluster to be 80% full
+ long totalCapacity = sum(capacities);
+ long totalUsedSpace = totalCapacity * 8 / 10;
+ InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+ for (int i = 0; i < favoredNodes.length; i++) {
+ favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+ }
+
+ DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+ totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+ (short) numOfDatanodes, 0, false, favoredNodes);
+
+ // start up an empty node with the same capacity
+ cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+ new long[] { CAPACITY });
+
+ totalCapacity += CAPACITY;
+
+ // wait for heartbeats to report the expected used space and capacity
+ waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+ // start rebalancing
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+
+ } finally {
+ cluster.shutdown();
+ }
+
+ }
+
+ /**
* Wait until balanced: each datanode gives utilization within
* BALANCE_ALLOWED_VARIANCE of average
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 6ff4603..36b62e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -127,6 +127,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
SimulatedOutputStream oStream = null;
private long bytesAcked;
private long bytesRcvd;
+ private boolean pinned = false;
BInfo(String bpid, Block b, boolean forWriting) throws IOException {
theBlock = new Block(b);
if (theBlock.getNumBytes() < 0) {
@@ -1285,5 +1286,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public void setPinning(ExtendedBlock b) throws IOException {
+ blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+ }
+
+ @Override
+ public boolean getPinning(ExtendedBlock b) throws IOException {
+ return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index f440bb6..fb219d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@ public class TestDiskError {
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], new StorageType[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
- checksum, CachingStrategy.newDefaultStrategy(), false);
+ checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
out.flush();
// close the connection before sending the content of the block
http://git-wip-us.apache.org/repos/asf/hadoop/blob/085b1e29/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index aa868d5..c377bdb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -399,4 +399,13 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
public long getNumBlocksFailedToUncache() {
return 0;
}
+
+ @Override
+ public void setPinning(ExtendedBlock block) throws IOException {
+ }
+
+ @Override
+ public boolean getPinning(ExtendedBlock block) throws IOException {
+ return false;
+ }
}