You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/17 00:54:23 UTC

[hadoop] branch branch-2 updated: HDFS-14725. Backport HDFS-12914 to branch-2 (Block report leases cause missing blocks until next report). Contributed by He Xiaoqiao.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new b020c5c  HDFS-14725. Backport HDFS-12914 to branch-2 (Block report leases cause missing blocks until next report). Contributed by He Xiaoqiao.
b020c5c is described below

commit b020c5ccfd38e295f04e4c26a556c9e523f175e6
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Fri Aug 16 17:52:03 2019 -0700

    HDFS-14725. Backport HDFS-12914 to branch-2 (Block report leases cause missing blocks until next report). Contributed by He Xiaoqiao.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
 .../hdfs/server/blockmanagement/BlockManager.java  |  26 +++-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  39 +++--
 .../blockmanagement/TestBlockReportLease.java      | 170 +++++++++++++++++++++
 3 files changed, 212 insertions(+), 23 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fad811b..8dd0e1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2133,6 +2133,21 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * Check block report lease.
+   * @return true if context is null, or the lease exists and has not expired
+   */
+  public boolean checkBlockReportLease(BlockReportContext context,
+      final DatanodeID nodeID) throws UnregisteredNodeException {
+    if (context == null) {
+      return true;
+    }
+    DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+    final long startTime = Time.monotonicNow();
+    return blockReportLeaseManager.checkLease(node, startTime,
+        context.getLeaseId());
+  }
+
+  /**
    * The given storage is reporting all its blocks.
    * Update the (storage-->block list) and (block-->storage list) maps.
    *
@@ -2175,12 +2190,6 @@ public class BlockManager implements BlockStatsMXBean {
         blockReportLeaseManager.removeLease(node);
         return !node.hasStaleStorages();
       }
-      if (context != null) {
-        if (!blockReportLeaseManager.checkLease(node, startTime,
-              context.getLeaseId())) {
-          return false;
-        }
-      }
 
       if (storageInfo.getBlockReportCount() == 0) {
         // The first block report can be processed a lot more efficiently than
@@ -2288,8 +2297,9 @@ public class BlockManager implements BlockStatsMXBean {
           (startSize - endSize) + " blocks were removed.");
     }
   }
-  
-  private Collection<Block> processReport(
+
+  @VisibleForTesting
+  Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report,
       BlockReportContext context) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0e71ae0..0ef1343 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -1465,22 +1466,30 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
     final BlockManager bm = namesystem.getBlockManager(); 
     boolean noStaleStorages = false;
-    for (int r = 0; r < reports.length; r++) {
-      final BlockListAsLongs blocks = reports[r].getBlocks();
-      //
-      // BlockManager.processReport accumulates information of prior calls
-      // for the same node and storage, so the value returned by the last
-      // call of this loop is the final updated value for noStaleStorage.
-      //
-      final int index = r;
-      noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws IOException {
-          return bm.processReport(nodeReg, reports[index].getStorage(),
-              blocks, context);
+    try {
+      if (bm.checkBlockReportLease(context, nodeReg)) {
+        for (int r = 0; r < reports.length; r++) {
+          final BlockListAsLongs blocks = reports[r].getBlocks();
+          //
+          // BlockManager.processReport accumulates information of prior calls
+          // for the same node and storage, so the value returned by the last
+          // call of this loop is the final updated value for noStaleStorage.
+          //
+          final int index = r;
+          noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws IOException {
+              return bm.processReport(nodeReg, reports[index].getStorage(),
+                  blocks, context);
+            }
+          });
+          metrics.incrStorageBlockReportOps();
         }
-      });
-      metrics.incrStorageBlockReportOps();
+      }
+    } catch (UnregisteredNodeException une) {
+      LOG.debug("Datanode {} is attempting to report but not register yet.",
+          nodeReg);
+      return RegisterCommand.REGISTER;
     }
     bm.removeBRLeaseIfNeeded(nodeReg, context);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
new file mode 100644
index 0000000..431f608
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests block report lease handling in BlockManager.
+ */
+public class TestBlockReportLease {
+  private static final Log LOG = LogFactory.getLog(TestBlockReportLease.class);
+  /**
+   * Tests the lease check for one BlockReport with many StorageBlockReports.
+   * Before HDFS-12914, when a batch of storage reports was sent to the
+   * NameNode, the lease was checked for each storage one by one, so some
+   * storage reports could be processed normally while the rest could not be
+   * processed because their lease check failed.
+   * After HDFS-12914, the NameNode checks the lease once per block report
+   * request, so this issue no longer exists.
+   */
+  @Test
+  public void testCheckBlockReportLease() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    Random rand = new Random();
+
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
+      cluster.waitActive();
+
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager blockManager = fsn.getBlockManager();
+      BlockManager spyBlockManager = spy(blockManager);
+      fsn.setBlockManagerForTesting(spyBlockManager);
+      final String poolId = cluster.getNamesystem().getBlockPoolId();
+
+      final NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+
+      // Test based on a single DataNode reporting to the NameNode
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeDescriptor datanodeDescriptor = spyBlockManager
+          .getDatanodeManager().getDatanode(dn.getDatanodeId());
+
+      final DatanodeRegistration dnRegistration
+          = dn.getDNRegistrationForBP(poolId);
+      final StorageReport[] storages
+          = dn.getFSDataset().getStorageReports(poolId);
+
+      // Send heartbeat and request full block report lease
+      HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
+          dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+
+      DelayAnswer delayer = new DelayAnswer(LOG);
+      doAnswer(delayer).when(spyBlockManager).processReport(
+          any(DatanodeStorageInfo.class),
+          any(BlockListAsLongs.class),
+          any(BlockReportContext.class));
+
+      ExecutorService pool = Executors.newFixedThreadPool(1);
+
+      // Trigger sendBlockReport
+      final BlockReportContext brContext = new BlockReportContext(1, 0,
+          rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
+      Future<DatanodeCommand> sendBRfuturea = pool.submit(
+          new Callable<DatanodeCommand>() {
+            @Override
+            public DatanodeCommand call() throws Exception {
+              // Build a report with 100 blocks for every storage
+              DatanodeStorage[] datanodeStorages
+                  = new DatanodeStorage[storages.length];
+              for (int i = 0; i < storages.length; i++) {
+                datanodeStorages[i] = storages[i].getStorage();
+              }
+              StorageBlockReport[] reports = createReports(datanodeStorages,
+                  100);
+
+              // Send blockReport
+              return rpcServer.blockReport(dnRegistration, poolId, reports,
+                  brContext);
+            }
+          }
+      );
+
+      // Wait until BlockManager calls processReport
+      delayer.waitForCall();
+
+      // Remove the DataNode's full block report lease
+      spyBlockManager.getBlockReportLeaseManager()
+          .removeLease(datanodeDescriptor);
+
+      // Allow blockreport to proceed
+      delayer.proceed();
+
+      // Get the result; it is non-null if the report was processed
+      DatanodeCommand datanodeCommand = sendBRfuturea.get();
+      assertTrue(datanodeCommand instanceof FinalizeCommand);
+      assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
+          .getBlockPoolId());
+    }
+  }
+
+  private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
+      int numBlocks) {
+    int longsPerBlock = 3;
+    int blockListSize = 2 + numBlocks * longsPerBlock;
+    int numStorages = dnStorages.length;
+    StorageBlockReport[] storageBlockReports
+        = new StorageBlockReport[numStorages];
+    for (int i = 0; i < numStorages; i++) {
+      List<Long> longs = new ArrayList<Long>(blockListSize);
+      longs.add(Long.valueOf(numBlocks));
+      longs.add(0L);
+      for (int j = 0; j < blockListSize; ++j) {
+        longs.add(Long.valueOf(j));
+      }
+      BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
+      storageBlockReports[i] = new StorageBlockReport(dnStorages[i], blockList);
+    }
+    return storageBlockReports;
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org