Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/12/20 17:42:23 UTC

[hadoop] branch trunk updated: HDFS-15655. Add option to make balancer prefer to get cold blocks. Contributed by Yang Yun.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2aea43bf HDFS-15655. Add option to make balancer prefer to get cold blocks. Contributed by Yang Yun.
2aea43bf is described below

commit 2aea43bf4fcc63f7b38292942df1fea600bb8dc9
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sun Dec 20 23:06:19 2020 +0530

    HDFS-15655. Add option to make balancer prefer to get cold blocks. Contributed by Yang Yun.
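
    In short: operators pass the new flag on the command line
    ("hdfs balancer -hotBlockTimeInterval <milliseconds>") or set
    dfs.balancer.getBlocks.hot-time-interval; at the RPC layer the patch
    adds a fourth parameter to NamenodeProtocol#getBlocks. A minimal
    sketch of the extended call, assuming an already-built
    NamenodeProtocol proxy and a target DatanodeInfo (both are
    hypothetical stand-ins, and all values are illustrative):

        // Ask for up to 2GB of blocks of at least 10MB, preferring
        // blocks of files neither accessed nor modified in the last
        // 10 minutes; an interval of 0 disables the preference.
        long hotBlockTimeInterval = 10L * 60 * 1000; // milliseconds
        BlocksWithLocations blks = namenodeProxy.getBlocks(datanodeInfo,
            2L * 1024 * 1024 * 1024, 10L * 1024 * 1024,
            hotBlockTimeInterval);
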
---
 .../federation/router/RouterNamenodeProtocol.java  |  8 +-
 .../server/federation/router/RouterRpcServer.java  |  5 +-
 .../server/federation/router/TestRouterRpc.java    |  4 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  4 +
 .../NamenodeProtocolServerSideTranslatorPB.java    |  2 +-
 .../protocolPB/NamenodeProtocolTranslatorPB.java   |  4 +-
 .../hadoop/hdfs/server/balancer/Balancer.java      | 19 ++++-
 .../hdfs/server/balancer/BalancerParameters.java   | 13 +++-
 .../hadoop/hdfs/server/balancer/Dispatcher.java    | 10 ++-
 .../hdfs/server/balancer/NameNodeConnector.java    |  4 +-
 .../hdfs/server/blockmanagement/BlockManager.java  | 38 +++++++++-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |  4 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  4 +-
 .../hdfs/server/protocol/NamenodeProtocol.java     |  4 +-
 .../src/main/proto/NamenodeProtocol.proto          |  1 +
 .../src/main/resources/hdfs-default.xml            |  9 +++
 .../hadoop-hdfs/src/site/markdown/HDFSCommands.md  |  1 +
 .../java/org/apache/hadoop/hdfs/TestGetBlocks.java | 86 ++++++++++++++++++++--
 .../hadoop/hdfs/server/balancer/TestBalancer.java  |  3 +-
 .../balancer/TestBalancerWithHANameNodes.java      |  2 +-
 20 files changed, 188 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
index c6b0209..278d282 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java
@@ -53,7 +53,7 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval) throws IOException {
     rpcServer.checkOperation(OperationCategory.READ);
 
     // Get the namespace where the datanode is located
@@ -78,9 +78,9 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
     // Forward to the proper namenode
     if (nsId != null) {
       RemoteMethod method = new RemoteMethod(
-          NamenodeProtocol.class, "getBlocks",
-          new Class<?>[] {DatanodeInfo.class, long.class, long.class},
-          datanode, size, minBlockSize);
+          NamenodeProtocol.class, "getBlocks", new Class<?>[]
+          {DatanodeInfo.class, long.class, long.class, long.class},
+          datanode, size, minBlockSize, hotBlockTimeInterval);
       return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
     }
     return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 315f864..a8cb5c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1490,8 +1490,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
 
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize) throws IOException {
-    return nnProto.getBlocks(datanode, size, minBlockSize);
+      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+    return nnProto.getBlocks(datanode, size, minBlockSize,
+            hotBlockTimeInterval);
   }
 
   @Override // NamenodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 09ca0d4..4b997eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -1350,9 +1350,9 @@ public class TestRouterRpc {
 
     // Verify that getting blocks for the datanode works
     BlocksWithLocations routerBlockLocations =
-        routerNamenodeProtocol.getBlocks(dn0, 1024, 0);
+        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
     BlocksWithLocations nnBlockLocations =
-        nnNamenodeProtocol.getBlocks(dn0, 1024, 0);
+        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
     BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
     BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
     assertEquals(nnBlocks.length, routerBlocks.length);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e904f08..0a5caed 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -713,6 +713,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB
   public static final String  DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size";
   public static final long    DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
+  public static final String  DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY =
+      "dfs.balancer.getBlocks.hot-time-interval";
+  public static final long    DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT =
+      0;
   public static final String  DFS_BALANCER_KEYTAB_ENABLED_KEY = "dfs.balancer.keytab.enabled";
   public static final boolean DFS_BALANCER_KEYTAB_ENABLED_DEFAULT = false;
   public static final String  DFS_BALANCER_ADDRESS_KEY = "dfs.balancer.address";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index 49fe99b..e89a6b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -89,7 +89,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements
     BlocksWithLocations blocks;
     try {
       blocks = impl.getBlocks(dnInfo, request.getSize(),
-          request.getMinBlockSize());
+          request.getMinBlockSize(), request.getTimeInterval());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 603e14d..201004d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -102,11 +102,11 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize)
+      minBlockSize, long timeInterval)
       throws IOException {
     GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
         .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
-        .setMinBlockSize(minBlockSize).build();
+        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
     try {
       return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)
           .getBlocks());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index e5f9e8c..6734c97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -203,6 +203,7 @@ public class Balancer {
       + "on over-utilized machines."
       + "\n\t[-asService]\tRun as a long running service."
       + "\n\t[-sortTopNodes]"
+      + "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks."
       + "\tSort datanodes based on the utilization so "
       + "that highly utilized datanodes get scheduled first.";
 
@@ -315,6 +316,14 @@ public class Balancer {
     final long maxIterationTime = conf.getLong(
         DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
+    /**
+     * The balancer prefers blocks that belong to cold files, i.e. files
+     * neither accessed nor modified within this time interval.
+     */
+    final long hotBlockTimeInterval = conf.getTimeDuration(
+        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_KEY,
+        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_HOT_TIME_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
 
     // DataNode configuration parameters for balancing
     final int maxConcurrentMovesPerNode = getInt(conf,
@@ -329,7 +338,7 @@ public class Balancer {
             p.getExcludedNodes(), movedWinWidth, moverThreads,
             dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
             getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval,
-            maxIterationTime, conf);
+            maxIterationTime, hotBlockTimeInterval, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();
@@ -990,6 +999,14 @@ public class Balancer {
             } else if ("-asService".equalsIgnoreCase(args[i])) {
               b.setRunAsService(true);
               LOG.info("Balancer will run as a long running service");
+            } else if ("-hotBlockTimeInterval".equalsIgnoreCase(args[i])) {
+              checkArgument(++i < args.length,
+                  "hotBlockTimeInterval value is missing: args = "
+                  + Arrays.toString(args));
+              long hotBlockTimeInterval = Long.parseLong(args[i]);
+              LOG.info("Using a hotBlockTimeInterval of "
+                  + hotBlockTimeInterval);
+              b.setHotBlockTimeInterval(hotBlockTimeInterval);
             } else if ("-sortTopNodes".equalsIgnoreCase(args[i])) {
               b.setSortTopNodes(true);
               LOG.info("Balancer will sort nodes by" +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
index e614327..a8ce338 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
@@ -27,6 +27,7 @@ final class BalancerParameters {
   private final BalancingPolicy policy;
   private final double threshold;
   private final int maxIdleIteration;
+  private final long hotBlockTimeInterval;
   /** Exclude the nodes in this set. */
   private final Set<String> excludedNodes;
   /** If empty, include any node; otherwise, include only these nodes. */
@@ -66,6 +67,7 @@ final class BalancerParameters {
     this.runDuringUpgrade = builder.runDuringUpgrade;
     this.runAsService = builder.runAsService;
     this.sortTopNodes = builder.sortTopNodes;
+    this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
   }
 
   BalancingPolicy getBalancingPolicy() {
@@ -113,12 +115,13 @@ final class BalancerParameters {
     return String.format("%s.%s [%s," + " threshold = %s,"
         + " max idle iteration = %s," + " #excluded nodes = %s,"
         + " #included nodes = %s," + " #source nodes = %s,"
-        + " #blockpools = %s," + " run during upgrade = %s]"
+        + " #blockpools = %s," + " run during upgrade = %s,"
+        + " hot block time interval = %s]"
         + " sort top nodes = %s",
         Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
         threshold, maxIdleIteration, excludedNodes.size(),
         includedNodes.size(), sourceNodes.size(), blockpools.size(),
-        runDuringUpgrade, sortTopNodes);
+        runDuringUpgrade, hotBlockTimeInterval, sortTopNodes);
   }
 
   static class Builder {
@@ -134,6 +137,7 @@ final class BalancerParameters {
     private boolean runDuringUpgrade = false;
     private boolean runAsService = false;
     private boolean sortTopNodes = false;
+    private long hotBlockTimeInterval = 0;
 
     Builder() {
     }
@@ -153,6 +157,11 @@ final class BalancerParameters {
       return this;
     }
 
+    Builder setHotBlockTimeInterval(long t) {
+      this.hotBlockTimeInterval = t;
+      return this;
+    }
+
     Builder setExcludedNodes(Set<String> nodes) {
       this.excludedNodes = nodes;
       return this;
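
As a usage sketch of the new builder hook (illustrative only; build() is
assumed from the existing package-private Builder, and the interval is
in milliseconds, with 0 meaning "off"):

    BalancerParameters params = new BalancerParameters.Builder()
        .setHotBlockTimeInterval(10L * 60 * 1000) // files cold >= 10 min
        .build();
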
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index e19fbeb..c34e6a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -128,6 +128,7 @@ public class Dispatcher {
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
   private final long blockMoveTimeout;
+  private final long hotBlockTimeInterval;
   /**
    * If no block can be moved out of a {@link Source} after this configured
    * amount of time, the Source should give up choosing the next possible move.
@@ -797,7 +798,8 @@ public class Dispatcher {
     private long getBlockList() throws IOException {
       final long size = Math.min(getBlocksSize, blocksToReceive);
       final BlocksWithLocations newBlksLocs =
-          nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
+          nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
+              hotBlockTimeInterval);
 
       if (LOG.isTraceEnabled()) {
         LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
@@ -1011,14 +1013,15 @@ public class Dispatcher {
       int maxNoMoveInterval, Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, 0, maxNoMoveInterval, -1, conf);
+        0L, 0L, 0, maxNoMoveInterval, -1, 0, conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
       long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
-      int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
+      int maxNoMoveInterval, long maxIterationTime, long hotBlockTimeInterval,
+      Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -1034,6 +1037,7 @@ public class Dispatcher {
 
     this.getBlocksSize = getBlocksSize;
     this.getBlocksMinBlockSize = getBlocksMinBlockSize;
+    this.hotBlockTimeInterval = hotBlockTimeInterval;
     this.blockMoveTimeout = blockMoveTimeout;
     this.maxNoMoveInterval = maxNoMoveInterval;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 7f54c63..4d05242 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -249,7 +249,7 @@ public class NameNodeConnector implements Closeable {
 
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize) throws IOException {
+      minBlockSize, long timeInterval) throws IOException {
     if (getBlocksRateLimiter != null) {
       getBlocksRateLimiter.acquire();
     }
@@ -284,7 +284,7 @@ public class NameNodeConnector implements Closeable {
       } else {
         nnproxy = namenode;
       }
-      return nnproxy.getBlocks(datanode, size, minBlockSize);
+      return nnproxy.getBlocks(datanode, size, minBlockSize, timeInterval);
     } finally {
       if (isRequestStandby) {
         LOG.info("Request #getBlocks to Standby NameNode success.");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e42373f..d612fff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -1635,9 +1636,23 @@ public class BlockManager implements BlockStatsMXBean {
     return liveReplicas >= getDatanodeManager().getNumLiveDataNodes();
   }
 
+  /** @return true if the block's file is open or was recently touched. */
+  private boolean isHotBlock(BlockInfo blockInfo, long time) {
+    INodeFile iFile = (INodeFile) getBlockCollection(blockInfo);
+    if (iFile == null) {
+      return false;
+    }
+    if (iFile.isUnderConstruction()) {
+      return true;
+    }
+    // Hot: accessed or modified after the cutoff time.
+    return iFile.getAccessTime() > time
+        || iFile.getModificationTime() > time;
+  }
+
   /** Get all blocks with location information from a datanode. */
   public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
-      final long size, final long minBlockSize) throws
+      final long size, final long minBlockSize, final long timeInterval) throws
       UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
@@ -1655,15 +1670,21 @@ public class BlockManager implements BlockStatsMXBean {
     int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
     Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+    List<BlockInfo> pending = new ArrayList<BlockInfo>();
     long totalSize = 0;
     BlockInfo curBlock;
+    long hotTimePos = Time.now() - timeInterval;
     while(totalSize<size && iter.hasNext()) {
       curBlock = iter.next();
       if(!curBlock.isComplete())  continue;
       if (curBlock.getNumBytes() < minBlockSize) {
         continue;
       }
-      totalSize += addBlock(curBlock, results);
+      if(timeInterval > 0 && isHotBlock(curBlock, hotTimePos)) {
+        pending.add(curBlock);
+      } else {
+        totalSize += addBlock(curBlock, results);
+      }
     }
     if(totalSize<size) {
       iter = node.getBlockIterator(); // start from the beginning
@@ -1673,10 +1694,19 @@ public class BlockManager implements BlockStatsMXBean {
         if (curBlock.getNumBytes() < minBlockSize) {
           continue;
         }
-        totalSize += addBlock(curBlock, results);
+        if(timeInterval > 0 && isHotBlock(curBlock, hotTimePos)) {
+          pending.add(curBlock);
+        } else {
+          totalSize += addBlock(curBlock, results);
+        }
       }
     }
-
+    // If the cold blocks (not accessed within timeInterval) add up to less
+    // than the requested size, append the pending hot blocks to the result.
+    for(int i = 0; i < pending.size() && totalSize < size; i++) {
+      curBlock = pending.get(i);
+      totalSize += addBlock(curBlock, results);
+    }
     return new BlocksWithLocations(
         results.toArray(new BlockWithLocations[results.size()]));
   }
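
Distilled, the selection policy above is "cold first, hot as fallback":
cold blocks go straight into the result, hot blocks are parked in the
pending list and appended only while the requested size is still unmet.
A sketch (candidateBlocks stands in for the node's block iterator; the
other names are from the hunk above):

    long hotTimePos = Time.now() - timeInterval;  // hotness cutoff
    for (BlockInfo b : candidateBlocks) {
      if (timeInterval > 0 && isHotBlock(b, hotTimePos)) {
        pending.add(b);                      // defer hot blocks
      } else {
        totalSize += addBlock(b, results);   // take cold blocks first
      }
    }
    for (int i = 0; i < pending.size() && totalSize < size; i++) {
      totalSize += addBlock(pending.get(i), results);  // hot fallback
    }
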
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index cc413a8..e48e20b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1893,13 +1893,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @param minimumBlockSize
    */
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
-      minimumBlockSize) throws IOException {
+      minimumBlockSize, long timeInterval) throws IOException {
     checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       return getBlockManager().getBlocksWithLocations(datanode, size,
-          minimumBlockSize);
+          minimumBlockSize, timeInterval);
     } finally {
       readUnlock("getBlocks");
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index fde7ece..1d648f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -651,7 +651,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   /////////////////////////////////////////////////////
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize)
+      minBlockSize, long timeInterval)
       throws IOException {
     if(size <= 0) {
       throw new IllegalArgumentException(
@@ -664,7 +664,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     checkNNStartup();
     namesystem.checkSuperuserPrivilege();
     namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
-    return namesystem.getBlocks(datanode, size, minBlockSize);
+    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
   }
 
   @Override // NamenodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 90c3b23..44ffb85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -74,6 +74,8 @@ public interface NamenodeProtocol {
    * @param datanode  a data node
    * @param size      requested size
    * @param minBlockSize each block should be of this minimum Block Size
+   * @param hotBlockTimeInterval prefer blocks that belong to cold files,
+   * i.e. files last accessed or modified before this time interval
    * @return BlocksWithLocations a list of blocks &amp; their locations
    * @throws IOException if size is less than or equal to 0 or
   datanode does not exist
@@ -81,7 +83,7 @@ public interface NamenodeProtocol {
   @Idempotent
   @ReadOnly
   BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize) throws IOException;
+      minBlockSize, long hotBlockTimeInterval) throws IOException;
 
   /**
    * Get the current block keys
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index 97f5bca..88d9fbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -47,6 +47,7 @@ message GetBlocksRequestProto {
   // cause problem during rolling upgrade, when balancers are upgraded later.
   // For more info refer HDFS-13356
   optional uint64 minBlockSize = 3 [default = 10485760];
+  optional uint64 timeInterval = 4 [default = 0];
 }
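
Like minBlockSize above (HDFS-13356), the new field is optional with a
default, so mixed-version clusters stay wire-compatible. A sketch of the
request an older balancer builds (dnProto and size are assumed
stand-ins):

    // Old clients never call setTimeInterval(), so the server reads the
    // default 0 via request.getTimeInterval(), and the timeInterval > 0
    // guard in BlockManager keeps the original behavior.
    GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
        .setDatanode(dnProto)
        .setSize(size)
        .build();
    assert req.getTimeInterval() == 0;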
 
  
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c2fafb9..b1a0b1f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -6068,4 +6068,13 @@
         until capacity is balanced out.
     </description>
   </property>
+
+  <property>
+    <name>dfs.balancer.getBlocks.hot-time-interval</name>
+    <value>0</value>
+    <description>
+        The balancer prefers moving cold blocks, i.e. blocks associated
+        with files accessed or modified before the specified time interval.
+    </description>
+  </property>
 </configuration>
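
Since Balancer.java reads this key with getTimeDuration(...,
TimeUnit.MILLISECONDS) (see the hunk earlier in this patch), a bare
value is interpreted as milliseconds, and suffixed durations should be
accepted as well. A sketch, with an assumed 10-minute setting:

    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.balancer.getBlocks.hot-time-interval", "600000");
    long intervalMs = conf.getTimeDuration(
        "dfs.balancer.getBlocks.hot-time-interval", 0,
        TimeUnit.MILLISECONDS);  // 600000; "10m" would parse the same
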
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 4b7a7a7..175c865 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -301,6 +301,7 @@ Usage:
 | `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
 | `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. |
 | `-asService` | Run Balancer as a long running service. |
+| `-hotBlockTimeInterval` | Prefer moving cold blocks, i.e. blocks associated with files accessed or modified before the specified time interval (in milliseconds). |
 | `-h`\|`--help` | Display the tool usage and help information and exit. |
 
 Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details.
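
For example, to prefer blocks of files untouched for the last ten
minutes (the value is illustrative; the flag's argument is parsed as a
plain long and treated as milliseconds):

    hdfs balancer -hotBlockTimeInterval 600000
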
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
index e82b990..1ee166e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java
@@ -238,26 +238,26 @@ public class TestGetBlocks {
           DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
 
       // Should return all 13 blocks, as minBlockSize is not passed
-      locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
       assertEquals(blkLocsSize, locs.length);
 
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
       assertEquals(locs[1].getStorageIDs().length, replicationFactor);
 
       // Should return 12 blocks, as minBlockSize is blkSize
-      locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0).getBlocks();
       assertEquals(blkLocsSize - 1, locs.length);
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
       assertEquals(locs[1].getStorageIDs().length, replicationFactor);
 
       // get blocks of size BlockSize from dataNodes[0]
       locs = namenode.getBlocks(dataNodes[0], blkSize,
-          blkSize).getBlocks();
+          blkSize, 0).getBlocks();
       assertEquals(locs.length, 1);
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
 
       // get blocks of size 1 from dataNodes[0]
-      locs = namenode.getBlocks(dataNodes[0], 1, 1).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], 1, 1, 0).getBlocks();
       assertEquals(locs.length, 1);
       assertEquals(locs[0].getStorageIDs().length, replicationFactor);
 
@@ -282,7 +282,7 @@ public class TestGetBlocks {
 
       // Namenode should refuse to provide block locations to the balancer
       // while in safemode.
-      locs = namenode.getBlocks(dataNodes[0], fileLen, 0).getBlocks();
+      locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
       assertEquals(blkLocsSize, locs.length);
       assertFalse(fs.isInSafeMode());
       LOG.info("Entering safe mode");
@@ -309,7 +309,7 @@ public class TestGetBlocks {
 
     // Namenode should refuse and the call should fail
     LambdaTestUtils.intercept(exClass,
-        msg, () -> namenode.getBlocks(datanode, size, minBlkSize));
+        msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0));
   }
 
   /**
@@ -396,4 +396,76 @@ public class TestGetBlocks {
     }
   }
 
-}
+  private boolean belongToFile(BlockWithLocations blockWithLocations,
+                               List<LocatedBlock> blocks) {
+    for (LocatedBlock block : blocks) {
+      if (block.getBlock().getLocalBlock().equals(
+          blockWithLocations.getBlock())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Test getBlocks with hotBlockTimeInterval
+   * (dfs.balancer.getBlocks.hot-time-interval): the balancer prefers
+   * blocks belonging to cold files created before this time period.
+   */
+  @Test
+  public void testGetBlocksWithHotBlockTimeInterval() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final short repFactor = (short) 1;
+    final int blockNum = 2;
+    final int fileLen = BLOCK_SIZE * blockNum;
+    final long hotInterval = 2000;
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
+        numDataNodes(repFactor).build();
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      final DFSClient dfsclient = ((DistributedFileSystem) fs).getClient();
+
+      String fileOld = "/f.old";
+      DFSTestUtil.createFile(fs, new Path(fileOld), fileLen, repFactor, 0);
+
+      List<LocatedBlock> locatedBlocksOld = dfsclient.getNamenode().
+          getBlockLocations(fileOld, 0, fileLen).getLocatedBlocks();
+      DatanodeInfo[] dataNodes = locatedBlocksOld.get(0).getLocations();
+
+      InetSocketAddress addr = new InetSocketAddress("localhost",
+          cluster.getNameNodePort());
+      NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
+          DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
+
+      // make the file as old.
+      dfsclient.getNamenode().setTimes(fileOld, 0, 0);
+
+      String fileNew = "/f.new";
+      DFSTestUtil.createFile(fs, new Path(fileNew), fileLen, repFactor, 0);
+      List<LocatedBlock> locatedBlocksNew = dfsclient.getNamenode()
+          .getBlockLocations(fileNew, 0, fileLen).getLocatedBlocks();
+
+      BlockWithLocations[] locsAll = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, hotInterval).getBlocks();
+      assertEquals(4, locsAll.length);
+
+      for (int i = 0; i < blockNum; i++) {
+        assertTrue(belongToFile(locsAll[i], locatedBlocksOld));
+      }
+      for (int i = blockNum; i < blockNum * 2; i++) {
+        assertTrue(belongToFile(locsAll[i], locatedBlocksNew));
+      }
+
+      BlockWithLocations[] locs2 = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, hotInterval).getBlocks();
+      for (int i = 0; i < 2; i++) {
+        assertTrue(belongToFile(locs2[i], locatedBlocksOld));
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 82d710d..9f65ffa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -2170,7 +2170,8 @@ public class TestBalancer {
         endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime));
         numGetBlocksCalls.incrementAndGet();
         return blk;
-      }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong());
+      }}).when(fsnSpy).getBlocks(any(DatanodeID.class),
+        anyLong(), anyLong(), anyLong());
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
index b0ee04e..a74f94f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
@@ -228,7 +228,7 @@ public class TestBalancerWithHANameNodes {
         int expectedObserverIdx = withObserverFailure ? 3 : 2;
         int expectedCount = (i == expectedObserverIdx) ? 2 : 0;
         verify(namesystemSpies.get(i), times(expectedCount))
-            .getBlocks(any(), anyLong(), anyLong());
+            .getBlocks(any(), anyLong(), anyLong(), anyLong());
       }
     } finally {
       if (qjmhaCluster != null) {

