Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/09/22 10:27:26 UTC

[hadoop] branch branch-3.2.3 updated (0d0f344 -> 7fea1dc)

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a change to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 0d0f344  HDFS-15160. amend to fix javac error supressing unchecked warning
     new baa4190  HDFS-14997. BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.
     new cedb8fe  HDFS-14997. Addendum: BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.
     new 79187b5  HDFS-15075. Remove process command timing from BPServiceActor. Contributed by Xiaoqiao He.
     new c7213e4  HDFS-15651. Client could not obtain block when DN CommandProcessingThread exit. Contributed by Aiphago.
     new 56f7e52  HDFS-15113. Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.
     new 7fea1dc  HDFS-15113. Addendum: Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   5 +
 .../hdfs/server/datanode/BPServiceActor.java       | 185 ++++++++++++++-----
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |  14 ++
 .../server/datanode/DataNodeFaultInjector.java     |   5 +
 .../server/datanode/metrics/DataNodeMetrics.java   |  22 +++
 .../src/main/resources/hdfs-default.xml            |   9 +
 .../hdfs/server/datanode/TestBPOfferService.java   | 195 ++++++++++++++++++++-
 7 files changed, 387 insertions(+), 48 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/06: HDFS-14997. BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit baa4190eab165f95d53a64d57b0aa73b1dc5245d
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Thu Dec 19 09:34:43 2019 -0800

    HDFS-14997. BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.
    
    (cherry picked from commit b86895485d95440de099831e0db38db037f16bdd)
    (cherry picked from commit d5501c125fa0fffe846c8be8718e05f55e288959)
---
 .../hdfs/server/datanode/BPServiceActor.java       | 144 +++++++++++++++------
 .../server/datanode/metrics/DataNodeMetrics.java   |  12 ++
 .../hdfs/server/datanode/TestBPOfferService.java   |  36 +++++-
 3 files changed, 155 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 4f2b9ec..c45b9e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -32,7 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -115,6 +117,7 @@ class BPServiceActor implements Runnable {
   private DatanodeRegistration bpRegistration;
   final LinkedList<BPServiceActorAction> bpThreadQueue 
       = new LinkedList<BPServiceActorAction>();
+  private final CommandProcessingThread commandProcessingThread;
 
   BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
       BPOfferService bpos) {
@@ -136,6 +139,9 @@ class BPServiceActor implements Runnable {
         dnConf.outliersReportIntervalMs);
     // get the value of maxDataLength.
     this.maxDataLength = dnConf.getMaxDataLength();
+
+    commandProcessingThread = new CommandProcessingThread(this);
+    commandProcessingThread.start();
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -676,8 +682,7 @@ class BPServiceActor implements Runnable {
             }
 
             long startProcessCommands = monotonicNow();
-            if (!processCommand(resp.getCommands()))
-              continue;
+            commandProcessingThread.enqueue(resp.getCommands());
             long endProcessCommands = monotonicNow();
             if (endProcessCommands - startProcessCommands > 2000) {
               LOG.info("Took " + (endProcessCommands - startProcessCommands)
@@ -702,11 +707,11 @@ class BPServiceActor implements Runnable {
           cmds = blockReport(fullBlockReportLeaseId);
           fullBlockReportLeaseId = 0;
         }
-        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
+        commandProcessingThread.enqueue(cmds);
 
         if (!dn.areCacheReportsDisabledForTests()) {
           DatanodeCommand cmd = cacheReport();
-          processCommand(new DatanodeCommand[]{ cmd });
+          commandProcessingThread.enqueue(cmd);
         }
 
         if (sendHeartbeat) {
@@ -877,37 +882,6 @@ class BPServiceActor implements Runnable {
   }
 
   /**
-   * Process an array of datanode commands
-   * 
-   * @param cmds an array of datanode commands
-   * @return true if further processing may be required or false otherwise. 
-   */
-  boolean processCommand(DatanodeCommand[] cmds) {
-    if (cmds != null) {
-      for (DatanodeCommand cmd : cmds) {
-        try {
-          if (bpos.processCommandFromActor(cmd, this) == false) {
-            return false;
-          }
-        } catch (RemoteException re) {
-          String reClass = re.getClassName();
-          if (UnregisteredNodeException.class.getName().equals(reClass) ||
-              DisallowedDatanodeException.class.getName().equals(reClass) ||
-              IncorrectVersionException.class.getName().equals(reClass)) {
-            LOG.warn(this + " is shutting down", re);
-            shouldServiceRun = false;
-            return false;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Error processing datanode Command", ioe);
-        }
-      }
-    }
-    return true;
-  }
-
-
-  /**
    * Report a bad block from another DN in this cluster.
    */
   void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block)
@@ -1281,4 +1255,102 @@ class BPServiceActor implements Runnable {
       return Time.monotonicNow();
     }
   }
-}
+
+  /**
+   * CommandProcessingThread that processes commands asynchronously.
+   */
+  class CommandProcessingThread extends Thread {
+    private final BPServiceActor actor;
+    private final BlockingQueue<Runnable> queue;
+
+    CommandProcessingThread(BPServiceActor actor) {
+      super("Command processor");
+      this.actor = actor;
+      this.queue = new LinkedBlockingQueue<>();
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        processQueue();
+      } catch (Throwable t) {
+        LOG.error("{} encountered fatal exception and exit.", getName(), t);
+      }
+    }
+
+    /**
+     * Process commands from the queue one by one, blocking while the queue is empty.
+     */
+    private void processQueue() {
+      while (shouldRun()) {
+        try {
+          Runnable action = queue.take();
+          action.run();
+          dn.getMetrics().incrActorCmdQueueLength(-1);
+          dn.getMetrics().incrNumProcessedCommands();
+        } catch (InterruptedException e) {
+          LOG.error("{} encountered interrupt and exit.", getName());
+          // ignore unless thread was specifically interrupted.
+          if (Thread.interrupted()) {
+            break;
+          }
+        }
+      }
+      dn.getMetrics().incrActorCmdQueueLength(-1 * queue.size());
+      queue.clear();
+    }
+
+    /**
+     * Process an array of datanode commands.
+     *
+     * @param cmds an array of datanode commands
+     * @return true if further processing may be required or false otherwise.
+     */
+    private boolean processCommand(DatanodeCommand[] cmds) {
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          try {
+            if (!bpos.processCommandFromActor(cmd, actor)) {
+              return false;
+            }
+          } catch (RemoteException re) {
+            String reClass = re.getClassName();
+            if (UnregisteredNodeException.class.getName().equals(reClass) ||
+                DisallowedDatanodeException.class.getName().equals(reClass) ||
+                IncorrectVersionException.class.getName().equals(reClass)) {
+              LOG.warn("{} is shutting down", this, re);
+              shouldServiceRun = false;
+              return false;
+            }
+          } catch (IOException ioe) {
+            LOG.warn("Error processing datanode Command", ioe);
+          }
+        }
+      }
+      return true;
+    }
+
+    void enqueue(DatanodeCommand cmd) throws InterruptedException {
+      if (cmd == null) {
+        return;
+      }
+      queue.put(() -> processCommand(new DatanodeCommand[]{cmd}));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+
+    void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
+      if (cmds == null) {
+        return;
+      }
+      queue.put(() -> processCommand(
+          cmds.toArray(new DatanodeCommand[cmds.size()])));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+
+    void enqueue(DatanodeCommand[] cmds) throws InterruptedException {
+      queue.put(() -> processCommand(cmds));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+  }
+}
\ No newline at end of file
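
The new CommandProcessingThread is an instance of the standard
single-consumer blocking-queue pattern: the heartbeat loop enqueues
closures and one dedicated daemon thread drains them, so slow command
processing can no longer stall heartbeats. A minimal, self-contained
sketch of that pattern follows; the class and method names are
illustrative, not the Hadoop code itself, and the interrupt handling
shown anticipates the later HDFS-15651 fix.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of the producer/consumer pattern used above: producers
    // enqueue work as Runnables, one daemon thread drains the queue.
    public class AsyncCommandProcessor {
      private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
      private final Thread worker;
      private volatile boolean shouldRun = true;

      public AsyncCommandProcessor() {
        worker = new Thread(this::processQueue, "Command processor");
        worker.setDaemon(true); // must not block JVM shutdown
        worker.start();
      }

      // Called from the heartbeat thread; hands work off without running it.
      public void enqueue(Runnable command) throws InterruptedException {
        queue.put(command);
      }

      private void processQueue() {
        while (shouldRun) {
          try {
            queue.take().run(); // blocks until a command is available
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore status and exit
            break;
          }
        }
      }

      public void stop() {
        shouldRun = false;
        worker.interrupt();
      }
    }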
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 01731d6..888456b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -161,6 +161,10 @@ public class DataNodeMetrics {
   private MutableCounterLong ecReconstructionDecodingTimeMillis;
   @Metric("Milliseconds spent on write by erasure coding worker")
   private MutableCounterLong ecReconstructionWriteTimeMillis;
+  @Metric("Sum of all BPServiceActors command queue length")
+  private MutableCounterLong sumOfActorCommandQueueLength;
+  @Metric("Num of processed commands of all BPServiceActors")
+  private MutableCounterLong numProcessedCommands;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -541,4 +545,12 @@ public class DataNodeMetrics {
             .value(), totalWriteTime.value(), totalReadTime.value(),
         blocksWritten.value(), blocksRead.value(), timeSinceLastReport);
   }
+
+  public void incrActorCmdQueueLength(int delta) {
+    sumOfActorCommandQueueLength.incr(delta);
+  }
+
+  public void incrNumProcessedCommands() {
+    numProcessedCommands.incr();
+  }
 }
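
The two new counters keep a simple invariant: every enqueue adds 1 to
SumOfActorCommandQueueLength, every processed command subtracts 1, and
on worker exit the remaining queue size is subtracted, so the metric
drains back to zero whenever the DataNode keeps up. A sketch of just
that bookkeeping, using a plain AtomicLong rather than the metrics2 API:

    import java.util.concurrent.atomic.AtomicLong;

    // Bookkeeping sketch only; the real code drives the metrics2
    // counter sumOfActorCommandQueueLength shown in the diff above.
    class QueueLengthAccounting {
      private final AtomicLong queueLength = new AtomicLong();

      void onEnqueue()   { queueLength.incrementAndGet(); } // incrActorCmdQueueLength(1)
      void onProcessed() { queueLength.decrementAndGet(); } // incrActorCmdQueueLength(-1)

      // On worker exit, subtract whatever is still queued so the metric
      // returns to zero (mirrors incrActorCmdQueueLength(-1 * queue.size())).
      void onWorkerExit(int remaining) { queueLength.addAndGet(-remaining); }

      long current() { return queueLength.get(); }
    }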
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index a568ba1..71162cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -18,7 +18,15 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
@@ -36,6 +44,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1050,4 +1059,29 @@ public class TestBPOfferService {
       bpos.stop();
     }
   }
-}
+
+  @Test(timeout = 15000)
+  public void testCommandProcessingThread() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 1);
+      DataNode datanode = datanodes.get(0);
+
+      // Try to write file and trigger NN send back command to DataNode.
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("/test");
+      DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L);
+
+      MetricsRecordBuilder mrb = getMetrics(datanode.getMetrics().name());
+      assertTrue("Process command nums is not expected.",
+          getLongCounter("NumProcessedCommands", mrb) > 0);
+      assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file



[hadoop] 04/06: HDFS-15651. Client could not obtain block when DN CommandProcessingThread exit. Contributed by Aiphago.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c7213e4d2bfdac44e3e03adab1a89800fdfd4aee
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Wed Nov 4 13:53:46 2020 +0800

    HDFS-15651. Client could not obtain block when DN CommandProcessingThread exit. Contributed by Aiphago.
    
    Reviewed-by: He Xiaoqiao <he...@apache.org>
    Reviewed-by: Yiqun Lin <yq...@apache.org>
    (cherry picked from commit 3067a25fa12a012709f43aa35cc606db6fb137f9)
    (cherry picked from commit fc5fe67b3f73e98d931d475c02efb52fb3c7a2b4)
---
 .../hdfs/server/datanode/BPServiceActor.java       | 12 ++++++++++++
 .../hdfs/server/datanode/TestBPOfferService.java   | 22 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e4a30cb..e8c64d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -1271,6 +1271,10 @@ class BPServiceActor implements Runnable {
         processQueue();
       } catch (Throwable t) {
         LOG.error("{} encountered fatal exception and exit.", getName(), t);
+        runningState = RunningState.FAILED;
+      } finally {
+        LOG.warn("Ending command processor service for: " + this);
+        shouldServiceRun = false;
       }
     }
 
@@ -1286,6 +1290,7 @@ class BPServiceActor implements Runnable {
           dn.getMetrics().incrNumProcessedCommands();
         } catch (InterruptedException e) {
           LOG.error("{} encountered interrupt and exit.", getName());
+          Thread.currentThread().interrupt();
           // ignore unless thread was specifically interrupted.
           if (Thread.interrupted()) {
             break;
@@ -1357,4 +1362,11 @@ class BPServiceActor implements Runnable {
       dn.getMetrics().incrActorCmdQueueLength(1);
     }
   }
+
+  @VisibleForTesting
+  void stopCommandProcessingThread() {
+    if (commandProcessingThread != null) {
+      commandProcessingThread.interrupt();
+    }
+  }
 }
\ No newline at end of file
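
The point of this change is failure propagation: before it, an uncaught
Throwable could kill CommandProcessingThread while the actor kept
heartbeating, silently dropping every subsequent command (which is how
clients ended up unable to obtain blocks). A sketch of the corrected
shape, with hypothetical names; note that BlockingQueue.take() clears
the interrupt flag when it throws, so re-interrupting the thread is
what lets the following check observe the interrupt and exit the loop.

    // Illustrative sketch of the failure propagation added above; the
    // names Worker and ServiceState are hypothetical, not Hadoop's.
    class Worker extends Thread {
      enum ServiceState { RUNNING, FAILED }

      private volatile ServiceState runningState = ServiceState.RUNNING;
      private volatile boolean shouldServiceRun = true;

      @Override
      public void run() {
        try {
          processQueue();
        } catch (Throwable t) {
          runningState = ServiceState.FAILED; // make the failure observable
        } finally {
          shouldServiceRun = false;           // stop the owning service too
        }
      }

      private void processQueue() {
        while (shouldServiceRun) {
          try {
            takeAndRun();
          } catch (InterruptedException e) {
            // take() cleared the interrupt flag when it threw; restore it
            // so the check below sees it and the loop exits promptly.
            Thread.currentThread().interrupt();
            if (Thread.interrupted()) {
              break;
            }
          }
        }
      }

      private void takeAndRun() throws InterruptedException {
        // queue.take().run() in the real code
      }
    }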
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 8cf7299..98af2fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -1088,4 +1088,26 @@ public class TestBPOfferService {
       }
     }
   }
+
+  @Test(timeout = 5000)
+  public void testCommandProcessingThreadExit() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).
+        numDataNodes(1).build();
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      DataNode dataNode = datanodes.get(0);
+      List<BPOfferService> allBpOs = dataNode.getAllBpOs();
+      BPOfferService bpos = allBpOs.get(0);
+      waitForInitialization(bpos);
+      BPServiceActor actor = bpos.getBPServiceActors().get(0);
+      // Stop and wait until the actor exits.
+      actor.stopCommandProcessingThread();
+      GenericTestUtils.waitFor(() -> !actor.isAlive(), 100, 3000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
\ No newline at end of file



[hadoop] 06/06: HDFS-15113. Addendum: Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7fea1dc006e19e4edd3b601915ad07990b209545
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Mon Mar 23 12:42:57 2020 -0700

    HDFS-15113. Addendum: Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit af64ce2f4a72705c5b68ddfe4d29f0d208fd38e7)
    (cherry picked from commit 4944dafa297e9aad93d39e861b5dc2835b7cbb5f)
---
 .../hdfs/server/datanode/TestBPOfferService.java   | 33 ++++++++++++++++++----
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 79ad5d1..a045653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -286,23 +286,30 @@ public class TestBPOfferService {
   public void testMissBlocksWhenReregister() throws Exception {
     BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
     bpos.start();
+    int totalTestBlocks = 4000;
+    Thread addNewBlockThread = null;
+    final AtomicInteger count = new AtomicInteger(0);
+
     try {
       waitForBothActors(bpos);
       waitForInitialization(bpos);
-
       DataNodeFaultInjector.set(new DataNodeFaultInjector() {
         public void blockUtilSendFullBlockReport() {
           try {
-            Thread.sleep(200);
-          } catch (InterruptedException e) {
+            GenericTestUtils.waitFor(() -> {
+              if(count.get() > 2000) {
+                return true;
+              }
+              return false;
+            }, 100, 1000);
+          } catch (Exception e) {
             e.printStackTrace();
           }
         }
       });
 
       countBlockReportItems(FAKE_BLOCK, mockNN1);
-      int totalTestBlocks = 4000;
-      Thread addNewBlockThread = new Thread(() -> {
+      addNewBlockThread = new Thread(() -> {
         for (int i = 0; i < totalTestBlocks; i++) {
           SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
           SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
@@ -312,6 +319,7 @@ public class TestBPOfferService {
             fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
             bpos.notifyNamenodeReceivingBlock(b, storageId);
             fsDataset.finalizeBlock(b, false);
+            count.addAndGet(1);
             Thread.sleep(1);
           } catch (Exception e) {
             e.printStackTrace();
@@ -321,7 +329,13 @@ public class TestBPOfferService {
       addNewBlockThread.start();
 
       // Make sure blocks are generated for the DataNode and IBRs are not empty now.
-      Thread.sleep(200);
+      GenericTestUtils.waitFor(() -> {
+        if(count.get() > 0) {
+          return true;
+        }
+        return false;
+      }, 100, 1000);
+
       // Trigger re-register using DataNode Command.
       datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
       bpos.triggerHeartbeatForTests();
@@ -340,6 +354,7 @@ public class TestBPOfferService {
       assertTrue(fullBlockReportCount == totalTestBlocks ||
           incrBlockReportCount == totalTestBlocks);
     } finally {
+      addNewBlockThread.join();
       bpos.stop();
       bpos.join();
 
@@ -698,12 +713,17 @@ public class TestBPOfferService {
     }
   }
 
+  /**
+   * Record the block count of the last full block report and the total
+   * count of blocks added via IBRs, assuming no blocks are deleted here.
+   */
   private void countBlockReportItems(final ExtendedBlock fakeBlock,
       final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
     final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
     final ArgumentCaptor<StorageBlockReport[]> captor =
         ArgumentCaptor.forClass(StorageBlockReport[].class);
 
+    // Record the block count from the last full block report.
     Mockito.doAnswer((Answer<Object>) invocation -> {
       Object[] arguments = invocation.getArguments();
       StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
@@ -716,6 +736,7 @@ public class TestBPOfferService {
         Mockito.any()
     );
 
+    // Record the total count of added blocks, assuming no blocks are deleted here.
     Mockito.doAnswer((Answer<Object>) invocation -> {
       Object[] arguments = invocation.getArguments();
       StorageReceivedDeletedBlocks[] list =

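
The pattern of this addendum is replacing fixed Thread.sleep() calls
with condition polling, which removes the timing race on slow machines.
A generic sketch of such a polling helper, using only JDK types rather
than Hadoop's GenericTestUtils.waitFor:

    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    // Sleep-to-poll helper: re-check a condition at a fixed interval
    // instead of sleeping a fixed, hopeful amount of time.
    final class Wait {
      private Wait() {}

      static void until(Supplier<Boolean> condition, long checkEveryMs,
                        long timeoutMs)
          throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.get()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException(
                "Condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(checkEveryMs);
        }
      }
    }

Wait.until(() -> count.get() > 0, 100, 1000) corresponds to the
waitFor call that replaced Thread.sleep(200) in the diff above.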


[hadoop] 05/06: HDFS-15113. Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 56f7e52be55d79cbba13d54faa9bc7727c79915d
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Fri Mar 13 18:54:36 2020 -0700

    HDFS-15113. Missing IBR when NameNode restart if open processCommand async feature. Contributed by Xiaoqiao He.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    Reviewed-by: Brahma Reddy Battula <br...@apache.org>
    Reviewed-by: Inigo Goiri <in...@apache.org>
    (cherry picked from commit e9955bb8ff44396eb478f709307d647ca884de99)
    (cherry picked from commit 50b0f0dc42d5cdcfbd381527ff774f20852888ba)
---
 .../hdfs/server/datanode/BPServiceActor.java       |   9 +-
 .../server/datanode/DataNodeFaultInjector.java     |   5 +
 .../hdfs/server/datanode/TestBPOfferService.java   | 112 +++++++++++++++++++++
 3 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e8c64d2..ea91402 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -891,14 +891,17 @@ class BPServiceActor implements Runnable {
       // re-retrieve namespace info to make sure that, if the NN
       // was restarted, we still match its version (HDFS-2120)
       NamespaceInfo nsInfo = retrieveNamespaceInfo();
-      // and re-register
-      register(nsInfo);
-      scheduler.scheduleHeartbeat();
       // HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
       // for sometime.
       if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
         ibrManager.clearIBRs();
       }
+      // HDFS-15113: register and trigger an FBR after clearing IBRs to avoid
+      // missing block reports to the Standby until the next FBR.
+      // and re-register
+      register(nsInfo);
+      scheduler.scheduleHeartbeat();
+      DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 821717b..4d3d220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -134,4 +134,9 @@ public class DataNodeFaultInjector {
    * into an erasure coding reconstruction.
    */
   public void badDecoding(ByteBuffer[] outputs) {}
+
+  /**
+   * Used as a hook to inject an intercept during re-registration.
+   */
+  public void blockUtilSendFullBlockReport() {}
 }
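
DataNodeFaultInjector follows the usual static fault-injection
pattern: production code calls a no-op hook at an interesting point,
and a test swaps in a subclass that blocks or fails there. A minimal
sketch with hypothetical names (the real hook is the
blockUtilSendFullBlockReport method added above):

    // Minimal sketch of the fault-injector pattern; FaultInjector and
    // pauseBeforeFullBlockReport are hypothetical names.
    public class FaultInjector {
      private static FaultInjector instance = new FaultInjector();

      public static FaultInjector get() { return instance; }
      public static void set(FaultInjector injector) { instance = injector; }

      /** No-op in production; tests override this to delay or fail here. */
      public void pauseBeforeFullBlockReport() {}
    }

    // In a test, delay the production path at exactly this hook:
    // FaultInjector.set(new FaultInjector() {
    //   @Override
    //   public void pauseBeforeFullBlockReport() {
    //     // block here until the test has queued enough IBRs
    //   }
    // });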
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 98af2fa..79ad5d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.SimulatedStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -107,6 +109,8 @@ public class TestBPOfferService {
   private long firstLeaseId = 0;
   private long secondLeaseId = 0;
   private long nextFullBlockReportLeaseId = 1L;
+  private int fullBlockReportCount = 0;
+  private int incrBlockReportCount = 0;
 
   static {
     GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
@@ -229,6 +233,14 @@ public class TestBPOfferService {
     }
   }
 
+  private void setBlockReportCount(int count) {
+    fullBlockReportCount = count;
+  }
+
+  private void setIncreaseBlockReportCount(int count) {
+    incrBlockReportCount += count;
+  }
+
   /**
    * Test that the BPOS can register to talk to two different NNs,
    * sends block reports to both, etc.
@@ -267,6 +279,76 @@ public class TestBPOfferService {
     }
   }
 
+  /**
+   * HDFS-15113: Test and verify that no blocks are missed when re-registering.
+   */
+  @Test
+  public void testMissBlocksWhenReregister() throws Exception {
+    BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+    bpos.start();
+    try {
+      waitForBothActors(bpos);
+      waitForInitialization(bpos);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        public void blockUtilSendFullBlockReport() {
+          try {
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      });
+
+      countBlockReportItems(FAKE_BLOCK, mockNN1);
+      int totalTestBlocks = 4000;
+      Thread addNewBlockThread = new Thread(() -> {
+        for (int i = 0; i < totalTestBlocks; i++) {
+          SimulatedFSDataset fsDataset = (SimulatedFSDataset) mockFSDataset;
+          SimulatedStorage simulatedStorage = fsDataset.getStorages().get(0);
+          String storageId = simulatedStorage.getStorageUuid();
+          ExtendedBlock b = new ExtendedBlock(bpos.getBlockPoolId(), i, 0, i);
+          try {
+            fsDataset.createRbw(StorageType.DEFAULT, storageId, b, false);
+            bpos.notifyNamenodeReceivingBlock(b, storageId);
+            fsDataset.finalizeBlock(b, false);
+            Thread.sleep(1);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      });
+      addNewBlockThread.start();
+
+      // Make sure blocks are generated for the DataNode and IBRs are not empty now.
+      Thread.sleep(200);
+      // Trigger re-register using DataNode Command.
+      datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
+      bpos.triggerHeartbeatForTests();
+
+      try {
+        GenericTestUtils.waitFor(() -> {
+          if(fullBlockReportCount == totalTestBlocks ||
+              incrBlockReportCount == totalTestBlocks) {
+            return true;
+          }
+          return false;
+        }, 1000, 15000);
+      } catch (Exception e) {}
+
+      // Verify that the FBR or IBR count equals the number of generated blocks.
+      assertTrue(fullBlockReportCount == totalTestBlocks ||
+          incrBlockReportCount == totalTestBlocks);
+    } finally {
+      bpos.stop();
+      bpos.join();
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        public void blockUtilSendFullBlockReport() {}
+      });
+    }
+  }
+
   @Test
   public void testLocklessBlockPoolId() throws Exception {
     BPOfferService bpos = Mockito.spy(setupBPOSForNNs(mockNN1));
@@ -615,6 +697,36 @@ public class TestBPOfferService {
       secondCallTime = Time.now();
     }
   }
+
+  private void countBlockReportItems(final ExtendedBlock fakeBlock,
+      final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
+    final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
+    final ArgumentCaptor<StorageBlockReport[]> captor =
+        ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+    Mockito.doAnswer((Answer<Object>) invocation -> {
+      Object[] arguments = invocation.getArguments();
+      StorageBlockReport[] list = (StorageBlockReport[])arguments[2];
+      setBlockReportCount(list[0].getBlocks().getNumberOfBlocks());
+      return null;
+    }).when(mockNN).blockReport(
+        Mockito.any(),
+        Mockito.eq(fakeBlockPoolId),
+        captor.capture(),
+        Mockito.any()
+    );
+
+    Mockito.doAnswer((Answer<Object>) invocation -> {
+      Object[] arguments = invocation.getArguments();
+      StorageReceivedDeletedBlocks[] list =
+          (StorageReceivedDeletedBlocks[])arguments[2];
+      setIncreaseBlockReportCount(list[0].getBlocks().length);
+      return null;
+    }).when(mockNN).blockReceivedAndDeleted(
+        Mockito.any(),
+        Mockito.eq(fakeBlockPoolId),
+        Mockito.any());
+  }
   
   private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
     private final int nnIdx;



[hadoop] 02/06: HDFS-14997. Addendum: BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit cedb8fec9d68a36bffb1915bdc48c6b6d09886ab
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Fri Dec 27 09:12:44 2019 +0530

    HDFS-14997. Addendum: BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.
    
    (cherry picked from commit 80f91d14ab0fb385252d4eeb19141bd059303d59)
    (cherry picked from commit f77965b29eb3f33713c39f1cb341fdc7c91b0220)
---
 .../java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java    | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c45b9e7..e36c401 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -578,6 +578,9 @@ class BPServiceActor implements Runnable {
     if (bpThread != null) {
       bpThread.interrupt();
     }
+    if (commandProcessingThread != null) {
+      commandProcessingThread.interrupt();
+    }
   }
   
   //This must be called only by blockPoolManager

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 03/06: HDFS-15075. Remove process command timing from BPServiceActor. Contributed by Xiaoqiao He.

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 79187b5a5ef96312e10808da6589aa1124720cf1
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Wed Mar 25 11:30:54 2020 -0700

    HDFS-15075. Remove process command timing from BPServiceActor. Contributed by Xiaoqiao He.
    
    (cherry picked from commit cdcb77a2c5ca99502d2ac2fbf803f22463eb1343)
    (cherry picked from commit 5da46e197b63e4c847732a64e9694c9e485d5f2d)
---
 .../main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  5 +++++
 .../hadoop/hdfs/server/datanode/BPServiceActor.java     | 17 +++++++++--------
 .../org/apache/hadoop/hdfs/server/datanode/DNConf.java  | 14 ++++++++++++++
 .../hdfs/server/datanode/metrics/DataNodeMetrics.java   | 10 ++++++++++
 .../hadoop-hdfs/src/main/resources/hdfs-default.xml     |  9 +++++++++
 .../hadoop/hdfs/server/datanode/TestBPOfferService.java |  4 ++++
 6 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b02b5f4..1813787 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -398,6 +398,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
       true;
 
+  public static final String DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY =
+      "dfs.datanode.processcommands.threshold";
+  public static final long DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT =
+      TimeUnit.SECONDS.toMillis(2);
+
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e36c401..e4a30cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -683,15 +683,7 @@ class BPServiceActor implements Runnable {
             if (state == HAServiceState.ACTIVE) {
               handleRollingUpgradeStatus(resp);
             }
-
-            long startProcessCommands = monotonicNow();
             commandProcessingThread.enqueue(resp.getCommands());
-            long endProcessCommands = monotonicNow();
-            if (endProcessCommands - startProcessCommands > 2000) {
-              LOG.info("Took " + (endProcessCommands - startProcessCommands)
-                  + "ms to process " + resp.getCommands().length
-                  + " commands from NN");
-            }
           }
         }
         if (!dn.areIBRDisabledForTests() &&
@@ -1312,6 +1304,7 @@ class BPServiceActor implements Runnable {
      */
     private boolean processCommand(DatanodeCommand[] cmds) {
       if (cmds != null) {
+        long startProcessCommands = monotonicNow();
         for (DatanodeCommand cmd : cmds) {
           try {
             if (!bpos.processCommandFromActor(cmd, actor)) {
@@ -1330,6 +1323,14 @@ class BPServiceActor implements Runnable {
             LOG.warn("Error processing datanode Command", ioe);
           }
         }
+        long processCommandsMs = monotonicNow() - startProcessCommands;
+        if (cmds.length > 0) {
+          dn.getMetrics().addNumProcessedCommands(processCommandsMs);
+        }
+        if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
+          LOG.info("Took {} ms to process {} commands from NN",
+              processCommandsMs, cmds.length);
+        }
       }
       return true;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 5252e68..ea57012 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -35,6 +35,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -122,6 +124,8 @@ public class DNConf {
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
 
+  private final long processCommandsThresholdMs;
+
   final long maxLockedMemory;
   private final String[] pmemDirs;
 
@@ -298,6 +302,12 @@ public class DNConf {
     this.pmemCacheRecoveryEnabled = getConf().getBoolean(
         DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY,
         DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT);
+
+    this.processCommandsThresholdMs = getConf().getTimeDuration(
+        DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY,
+        DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT,
+        TimeUnit.MILLISECONDS
+    );
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -460,4 +470,8 @@ public class DNConf {
   public boolean getPmemCacheRecoveryEnabled() {
     return pmemCacheRecoveryEnabled;
   }
+
+  public long getProcessCommandsThresholdMs() {
+    return processCommandsThresholdMs;
+  }
 }
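
DNConf reads the new key with Configuration.getTimeDuration, which
accepts human-readable duration suffixes and converts to the requested
unit; that is why hdfs-default.xml below can ship the value as "2s"
while the code works in milliseconds. A small usage sketch, assuming
only a stock Hadoop Configuration on the classpath:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public class ThresholdExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "2s" parses to 2000 when MILLISECONDS is requested.
        long thresholdMs = conf.getTimeDuration(
            "dfs.datanode.processcommands.threshold",
            TimeUnit.SECONDS.toMillis(2), // default, already in target unit
            TimeUnit.MILLISECONDS);
        System.out.println("Slow-command log threshold: " + thresholdMs + " ms");
      }
    }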
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 888456b..2565ab0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -165,6 +165,8 @@ public class DataNodeMetrics {
   private MutableCounterLong sumOfActorCommandQueueLength;
   @Metric("Num of processed commands of all BPServiceActors")
   private MutableCounterLong numProcessedCommands;
+  @Metric("Rate of processed commands of all BPServiceActors")
+  private MutableRate processedCommandsOp;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -553,4 +555,12 @@ public class DataNodeMetrics {
   public void incrNumProcessedCommands() {
     numProcessedCommands.incr();
   }
+
+  /**
+   * Add a sample to the processedCommandsOp metric.
+   * @param latency time in milliseconds spent processing commands
+   */
+  public void addNumProcessedCommands(long latency) {
+    processedCommandsOp.add(latency);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c36b333..5dd6e83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2934,6 +2934,15 @@
 </property>
 
   <property>
+    <name>dfs.datanode.processcommands.threshold</name>
+    <value>2s</value>
+    <description>The threshold at which we will log slow command
+      processing in BPServiceActor. By default, this parameter is set
+      to 2 seconds.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.client.refresh.read-block-locations.ms</name>
     <value>0</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 71162cb..8cf7299 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -1078,6 +1079,9 @@ public class TestBPOfferService {
       assertTrue("Process command nums is not expected.",
           getLongCounter("NumProcessedCommands", mrb) > 0);
       assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
+      // Check new metric result about processedCommandsOp.
+      // The one command sent back to the DataNode here is #FinalizeCommand.
+      assertCounter("ProcessedCommandsOpNumOps", 1L, mrb);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
