You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/22 00:20:12 UTC
[hadoop] branch branch-2.8 updated: HDFS-14725. Backport HDFS-12914
to branch-2 (Block report leases cause missing blocks until next report).
Contributed by He Xiaoqiao.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 7ead267 HDFS-14725. Backport HDFS-12914 to branch-2 (Block report leases cause missing blocks until next report). Contributed by He Xiaoqiao.
7ead267 is described below
commit 7ead26754985cd514c117b34209e2eebccb3ddf6
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Wed Aug 21 17:19:03 2019 -0700
HDFS-14725. Backport HDFS-12914 to branch-2 (Block report leases cause missing blocks until next report). Contributed by He Xiaoqiao.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
.../hdfs/server/blockmanagement/BlockManager.java | 26 +++-
.../hdfs/server/namenode/NameNodeRpcServer.java | 39 +++--
.../blockmanagement/TestBlockReportLease.java | 167 +++++++++++++++++++++
3 files changed, 209 insertions(+), 23 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4012f57..d11c24f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1945,6 +1945,21 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
+ * Check block report lease.
+ * @return true if context is null, or if the lease exists and has not expired
+ */
+ public boolean checkBlockReportLease(BlockReportContext context,
+ final DatanodeID nodeID) throws UnregisteredNodeException {
+ if (context == null) {
+ return true;
+ }
+ DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+ final long startTime = Time.monotonicNow();
+ return blockReportLeaseManager.checkLease(node, startTime,
+ context.getLeaseId());
+ }
+
+ /**
* The given storage is reporting all its blocks.
* Update the (storage-->block list) and (block-->storage list) maps.
*
@@ -1987,12 +2002,6 @@ public class BlockManager implements BlockStatsMXBean {
blockReportLeaseManager.removeLease(node);
return !node.hasStaleStorages();
}
- if (context != null) {
- if (!blockReportLeaseManager.checkLease(node, startTime,
- context.getLeaseId())) {
- return false;
- }
- }
if (storageInfo.getBlockReportCount() == 0) {
// The first block report can be processed a lot more efficiently than
@@ -2098,8 +2107,9 @@ public class BlockManager implements BlockStatsMXBean {
(startSize - endSize) + " blocks were removed.");
}
}
-
- private Collection<Block> processReport(
+
+ @VisibleForTesting
+ Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report,
BlockReportContext context) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 84041fa..4f5979f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -148,6 +148,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -1420,22 +1421,30 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
- for (int r = 0; r < reports.length; r++) {
- final BlockListAsLongs blocks = reports[r].getBlocks();
- //
- // BlockManager.processReport accumulates information of prior calls
- // for the same node and storage, so the value returned by the last
- // call of this loop is the final updated value for noStaleStorage.
- //
- final int index = r;
- noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
- @Override
- public Boolean call() throws IOException {
- return bm.processReport(nodeReg, reports[index].getStorage(),
- blocks, context);
+ try {
+ if (bm.checkBlockReportLease(context, nodeReg)) {
+ for (int r = 0; r < reports.length; r++) {
+ final BlockListAsLongs blocks = reports[r].getBlocks();
+ //
+ // BlockManager.processReport accumulates information of prior calls
+ // for the same node and storage, so the value returned by the last
+ // call of this loop is the final updated value for noStaleStorage.
+ //
+ final int index = r;
+ noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws IOException {
+ return bm.processReport(nodeReg, reports[index].getStorage(),
+ blocks, context);
+ }
+ });
+ metrics.incrStorageBlockReportOps();
}
- });
- metrics.incrStorageBlockReportOps();
+ }
+ } catch (UnregisteredNodeException une) {
+ LOG.debug("Datanode {} is attempting to report but not register yet.",
+ nodeReg);
+ return RegisterCommand.REGISTER;
}
bm.removeBRLeaseIfNeeded(nodeReg, context);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
new file mode 100644
index 0000000..bc79ade
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests block report lease handling in BlockManager.
+ */
+public class TestBlockReportLease {
+ private static final Log LOG = LogFactory.getLog(TestBlockReportLease.class);
+ /**
+ * Test the lease check for one BlockReport with many StorageBlockReports.
+ * Before HDFS-12914, when storages were reported to the NameNode in a batch,
+ * the lease was checked for each storage one by one, so some storage reports
+ * could be processed normally while the remaining storage reports could not
+ * be processed because the lease check failed.
+ * After HDFS-12914, the NameNode checks the lease once per block report
+ * request, so this issue no longer exists.
+ */
+ @Test
+ public void testCheckBlockReportLease() throws Exception {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ Random rand = new Random();
+
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build()) {
+ cluster.waitActive();
+
+ FSNamesystem fsn = cluster.getNamesystem();
+ BlockManager blockManager = fsn.getBlockManager();
+ BlockManager spyBlockManager = spy(blockManager);
+ fsn.setBlockManagerForTesting(spyBlockManager);
+ final String poolId = cluster.getNamesystem().getBlockPoolId();
+
+ final NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+
+ // Test based on one DataNode report to Namenode
+ DataNode dn = cluster.getDataNodes().get(0);
+ DatanodeDescriptor datanodeDescriptor = spyBlockManager
+ .getDatanodeManager().getDatanode(dn.getDatanodeId());
+
+ final DatanodeRegistration dnRegistration
+ = dn.getDNRegistrationForBP(poolId);
+ final StorageReport[] storages
+ = dn.getFSDataset().getStorageReports(poolId);
+
+ // Send heartbeat and request full block report lease
+ HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
+ dnRegistration, storages, 0, 0, 0, 0, 0, null, true);
+
+ DelayAnswer delayer = new DelayAnswer(LOG);
+ doAnswer(delayer).when(spyBlockManager).processReport(
+ any(DatanodeStorageInfo.class),
+ any(BlockListAsLongs.class),
+ any(BlockReportContext.class));
+
+ ExecutorService pool = Executors.newFixedThreadPool(1);
+
+ // Trigger sendBlockReport
+ final BlockReportContext brContext = new BlockReportContext(1, 0,
+ rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
+ Future<DatanodeCommand> sendBRfuturea = pool.submit(
+ new Callable<DatanodeCommand>() {
+ @Override
+ public DatanodeCommand call() throws Exception {
+ // Build every storage with 100 blocks for sending report
+ DatanodeStorage[] datanodeStorages
+ = new DatanodeStorage[storages.length];
+ for (int i = 0; i < storages.length; i++) {
+ datanodeStorages[i] = storages[i].getStorage();
+ }
+ StorageBlockReport[] reports = createReports(datanodeStorages,
+ 100);
+
+ // Send blockReport
+ return rpcServer.blockReport(dnRegistration, poolId, reports,
+ brContext);
+ }
+ }
+ );
+
+ // Wait until BlockManager calls processReport
+ delayer.waitForCall();
+
+ // Remove the full block report lease for dn
+ spyBlockManager.getBlockReportLeaseManager()
+ .removeLease(datanodeDescriptor);
+
+ // Allow blockreport to proceed
+ delayer.proceed();
+
+ // Get the result; it will not be null if the report was processed successfully
+ DatanodeCommand datanodeCommand = sendBRfuturea.get();
+ assertTrue(datanodeCommand instanceof FinalizeCommand);
+ assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
+ .getBlockPoolId());
+ }
+ }
+
+ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
+ int numBlocks) {
+ int longsPerBlock = 3;
+ int blockListSize = 2 + numBlocks * longsPerBlock;
+ int numStorages = dnStorages.length;
+ StorageBlockReport[] storageBlockReports
+ = new StorageBlockReport[numStorages];
+ for (int i = 0; i < numStorages; i++) {
+ List<Long> longs = new ArrayList<Long>(blockListSize);
+ longs.add(Long.valueOf(numBlocks));
+ longs.add(0L);
+ for (int j = 0; j < blockListSize; ++j) {
+ longs.add(Long.valueOf(j));
+ }
+ BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
+ storageBlockReports[i] = new StorageBlockReport(dnStorages[i], blockList);
+ }
+ return storageBlockReports;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org