You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2020/11/03 23:08:07 UTC

[hadoop] branch branch-2.10 updated: HDFS-15665. Balancer logging improvements. Contributed by Konstantin V Shvachko.

This is an automated email from the ASF dual-hosted git repository.

shv pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a9bf374  HDFS-15665. Balancer logging improvements. Contributed by Konstantin V Shvachko.
a9bf374 is described below

commit a9bf3743801558f7b453e5f6db86ca8e935e3763
Author: Konstantin V Shvachko <sh...@apache.org>
AuthorDate: Tue Nov 3 12:01:30 2020 -0800

    HDFS-15665. Balancer logging improvements. Contributed by Konstantin V Shvachko.
    
    (cherry picked from commit d07dc7afb4aa0d6cc9f9be530802e54610776a4d)
---
 .../hadoop/hdfs/server/balancer/Balancer.java      | 42 +++++++++++++---------
 .../hadoop/hdfs/server/balancer/Dispatcher.java    | 27 ++++++++++----
 .../hdfs/server/balancer/NameNodeConnector.java    | 14 ++++++++
 .../hadoop/hdfs/server/balancer/TestBalancer.java  |  2 +-
 .../TestBalancerWithMultipleNameNodes.java         | 12 +++----
 5 files changed, 68 insertions(+), 29 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index be72eb4..bb4b697 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -34,8 +34,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -64,7 +62,8 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import static com.google.common.base.Preconditions.checkArgument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -171,7 +170,7 @@ import com.google.common.base.Preconditions;
 
 @InterfaceAudience.Private
 public class Balancer {
-  static final Log LOG = LogFactory.getLog(Balancer.class);
+  static final Logger LOG = LoggerFactory.getLogger(Balancer.class);
 
   static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
 
@@ -261,6 +260,9 @@ public class Balancer {
    */
   Balancer(NameNodeConnector theblockpool, BalancerParameters p,
       Configuration conf) {
+    // NameNode configuration parameters for balancing
+    getInt(conf, DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
     final long movedWinWidth = getLong(conf,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
@@ -270,10 +272,6 @@ public class Balancer {
     final int dispatcherThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
-    final int maxConcurrentMovesPerNode = getInt(conf,
-        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
-        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
-
     final long getBlocksSize = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
@@ -290,6 +288,13 @@ public class Balancer {
         DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
 
+    // DataNode configuration parameters for balancing
+    final int maxConcurrentMovesPerNode = getInt(conf,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    getLongBytes(conf, DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT);
+
     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
@@ -582,12 +587,13 @@ public class Balancer {
       this.bytesAlreadyMoved = bytesAlreadyMoved;
     }
 
-    void print(int iteration, PrintStream out) {
-      out.printf("%-24s %10d  %19s  %18s  %17s%n",
+    void print(int iteration, NameNodeConnector nnc, PrintStream out) {
+      out.printf("%-24s %10d  %19s  %18s  %17s  %s%n",
           DateFormat.getDateTimeInstance().format(new Date()), iteration,
           StringUtils.byteDesc(bytesAlreadyMoved),
           StringUtils.byteDesc(bytesLeftToMove),
-          StringUtils.byteDesc(bytesBeingMoved));
+          StringUtils.byteDesc(bytesBeingMoved),
+          nnc.getNameNodeUri());
     }
   }
 
@@ -633,8 +639,10 @@ public class Balancer {
         System.out.println("No block can be moved. Exiting...");
         return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved);
       } else {
-        LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
-            " in this iteration");
+        LOG.info("Will move {}  in this iteration for {}",
+            StringUtils.byteDesc(bytesBeingMoved), nnc.toString());
+        LOG.info("Total target DataNodes in this iteration: {}",
+            dispatcher.moveTasksTotal());
       }
 
       /* For each pair of <source, target>, start a thread that repeatedly 
@@ -681,7 +689,9 @@ public class Balancer {
     LOG.info("excluded nodes = " + p.getExcludedNodes());
     LOG.info("source nodes = " + p.getSourceNodes());
     checkKeytabAndInit(conf);
-    System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
+    System.out.println("Time Stamp               Iteration#"
+        + "  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved"
+        + "  NameNode");
     
     List<NameNodeConnector> connectors = Collections.emptyList();
     try {
@@ -698,7 +708,7 @@ public class Balancer {
               || p.getBlockPools().contains(nnc.getBlockpoolID())) {
             final Balancer b = new Balancer(nnc, p, conf);
             final Result r = b.runOneIteration();
-            r.print(iteration, System.out);
+            r.print(iteration, nnc, System.out);
 
             // clean all lists
             b.resetData(conf);
@@ -718,7 +728,7 @@ public class Balancer {
       }
     } finally {
       for(NameNodeConnector nnc : connectors) {
-        IOUtils.cleanup(LOG, nnc);
+        IOUtils.cleanupWithLogger(LOG, nnc);
       }
     }
     return ExitStatus.SUCCESS.getExitCode();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 84cc13b..afa811e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -42,11 +42,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -77,6 +74,8 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -84,7 +83,7 @@ import com.google.common.base.Preconditions;
 /** Dispatching block replica moves between datanodes. */
 @InterfaceAudience.Private
 public class Dispatcher {
-  static final Log LOG = LogFactory.getLog(Dispatcher.class);
+  static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
 
   /**
    * the period of time to delay the usage of a DataNode after hitting
@@ -378,7 +377,7 @@ public class Dispatcher {
 
         sendRequest(out, eb, accessToken);
         receiveResponse(in);
-        nnc.getBytesMoved().addAndGet(block.getNumBytes());
+        nnc.addBytesMoved(block.getNumBytes());
         target.getDDatanode().setHasSuccess();
         LOG.info("Successfully moved " + this);
       } catch (IOException e) {
@@ -967,6 +966,10 @@ public class Dispatcher {
     return nnc.getBytesMoved().get();
   }
 
+  long getBblocksMoved() {
+    return nnc.getBlocksMoved().get();
+  }
+
   long bytesToMove() {
     Preconditions.checkState(
         storageGroupMap.size() >= sources.size() + targets.size(),
@@ -986,6 +989,14 @@ public class Dispatcher {
     targets.add(target);
   }
 
+  public int moveTasksTotal() {
+    int b = 0;
+    for (Source src : sources) {
+      b += src.tasks.size();
+    }
+    return b;
+  }
+
   private boolean shouldIgnore(DatanodeInfo dn) {
     // ignore out-of-service nodes
     final boolean outOfService = !dn.isInService();
@@ -1067,12 +1078,13 @@ public class Dispatcher {
    */
   private long dispatchBlockMoves() throws InterruptedException {
     final long bytesLastMoved = getBytesMoved();
+    final long blocksLastMoved = getBblocksMoved();
     final Future<?>[] futures = new Future<?>[sources.size()];
 
     int concurrentThreads = Math.min(sources.size(),
         ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
     assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
-    LOG.debug("Balancer concurrent dispatcher threads " + concurrentThreads);
+    LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads);
 
     // Determine the size of each mover thread pool per target
     int threadsPerTarget = maxMoverThreads/targets.size();
@@ -1114,6 +1126,9 @@ public class Dispatcher {
 
     // wait for all block moving to be done
     waitForMoveCompletion(targets);
+    LOG.info("Total bytes (blocks) moved in this iteration {} ({})",
+        StringUtils.byteDesc(getBytesMoved() - bytesLastMoved),
+        (getBblocksMoved() - blocksLastMoved));
 
     return getBytesMoved() - bytesLastMoved;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 1148d1b..14b25d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -112,6 +112,7 @@ public class NameNodeConnector implements Closeable {
   private final OutputStream out;
   private final List<Path> targetPaths;
   private final AtomicLong bytesMoved = new AtomicLong();
+  private final AtomicLong blocksMoved = new AtomicLong();
 
   private final int maxNotChangedIterations;
   private int notChangedIterations = 0;
@@ -168,6 +169,19 @@ public class NameNodeConnector implements Closeable {
     return bytesMoved;
   }
 
+  AtomicLong getBlocksMoved() {
+    return blocksMoved;
+  }
+
+  public void addBytesMoved(long numBytes) {
+    bytesMoved.addAndGet(numBytes);
+    blocksMoved.incrementAndGet();
+  }
+
+  public URI getNameNodeUri() {
+    return nameNodeUri;
+  }
+
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 079cb4a..c347fb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -972,7 +972,7 @@ public class TestBalancer {
         for(NameNodeConnector nnc : connectors) {
           final Balancer b = new Balancer(nnc, p, conf);
           final Result r = b.runOneIteration();
-          r.print(iteration, System.out);
+          r.print(iteration, nnc, System.out);
 
           // clean all lists
           b.resetData(conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
index 062b22e..8145343 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
@@ -30,7 +30,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.logging.Log;
+import org.slf4j.Logger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
+import org.slf4j.event.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -58,10 +58,10 @@ import org.junit.Test;
  * Test balancer with multiple NameNodes
  */
 public class TestBalancerWithMultipleNameNodes {
-  static final Log LOG = Balancer.LOG;
+  static final Logger LOG = Balancer.LOG;
   {
-    GenericTestUtils.setLogLevel(LOG, Level.ALL);
-    DFSTestUtil.setNameNodeLogLevel(Level.ALL);
+    GenericTestUtils.setLogLevel(LOG, Level.TRACE);
+    DFSTestUtil.setNameNodeLogLevel(org.apache.log4j.Level.TRACE);
   }
 
   
@@ -309,7 +309,7 @@ public class TestBalancerWithMultipleNameNodes {
     try {
       Thread.sleep(ms);
     } catch(InterruptedException e) {
-      LOG.error(e);
+      LOG.error("{}", e);
     }
   }
   


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org