You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/12/20 17:42:23 UTC
[hadoop] branch trunk updated: HDFS-15655. Add option to make
balancer prefer to get cold blocks. Contributed by Yang Yun.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2aea43bf HDFS-15655. Add option to make balancer prefer to get cold blocks. Contributed by Yang Yun.
2aea43bf is described below
commit 2aea43bf4fcc63f7b38292942df1fea600bb8dc9
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sun Dec 20 23:06:19 2020 +0530
HDFS-15655. Add option to make balancer prefer to get cold blocks. Contributed by Yang Yun.
---
.../federation/router/RouterNamenodeProtocol.java | 8 +-
.../server/federation/router/RouterRpcServer.java | 5 +-
.../server/federation/router/TestRouterRpc.java | 4 +-
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../NamenodeProtocolServerSideTranslatorPB.java | 2 +-
.../protocolPB/NamenodeProtocolTranslatorPB.java | 4 +-
.../hadoop/hdfs/server/balancer/Balancer.java | 19 ++++-
.../hdfs/server/balancer/BalancerParameters.java | 13 +++-
.../hadoop/hdfs/server/balancer/Dispatcher.java | 10 ++-
.../hdfs/server/balancer/NameNodeConnector.java | 4 +-
.../hdfs/server/blockmanagement/BlockManager.java | 38 +++++++++-
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 4 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 4 +-
.../hdfs/server/protocol/NamenodeProtocol.java | 4 +-
.../src/main/proto/NamenodeProtocol.proto | 1 +
.../src/main/resources/hdfs-default.xml | 9 +++
.../hadoop-hdfs/src/site/markdown/HDFSCommands.md | 1 +
.../java/org/apache/hadoop/hdfs/TestGetBlocks.java | 86 ++++++++++++++++++++--
.../hadoop/hdfs/server/balancer/TestBalancer.java | 3 +-
.../balancer/TestBalancerWithHANameNodes.java | 2 +-
20 files changed, 188 insertions(+), 37 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
index c6b0209..278d282 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
@@ -53,7 +53,7 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
@Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
- long minBlockSize) throws IOException {
+ long minBlockSize, long hotBlockTimeInterval) throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// Get the namespace where the datanode is located
@@ -78,9 +78,9 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
// Forward to the proper namenode
if (nsId != null) {
RemoteMethod method = new RemoteMethod(
- NamenodeProtocol.class, "getBlocks",
- new Class<?>[] {DatanodeInfo.class, long.class, long.class},
- datanode, size, minBlockSize);
+ NamenodeProtocol.class, "getBlocks", new Class<?>[]
+ {DatanodeInfo.class, long.class, long.class, long.class},
+ datanode, size, minBlockSize, hotBlockTimeInterval);
return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
}
return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 315f864..a8cb5c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1490,8 +1490,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
- long minBlockSize) throws IOException {
- return nnProto.getBlocks(datanode, size, minBlockSize);
+ long minBlockSize, long hotBlockTimeInterval) throws IOException {
+ return nnProto.getBlocks(datanode, size, minBlockSize,
+ hotBlockTimeInterval);
}
@Override // NamenodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 09ca0d4..4b997eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -1350,9 +1350,9 @@ public class TestRouterRpc {
// Verify that checking that datanode works
BlocksWithLocations routerBlockLocations =
- routerNamenodeProtocol.getBlocks(dn0, 1024, 0);
+ routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
BlocksWithLocations nnBlockLocations =
- nnNamenodeProtocol.getBlocks(dn0, 1024, 0);
+ nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
assertEquals(nnBlocks.length, routerBlocks.length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e904f08..0a5caed 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -713,6 +713,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB
public static final String DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size";
public static final long DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
+ public static final String DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY =
+ "dfs.balancer.getBlocks.hot-time-interval";
+ public static final long DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT =
+ 0;
public static final String DFS_BALANCER_KEYTAB_ENABLED_KEY = "dfs.balancer.keytab.enabled";
public static final boolean DFS_BALANCER_KEYTAB_ENABLED_DEFAULT = false;
public static final String DFS_BALANCER_ADDRESS_KEY = "dfs.balancer.address";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index 49fe99b..e89a6b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -89,7 +89,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements
BlocksWithLocations blocks;
try {
blocks = impl.getBlocks(dnInfo, request.getSize(),
- request.getMinBlockSize());
+ request.getMinBlockSize(), request.getTimeInterval());
} catch (IOException e) {
throw new ServiceException(e);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 603e14d..201004d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -102,11 +102,11 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
@Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
- minBlockSize)
+ minBlockSize, long timeInterval)
throws IOException {
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
.setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
- .setMinBlockSize(minBlockSize).build();
+ .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
try {
return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)
.getBlocks());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index e5f9e8c..6734c97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -203,6 +203,7 @@ public class Balancer {
+ "on over-utilized machines."
+ "\n\t[-asService]\tRun as a long running service."
+ "\n\t[-sortTopNodes]"
+ + "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks."
+ "\tSort datanodes based on the utilization so "
+ "that highly utilized datanodes get scheduled first.";
@@ -315,6 +316,14 @@ public class Balancer {
final long maxIterationTime = conf.getLong(
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
+ /**
+ * The balancer prefers to get blocks that belong to cold files, i.e.
+ * files not accessed or modified within this time interval.
+ */
+ final long hotBlockTimeInterval = conf.getTimeDuration(
+ DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY,
+ DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
// DataNode configuration parameters for balancing
final int maxConcurrentMovesPerNode = getInt(conf,
@@ -329,7 +338,7 @@ public class Balancer {
p.getExcludedNodes(), movedWinWidth, moverThreads,
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval,
- maxIterationTime, conf);
+ maxIterationTime, hotBlockTimeInterval, conf);
this.threshold = p.getThreshold();
this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();
@@ -990,6 +999,14 @@ public class Balancer {
} else if ("-asService".equalsIgnoreCase(args[i])) {
b.setRunAsService(true);
LOG.info("Balancer will run as a long running service");
+ } else if ("-hotBlockTimeInterval".equalsIgnoreCase(args[i])) {
+ checkArgument(++i < args.length,
+ "hotBlockTimeInterval value is missing: args = "
+ + Arrays.toString(args));
+ long hotBlockTimeInterval = Long.parseLong(args[i]);
+ LOG.info("Using a hotBlockTimeInterval of "
+ + hotBlockTimeInterval);
+ b.setHotBlockTimeInterval(hotBlockTimeInterval);
} else if ("-sortTopNodes".equalsIgnoreCase(args[i])) {
b.setSortTopNodes(true);
LOG.info("Balancer will sort nodes by" +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
index e614327..a8ce338 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
@@ -27,6 +27,7 @@ final class BalancerParameters {
private final BalancingPolicy policy;
private final double threshold;
private final int maxIdleIteration;
+ private final long hotBlockTimeInterval;
/** Exclude the nodes in this set. */
private final Set<String> excludedNodes;
/** If empty, include any node; otherwise, include only these nodes. */
@@ -66,6 +67,7 @@ final class BalancerParameters {
this.runDuringUpgrade = builder.runDuringUpgrade;
this.runAsService = builder.runAsService;
this.sortTopNodes = builder.sortTopNodes;
+ this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
}
BalancingPolicy getBalancingPolicy() {
@@ -113,12 +115,13 @@ final class BalancerParameters {
return String.format("%s.%s [%s," + " threshold = %s,"
+ " max idle iteration = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #source nodes = %s,"
- + " #blockpools = %s," + " run during upgrade = %s]"
+ + " #blockpools = %s," + " run during upgrade = %s,"
+ + " hot block time interval = %s]"
+ " sort top nodes = %s",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
- runDuringUpgrade, sortTopNodes);
+ runDuringUpgrade, sortTopNodes, hotBlockTimeInterval);
}
static class Builder {
@@ -134,6 +137,7 @@ final class BalancerParameters {
private boolean runDuringUpgrade = false;
private boolean runAsService = false;
private boolean sortTopNodes = false;
+ private long hotBlockTimeInterval = 0;
Builder() {
}
@@ -153,6 +157,11 @@ final class BalancerParameters {
return this;
}
+ Builder setHotBlockTimeInterval(long t) {
+ this.hotBlockTimeInterval = t;
+ return this;
+ }
+
Builder setExcludedNodes(Set<String> nodes) {
this.excludedNodes = nodes;
return this;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index e19fbeb..c34e6a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -128,6 +128,7 @@ public class Dispatcher {
private final long getBlocksSize;
private final long getBlocksMinBlockSize;
private final long blockMoveTimeout;
+ private final long hotBlockTimeInterval;
/**
* If no block can be moved out of a {@link Source} after this configured
* amount of time, the Source should give up choosing the next possible move.
@@ -797,7 +798,8 @@ public class Dispatcher {
private long getBlockList() throws IOException {
final long size = Math.min(getBlocksSize, blocksToReceive);
final BlocksWithLocations newBlksLocs =
- nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
+ nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
+ hotBlockTimeInterval);
if (LOG.isTraceEnabled()) {
LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
@@ -1011,14 +1013,15 @@ public class Dispatcher {
int maxNoMoveInterval, Configuration conf) {
this(nnc, includedNodes, excludedNodes, movedWinWidth,
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
- 0L, 0L, 0, maxNoMoveInterval, -1, conf);
+ 0L, 0L, 0, maxNoMoveInterval, -1, 0, conf);
}
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode,
long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
- int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
+ int maxNoMoveInterval, long maxIterationTime, long hotBlockTimeInterval,
+ Configuration conf) {
this.nnc = nnc;
this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes;
@@ -1034,6 +1037,7 @@ public class Dispatcher {
this.getBlocksSize = getBlocksSize;
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
+ this.hotBlockTimeInterval = hotBlockTimeInterval;
this.blockMoveTimeout = blockMoveTimeout;
this.maxNoMoveInterval = maxNoMoveInterval;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 7f54c63..4d05242 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -249,7 +249,7 @@ public class NameNodeConnector implements Closeable {
/** @return blocks with locations. */
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
- minBlockSize) throws IOException {
+ minBlockSize, long timeInterval) throws IOException {
if (getBlocksRateLimiter != null) {
getBlocksRateLimiter.acquire();
}
@@ -284,7 +284,7 @@ public class NameNodeConnector implements Closeable {
} else {
nnproxy = namenode;
}
- return nnproxy.getBlocks(datanode, size, minBlockSize);
+ return nnproxy.getBlocks(datanode, size, minBlockSize, timeInterval);
} finally {
if (isRequestStandby) {
LOG.info("Request #getBlocks to Standby NameNode success.");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e42373f..d612fff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -1635,9 +1636,23 @@ public class BlockManager implements BlockStatsMXBean {
return liveReplicas >= getDatanodeManager().getNumLiveDataNodes();
}
+ private boolean isHotBlock(BlockInfo blockInfo, long time) {
+ INodeFile iFile = (INodeFile)getBlockCollection(blockInfo);
+ if(iFile == null) {
+ return false;
+ }
+ if(iFile.isUnderConstruction()) {
+ return true;
+ }
+ if (iFile.getAccessTime() > time || iFile.getModificationTime() > time) {
+ return true;
+ }
+ return false;
+ }
+
/** Get all blocks with location information from a datanode. */
public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
- final long size, final long minBlockSize) throws
+ final long size, final long minBlockSize, final long timeInterval) throws
UnregisteredNodeException {
final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
if (node == null) {
@@ -1655,15 +1670,21 @@ public class BlockManager implements BlockStatsMXBean {
int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+ List<BlockInfo> pending = new ArrayList<BlockInfo>();
long totalSize = 0;
BlockInfo curBlock;
+ long hotTimePos = Time.now() - timeInterval;
while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
if (curBlock.getNumBytes() < minBlockSize) {
continue;
}
- totalSize += addBlock(curBlock, results);
+ if(timeInterval > 0 && isHotBlock(curBlock, hotTimePos)) {
+ pending.add(curBlock);
+ } else {
+ totalSize += addBlock(curBlock, results);
+ }
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
@@ -1673,10 +1694,19 @@ public class BlockManager implements BlockStatsMXBean {
if (curBlock.getNumBytes() < minBlockSize) {
continue;
}
- totalSize += addBlock(curBlock, results);
+ if(timeInterval > 0 && isHotBlock(curBlock, hotTimePos)) {
+ pending.add(curBlock);
+ } else {
+ totalSize += addBlock(curBlock, results);
+ }
}
}
-
+ // If the cold blocks (not accessed or modified within timeInterval) total
+ // less than the requested size, append the pending hot blocks at the end.
+ for(int i = 0; i < pending.size() && totalSize < size; i++) {
+ curBlock = pending.get(i);
+ totalSize += addBlock(curBlock, results);
+ }
return new BlocksWithLocations(
results.toArray(new BlockWithLocations[results.size()]));
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index cc413a8..e48e20b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1893,13 +1893,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param minimumBlockSize
*/
public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
- minimumBlockSize) throws IOException {
+ minimumBlockSize, long timeInterval) throws IOException {
checkOperation(OperationCategory.READ);
readLock();
try {
checkOperation(OperationCategory.READ);
return getBlockManager().getBlocksWithLocations(datanode, size,
- minimumBlockSize);
+ minimumBlockSize, timeInterval);
} finally {
readUnlock("getBlocks");
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index fde7ece..1d648f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -651,7 +651,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
/////////////////////////////////////////////////////
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
- minBlockSize)
+ minBlockSize, long timeInterval)
throws IOException {
if(size <= 0) {
throw new IllegalArgumentException(
@@ -664,7 +664,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
checkNNStartup();
namesystem.checkSuperuserPrivilege();
namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
- return namesystem.getBlocks(datanode, size, minBlockSize);
+ return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
}
@Override // NamenodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 90c3b23..44ffb85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -74,6 +74,8 @@ public interface NamenodeProtocol {
* @param datanode a data node
* @param size requested size
* @param minBlockSize each block should be of this minimum Block Size
+ * @param hotBlockTimeInterval if positive, prefer blocks of cold files, i.e.
+ * files not accessed or modified within this time interval
* @return BlocksWithLocations a list of blocks & their locations
* @throws IOException if size is less than or equal to 0 or
datanode does not exist
@@ -81,7 +83,7 @@ public interface NamenodeProtocol {
@Idempotent
@ReadOnly
BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
- minBlockSize) throws IOException;
+ minBlockSize, long hotBlockTimeInterval) throws IOException;
/**
* Get the current block keys
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index 97f5bca..88d9fbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -47,6 +47,7 @@ message GetBlocksRequestProto {
// cause problem during rolling upgrade, when balancers are upgraded later.
// For more info refer HDFS-13356
optional uint64 minBlockSize = 3 [default = 10485760];
+ optional uint64 timeInterval = 4 [default = 0];
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c2fafb9..b1a0b1f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6068,4 +6068,13 @@
until capacity is balanced out.
</description>
</property>
+
+ <property>
+ <name>dfs.balancer.getBlocks.hot-time-interval</name>
+ <value>0</value>
+ <description>
+ The balancer prefers moving cold blocks, i.e. blocks of files not accessed or
+ modified within this interval (default unit: ms). A value of 0 disables it.
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 4b7a7a7..175c865 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -301,6 +301,7 @@ Usage:
| `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
| `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
| `-asService` | Run Balancer as a long running service. |
+| `-hotBlockTimeInterval` \<interval\> | Prefer moving cold blocks, i.e. blocks of files that have not been accessed or modified within the specified time interval. |
| `-h`\|`--help` | Display the tool usage and help information and exit. |
Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
index e82b990..1ee166e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
@@ -238,26 +238,26 @@ public class TestGetBlocks {
DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
// Should return all 13 blocks, as minBlockSize is not passed
- locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
+ locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
assertEquals(blkLocsSize, locs.length);
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
assertEquals(locs[1].getStorageIDs().length, replicationFactor);
// Should return 12 blocks, as minBlockSize is blkSize
- locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize).getBlocks();
+ locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0).getBlocks();
assertEquals(blkLocsSize - 1, locs.length);
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
assertEquals(locs[1].getStorageIDs().length, replicationFactor);
// get blocks of size BlockSize from dataNodes[0]
locs = namenode.getBlocks(dataNodes[0], blkSize,
- blkSize).getBlocks();
+ blkSize, 0).getBlocks();
assertEquals(locs.length, 1);
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
// get blocks of size 1 from dataNodes[0]
- locs = namenode.getBlocks(dataNodes[0], 1, 1).getBlocks();
+ locs = namenode.getBlocks(dataNodes[0], 1, 1, 0).getBlocks();
assertEquals(locs.length, 1);
assertEquals(locs[0].getStorageIDs().length, replicationFactor);
@@ -282,7 +282,7 @@ public class TestGetBlocks {
// Namenode should refuse to provide block locations to the balancer
// while in safemode.
- locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
+ locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
assertEquals(blkLocsSize, locs.length);
assertFalse(fs.isInSafeMode());
LOG.info("Entering safe mode");
@@ -309,7 +309,7 @@ public class TestGetBlocks {
// Namenode should refuse should fail
LambdaTestUtils.intercept(exClass,
- msg, () -> namenode.getBlocks(datanode, size, minBlkSize));
+ msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0));
}
/**
@@ -396,4 +396,76 @@ public class TestGetBlocks {
}
}
-}
+ private boolean belongToFile(BlockWithLocations blockWithLocations,
+ List<LocatedBlock> blocks) {
+ for(LocatedBlock block : blocks) {
+ if (block.getBlock().getLocalBlock().equals(
+ blockWithLocations.getBlock())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Test getBlocks with a positive hotBlockTimeInterval: blocks of cold files
+ * (not accessed or modified within the interval) must be returned before
+ * blocks of recently written (hot) files.
+ */
+ @Test
+ public void testGetBlocksWithHotBlockTimeInterval() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ final short repFactor = (short) 1;
+ final int blockNum = 2;
+ final int fileLen = BLOCK_SIZE * blockNum;
+ final long hotInterval = 2000;
+
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
+ numDataNodes(repFactor).build();
+ try {
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ final DFSClient dfsclient = ((DistributedFileSystem) fs).getClient();
+
+ String fileOld = "/f.old";
+ DFSTestUtil.createFile(fs, new Path(fileOld), fileLen, repFactor, 0);
+
+ List<LocatedBlock> locatedBlocksOld = dfsclient.getNamenode().
+ getBlockLocations(fileOld, 0, fileLen).getLocatedBlocks();
+ DatanodeInfo[] dataNodes = locatedBlocksOld.get(0).getLocations();
+
+ InetSocketAddress addr = new InetSocketAddress("localhost",
+ cluster.getNameNodePort());
+ NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
+ DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
+
+ // make the file as old.
+ dfsclient.getNamenode().setTimes(fileOld, 0, 0);
+
+ String fileNew = "/f.new";
+ DFSTestUtil.createFile(fs, new Path(fileNew), fileLen, repFactor, 0);
+ List<LocatedBlock> locatedBlocksNew = dfsclient.getNamenode()
+ .getBlockLocations(fileNew, 0, fileLen).getLocatedBlocks();
+
+ BlockWithLocations[] locsAll = namenode.getBlocks(
+ dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
+ assertEquals(locsAll.length, 4);
+
+ for(int i = 0; i < blockNum; i++) {
+ assertTrue(belongToFile(locsAll[i], locatedBlocksOld));
+ }
+ for(int i = blockNum; i < blockNum*2; i++) {
+ assertTrue(belongToFile(locsAll[i], locatedBlocksNew));
+ }
+
+ BlockWithLocations[] locs2 = namenode.getBlocks(
+ dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
+ for(int i = 0; i < 2; i++) {
+ assertTrue(belongToFile(locs2[i], locatedBlocksOld));
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 82d710d..9f65ffa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -2170,7 +2170,8 @@ public class TestBalancer {
endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime));
numGetBlocksCalls.incrementAndGet();
return blk;
- }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong());
+ }}).when(fsnSpy).getBlocks(any(DatanodeID.class),
+ anyLong(), anyLong(), anyLong());
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index b0ee04e..a74f94f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -228,7 +228,7 @@ public class TestBalancerWithHANameNodes {
int expectedObserverIdx = withObserverFailure ? 3 : 2;
int expectedCount = (i == expectedObserverIdx) ? 2 : 0;
verify(namesystemSpies.get(i), times(expectedCount))
- .getBlocks(any(), anyLong(), anyLong());
+ .getBlocks(any(), anyLong(), anyLong(), anyLong());
}
} finally {
if (qjmhaCluster != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org