You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2022/10/24 06:17:37 UTC

[hadoop] branch branch-3.3 updated: HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)

This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 36a0e818ec1 HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)
36a0e818ec1 is described below

commit 36a0e818ec1cd448286ffe0eacddd00e6363cac9
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Tue Jun 15 02:53:50 2021 +0530

    HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)
    
    Contributed by Viraj Jasani
    
    (cherry picked from commit c1bf3cb0daf0b6212aebb449c97b772af2133d98)
---
 .../hdfs/server/datanode/BPServiceActor.java       | 62 +++++++++++++++++++---
 .../org/apache/hadoop/hdfs/TestDatanodeReport.java | 17 ++++--
 .../datanode/TestIncrementalBlockReports.java      | 24 ++++++---
 3 files changed, 86 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index d5e3cfd65c2..06c2c681c34 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -36,6 +36,8 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
@@ -98,6 +101,8 @@ class BPServiceActor implements Runnable {
   
   volatile long lastCacheReport = 0;
   private final Scheduler scheduler;
+  private final Object sendIBRLock;
+  private final ExecutorService ibrExecutorService;
 
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
@@ -153,6 +158,10 @@ class BPServiceActor implements Runnable {
     }
     commandProcessingThread = new CommandProcessingThread(this);
     commandProcessingThread.start();
+    sendIBRLock = new Object();
+    ibrExecutorService = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("ibr-executor-%d").build());
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -372,8 +381,10 @@ class BPServiceActor implements Runnable {
     // we have a chance that we will miss the delHint information
     // or we will report an RBW replica after the BlockReport already reports
     // a FINALIZED one.
-    ibrManager.sendIBRs(bpNamenode, bpRegistration,
-        bpos.getBlockPoolId(), getRpcMetricSuffix());
+    synchronized (sendIBRLock) {
+      ibrManager.sendIBRs(bpNamenode, bpRegistration,
+          bpos.getBlockPoolId(), getRpcMetricSuffix());
+    }
 
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -604,6 +615,9 @@ class BPServiceActor implements Runnable {
     if (commandProcessingThread != null) {
       commandProcessingThread.interrupt();
     }
+    if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
+      ibrExecutorService.shutdownNow();
+    }
   }
   
   //This must be called only by blockPoolManager
@@ -618,13 +632,18 @@ class BPServiceActor implements Runnable {
     } catch (InterruptedException ie) { }
   }
   
-  //Cleanup method to be called by current thread before exiting.
+  // Cleanup method to be called by current thread before exiting.
+  // Any Thread / ExecutorService started by BPServiceActor can be shutdown
+  // here.
   private synchronized void cleanUp() {
     
     shouldServiceRun = false;
     IOUtils.cleanupWithLogger(null, bpNamenode);
     IOUtils.cleanupWithLogger(null, lifelineSender);
     bpos.shutdownActor(this);
+    if (!ibrExecutorService.isShutdown()) {
+      ibrExecutorService.shutdownNow();
+    }
   }
 
   private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
@@ -710,11 +729,6 @@ class BPServiceActor implements Runnable {
             commandProcessingThread.enqueue(resp.getCommands());
           }
         }
-        if (!dn.areIBRDisabledForTests() &&
-            (ibrManager.sendImmediately()|| sendHeartbeat)) {
-          ibrManager.sendIBRs(bpNamenode, bpRegistration,
-              bpos.getBlockPoolId(), getRpcMetricSuffix());
-        }
 
         List<DatanodeCommand> cmds = null;
         boolean forceFullBr =
@@ -878,6 +892,10 @@ class BPServiceActor implements Runnable {
         initialRegistrationComplete.countDown();
       }
 
+      // IBR tasks to be handled separately from offerService() in order to
+      // improve performance of offerService(), which can now focus only on
+      // FBR and heartbeat.
+      ibrExecutorService.submit(new IBRTaskHandler());
       while (shouldRun()) {
         try {
           offerService();
@@ -1108,6 +1126,34 @@ class BPServiceActor implements Runnable {
     }
   }
 
+  class IBRTaskHandler implements Runnable {
+
+    @Override
+    public void run() {
+      LOG.info("Starting IBR Task Handler.");
+      while (shouldRun()) {
+        try {
+          final long startTime = scheduler.monotonicNow();
+          final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
+          if (!dn.areIBRDisabledForTests() &&
+              (ibrManager.sendImmediately() || sendHeartbeat)) {
+            synchronized (sendIBRLock) {
+              ibrManager.sendIBRs(bpNamenode, bpRegistration,
+                  bpos.getBlockPoolId(), getRpcMetricSuffix());
+            }
+          }
+          // There is no work to do; sleep until heartbeat timer elapses,
+          // or work arrives, and then iterate again.
+          ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
+        } catch (Throwable t) {
+          LOG.error("Exception in IBRTaskHandler.", t);
+          sleepAndLogInterrupts(5000, "offering IBR service");
+        }
+      }
+    }
+
+  }
+
   /**
    * Utility class that wraps the timestamp computations for scheduling
    * heartbeats and block reports.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
index 69dbf6438af..de738eef177 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java
@@ -172,8 +172,19 @@ public class TestDatanodeReport {
         // all bad datanodes
       }
       cluster.triggerHeartbeats(); // IBR delete ack
-      lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
-      assertEquals(0, lb.getLocations().length);
+      int retries = 0;
+      while (true) {
+        lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
+        if (0 != lb.getLocations().length) {
+          retries++;
+          if (retries > 7) {
+            Assert.fail("getLocatedBlocks failed after 7 retries");
+          }
+          Thread.sleep(2000);
+        } else {
+          break;
+        }
+      }
     } finally {
       cluster.shutdown();
     }
@@ -223,4 +234,4 @@ public class TestDatanodeReport {
     throw new IllegalStateException("Datnode " + id + " not in datanode list: "
         + datanodes);
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
index 4221ecaf2a0..e848cbfb37f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 
+import org.mockito.exceptions.base.MockitoAssertionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -156,7 +157,7 @@ public class TestIncrementalBlockReports {
 
       // Sleep for a very short time since IBR is generated
       // asynchronously.
-      Thread.sleep(2000);
+      Thread.sleep(1000);
 
       // Ensure that no block report was generated immediately.
       // Deleted blocks are reported when the IBR timer elapses.
@@ -167,13 +168,24 @@ public class TestIncrementalBlockReports {
 
       // Trigger a heartbeat, this also triggers an IBR.
       DataNodeTestUtils.triggerHeartbeat(singletonDn);
-      Thread.sleep(2000);
 
       // Ensure that the deleted block is reported.
-      Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
-          any(DatanodeRegistration.class),
-          anyString(),
-          any(StorageReceivedDeletedBlocks[].class));
+      int retries = 0;
+      while (true) {
+        try {
+          Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted(
+              any(DatanodeRegistration.class),
+              anyString(),
+              any(StorageReceivedDeletedBlocks[].class));
+          break;
+        } catch (MockitoAssertionError e) {
+          if (retries > 7) {
+            throw e;
+          }
+          retries++;
+          Thread.sleep(2000);
+        }
+      }
 
     } finally {
       cluster.shutdown();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org