You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/06 21:32:53 UTC
[hadoop] branch branch-2 updated: HDFS-12914. Block report leases
cause missing blocks until next report. Contributed by Santosh Marella,
He Xiaoqiao.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 567e117 HDFS-12914. Block report leases cause missing blocks until next report. Contributed by Santosh Marella, He Xiaoqiao.
567e117 is described below
commit 567e1178d88ccfc258ce2ade4f8af66cc5a4daa7
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Tue Jul 9 22:22:50 2019 +0800
HDFS-12914. Block report leases cause missing blocks until next report. Contributed by Santosh Marella, He Xiaoqiao.
---
.../hdfs/server/blockmanagement/BlockManager.java | 23 ++-
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 7 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 37 +++--
.../blockmanagement/TestBlockReportLease.java | 169 +++++++++++++++++++++
4 files changed, 214 insertions(+), 22 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fad811b..2589caf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2133,6 +2133,21 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
+   * Check whether the block report lease carried by {@code context} is
+   * valid for the datanode that is sending the report.
+   *
+   * @param context block report context holding the lease id; when null
+   *                there is no lease to check and the report is accepted
+   * @param nodeID the datanode sending the block report
+   * @return true if {@code context} is null, or if the lease exists and
+   *         has not expired
+   * @throws UnregisteredNodeException if no descriptor can be found for
+   *         {@code nodeID}, or if the lookup itself rejects the node
+   */
+  public boolean checkBlockReportLease(BlockReportContext context,
+      final DatanodeID nodeID) throws UnregisteredNodeException {
+    if (context == null) {
+      return true;
+    }
+    DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+    if (node == null) {
+      // The lookup can come back empty for a datanode the NameNode does not
+      // know about. Report that as an unregistered node, which the RPC layer
+      // already handles by asking the datanode to register, instead of
+      // handing a null descriptor to the lease manager.
+      throw new UnregisteredNodeException(nodeID, null);
+    }
+    final long startTime = Time.monotonicNow();
+    return blockReportLeaseManager.checkLease(node, startTime,
+        context.getLeaseId());
+  }
+
+ /**
* The given storage is reporting all its blocks.
* Update the (storage-->block list) and (block-->storage list) maps.
*
@@ -2175,12 +2190,6 @@ public class BlockManager implements BlockStatsMXBean {
blockReportLeaseManager.removeLease(node);
return !node.hasStaleStorages();
}
- if (context != null) {
- if (!blockReportLeaseManager.checkLease(node, startTime,
- context.getLeaseId())) {
- return false;
- }
- }
if (storageInfo.getBlockReportCount() == 0) {
// The first block report can be processed a lot more efficiently than
@@ -2289,7 +2298,7 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- private Collection<Block> processReport(
+ Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report,
BlockReportContext context) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 52e2bbf..992704b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -425,7 +425,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/** The namespace tree. */
FSDirectory dir;
- private final BlockManager blockManager;
+ private BlockManager blockManager;
private final SnapshotManager snapshotManager;
private final CacheManager cacheManager;
private final DatanodeStatistics datanodeStatistics;
@@ -5815,6 +5815,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return blockManager;
}
+ /**
+ * Replace the BlockManager used by this namesystem. Intended for tests
+ * only, e.g. to install a spy around the real BlockManager.
+ *
+ * @param bm the BlockManager that getBlockManager() returns from now on
+ */
+ @VisibleForTesting
+ public void setBlockManagerForTesting(BlockManager bm) {
+ this.blockManager = bm;
+ }
+
/** @return the FSDirectory. */
@Override
public FSDirectory getFSDirectory() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e6d03bb..830320a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -153,6 +153,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -1449,22 +1450,30 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
- for (int r = 0; r < reports.length; r++) {
- final BlockListAsLongs blocks = reports[r].getBlocks();
- //
- // BlockManager.processReport accumulates information of prior calls
- // for the same node and storage, so the value returned by the last
- // call of this loop is the final updated value for noStaleStorage.
- //
- final int index = r;
- noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
- @Override
- public Boolean call() throws IOException {
- return bm.processReport(nodeReg, reports[index].getStorage(),
- blocks, context);
+ try {
+ if (bm.checkBlockReportLease(context, nodeReg)) {
+ for (int r = 0; r < reports.length; r++) {
+ final BlockListAsLongs blocks = reports[r].getBlocks();
+ //
+ // BlockManager.processReport accumulates information of prior calls
+ // for the same node and storage, so the value returned by the last
+ // call of this loop is the final updated value for noStaleStorage.
+ //
+ final int index = r;
+ noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws IOException {
+ return bm.processReport(nodeReg, reports[index].getStorage(),
+ blocks, context);
+ }
+ });
}
- });
+ }
metrics.incrStorageBlockReportOps();
+ } catch (UnregisteredNodeException une) {
+ LOG.debug("Datanode {} is attempting to report but not register yet.",
+ nodeReg);
+ return RegisterCommand.REGISTER;
}
bm.removeBRLeaseIfNeeded(nodeReg, context);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
new file mode 100644
index 0000000..a4f16b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests for block report lease handling in {@link BlockManager}.
+ */
+public class TestBlockReportLease {
+  private static final Log LOG = LogFactory.getLog(TestBlockReportLease.class);
+
+  /**
+   * Test the lease check for one block report request that carries many
+   * StorageBlockReports.
+   * Before HDFS-12914, the NameNode checked the lease once per storage, so
+   * if the lease went away part-way through a request, the storages handled
+   * so far were processed normally while the remaining storages were
+   * rejected because their lease check failed.
+   * After HDFS-12914, the NameNode checks the lease once per block report
+   * request, so this issue no longer exists.
+   */
+  @Test
+  public void testCheckBlockReportLease() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    Random rand = new Random();
+
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
+      cluster.waitActive();
+
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager blockManager = fsn.getBlockManager();
+      BlockManager spyBlockManager = spy(blockManager);
+      fsn.setBlockManagerForTesting(spyBlockManager);
+      final String poolId = cluster.getNamesystem().getBlockPoolId();
+
+      final NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+
+      // The test is based on a single DataNode reporting to the NameNode.
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeDescriptor datanodeDescriptor = spyBlockManager
+          .getDatanodeManager().getDatanode(dn.getDatanodeId());
+
+      final DatanodeRegistration dnRegistration =
+          dn.getDNRegistrationForBP(poolId);
+      final StorageReport[] storages = dn.getFSDataset().
+          getStorageReports(poolId);
+
+      // Send a heartbeat and request a full block report lease.
+      HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
+          dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+
+      // Park every per-storage processReport call until the test lets it go.
+      DelayAnswer delayer = new DelayAnswer(LOG);
+      doAnswer(delayer).when(spyBlockManager).processReport(
+          any(DatanodeStorageInfo.class),
+          any(BlockListAsLongs.class),
+          any(BlockReportContext.class));
+
+      ExecutorService pool = Executors.newFixedThreadPool(1);
+      try {
+        // Trigger sendBlockReport on a separate thread.
+        final BlockReportContext brContext = new BlockReportContext(1, 0,
+            rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
+        Future<DatanodeCommand> sendBrFuture = pool.submit(
+            new Callable<DatanodeCommand>() {
+              @Override
+              public DatanodeCommand call() throws Exception {
+                // Build a report of 100 blocks for every storage.
+                DatanodeStorage[] datanodeStorages
+                    = new DatanodeStorage[storages.length];
+                for (int i = 0; i < storages.length; i++) {
+                  datanodeStorages[i] = storages[i].getStorage();
+                }
+                StorageBlockReport[] reports = createReports(datanodeStorages,
+                    100);
+
+                // Send the block report.
+                return rpcServer.blockReport(dnRegistration, poolId, reports,
+                    brContext);
+              }
+            });
+
+        // Wait until BlockManager calls processReport.
+        delayer.waitForCall();
+
+        // Remove the DataNode's full block report lease while the report
+        // is in flight.
+        spyBlockManager.getBlockReportLeaseManager()
+            .removeLease(datanodeDescriptor);
+
+        // Allow the block report to proceed.
+        delayer.proceed();
+
+        // The request is expected to complete with a FinalizeCommand for
+        // this block pool even though the lease was removed mid-request.
+        DatanodeCommand datanodeCommand = sendBrFuture.get();
+        assertTrue(datanodeCommand instanceof FinalizeCommand);
+        assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
+            .getBlockPoolId());
+      } finally {
+        // Always release the worker thread, including when the test fails
+        // while the report is still parked in the DelayAnswer.
+        pool.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Build one synthetic block report per storage.
+   *
+   * @param dnStorages the storages to build reports for
+   * @param numBlocks number of blocks to put into each storage's report
+   * @return one StorageBlockReport per entry of {@code dnStorages}
+   */
+  private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
+      int numBlocks) {
+    int longsPerBlock = 3;
+    int blockListSize = 2 + numBlocks * longsPerBlock;
+    int numStorages = dnStorages.length;
+    StorageBlockReport[] storageBlockReports
+        = new StorageBlockReport[numStorages];
+    for (int i = 0; i < numStorages; i++) {
+      List<Long> longs = new ArrayList<Long>(blockListSize);
+      // Two header longs: the block count, then 0.
+      longs.add(Long.valueOf(numBlocks));
+      longs.add(0L);
+      // Payload: longsPerBlock longs for each block. Together with the
+      // header this fills the list to exactly blockListSize entries.
+      for (int j = 0; j < numBlocks * longsPerBlock; ++j) {
+        longs.add(Long.valueOf(j));
+      }
+      BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
+      storageBlockReports[i] = new StorageBlockReport(dnStorages[i], blockList);
+    }
+    return storageBlockReports;
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org