Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/06/12 20:27:29 UTC
[2/2] hadoop git commit: HDFS-7923. The DataNodes should rate-limit their full block reports by asking the NN on heartbeat messages (cmccabe)
HDFS-7923. The DataNodes should rate-limit their full block reports by asking the NN on heartbeat messages (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/12b5b06c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/12b5b06c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/12b5b06c
Branch: refs/heads/trunk
Commit: 12b5b06c063d93e6c683c9b6fac9a96912f59e59
Parents: e4489d9
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Jun 12 11:17:51 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Jun 12 11:17:51 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../DatanodeProtocolClientSideTranslatorPB.java | 8 +-
.../DatanodeProtocolServerSideTranslatorPB.java | 3 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 3 +-
.../server/blockmanagement/BlockManager.java | 41 ++-
.../BlockManagerFaultInjector.java | 52 +++
.../BlockReportLeaseManager.java | 355 +++++++++++++++++++
.../server/blockmanagement/DatanodeManager.java | 2 +
.../hdfs/server/datanode/BPServiceActor.java | 71 +++-
.../hadoop/hdfs/server/datanode/DNConf.java | 4 +-
.../hdfs/server/namenode/FSNamesystem.java | 11 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 9 +-
.../server/protocol/BlockReportContext.java | 25 +-
.../hdfs/server/protocol/DatanodeProtocol.java | 5 +-
.../hdfs/server/protocol/HeartbeatResponse.java | 10 +-
.../hdfs/server/protocol/RegisterCommand.java | 2 +-
.../src/main/proto/DatanodeProtocol.proto | 6 +
.../src/main/resources/hdfs-default.xml | 21 ++
.../hdfs/protocol/TestBlockListAsLongs.java | 4 +-
.../TestBlockReportRateLimiting.java | 246 +++++++++++++
.../blockmanagement/TestDatanodeManager.java | 21 +-
.../TestNameNodePrunesMissingStorages.java | 2 +-
.../server/datanode/TestBPOfferService.java | 7 +-
.../TestBlockHasMultipleReplicasOnSameDN.java | 2 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 6 +-
.../datanode/TestBpServiceActorScheduler.java | 2 +-
.../TestDatanodeProtocolRetryPolicy.java | 8 +-
.../server/datanode/TestFsDatasetCache.java | 9 +-
.../TestNNHandlesBlockReportPerStorage.java | 2 +-
.../TestNNHandlesCombinedBlockReport.java | 2 +-
.../hdfs/server/datanode/TestStorageReport.java | 2 +-
.../server/namenode/NNThroughputBenchmark.java | 8 +-
.../hdfs/server/namenode/NameNodeAdapter.java | 2 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 6 +-
34 files changed, 890 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5bb6e53..3f72608 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -434,6 +434,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
public static final String DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold";
public static final long DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000;
+ public static final String DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES = "dfs.namenode.max.full.block.report.leases";
+ public static final int DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT = 6;
+ public static final String DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS = "dfs.namenode.full.block.report.lease.length.ms";
+ public static final long DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT = 5L * 60L * 1000L;
public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 825e835..94028a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -132,11 +132,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
- VolumeFailureSummary volumeFailureSummary) throws IOException {
+ VolumeFailureSummary volumeFailureSummary,
+ boolean requestFullBlockReportLease) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
- .setFailedVolumes(failedVolumes);
+ .setFailedVolumes(failedVolumes)
+ .setRequestFullBlockReportLease(requestFullBlockReportLease);
builder.addAllReports(PBHelper.convertStorageReports(reports));
if (cacheCapacity != 0) {
builder.setCacheCapacity(cacheCapacity);
@@ -165,7 +167,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
rollingUpdateStatus = PBHelper.convert(resp.getRollingUpgradeStatus());
}
return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
- rollingUpdateStatus);
+ rollingUpdateStatus, resp.getFullBlockReportLeaseId());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 873eb6d..e133ec7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -114,7 +114,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
report, request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes(),
- volumeFailureSummary);
+ volumeFailureSummary, request.getRequestFullBlockReportLease());
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -135,6 +135,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
builder.setRollingUpgradeStatus(PBHelper
.convertRollingUpgradeStatus(rollingUpdateStatus));
}
+ builder.setFullBlockReportLeaseId(response.getFullBlockReportLeaseId());
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index c9a9c33..32d9614 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -3042,7 +3042,7 @@ public class PBHelper {
public static BlockReportContext convert(BlockReportContextProto proto) {
return new BlockReportContext(proto.getTotalRpcs(),
- proto.getCurRpc(), proto.getId());
+ proto.getCurRpc(), proto.getId(), proto.getLeaseId());
}
public static BlockReportContextProto convert(BlockReportContext context) {
@@ -3050,6 +3050,7 @@ public class PBHelper {
setTotalRpcs(context.getTotalRpcs()).
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
+ setLeaseId(context.getLeaseId()).
build();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d18d7fe..4562d94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -124,6 +125,7 @@ public class BlockManager {
private final AtomicLong excessBlocksCount = new AtomicLong(0L);
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
private final long startupDelayBlockDeletionInMs;
+ private final BlockReportLeaseManager blockReportLeaseManager;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
@@ -348,7 +350,8 @@ public class BlockManager {
this.numBlocksPerIteration = conf.getInt(
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
-
+ this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
+
LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication);
@@ -1712,7 +1715,28 @@ public class BlockManager {
*/
}
}
-
+
+ public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
+ assert namesystem.hasReadLock();
+ DatanodeDescriptor node = null;
+ try {
+ node = datanodeManager.getDatanode(nodeReg);
+ } catch (UnregisteredNodeException e) {
+ LOG.warn("Unregistered datanode {}", nodeReg);
+ return 0;
+ }
+ if (node == null) {
+ LOG.warn("Failed to find datanode {}", nodeReg);
+ return 0;
+ }
+ // Request a new block report lease. The BlockReportLeaseManager has
+ // its own internal locking.
+ long leaseId = blockReportLeaseManager.requestLease(node);
+ BlockManagerFaultInjector.getInstance().
+ requestBlockReportLease(node, leaseId);
+ return leaseId;
+ }
+
/**
* StatefulBlockInfo is used to build the "toUC" list, which is a list of
* updates to the information about under-construction blocks.
@@ -1817,6 +1841,12 @@ public class BlockManager {
+ " because namenode still in startup phase", nodeID);
return !node.hasStaleStorages();
}
+ if (context != null) {
+ if (!blockReportLeaseManager.checkLease(node, startTime,
+ context.getLeaseId())) {
+ return false;
+ }
+ }
if (storageInfo.getBlockReportCount() == 0) {
// The first block report can be processed a lot more efficiently than
@@ -1835,6 +1865,9 @@ public class BlockManager {
if (lastStorageInRpc) {
int rpcsSeen = node.updateBlockReportContext(context);
if (rpcsSeen >= context.getTotalRpcs()) {
+ long leaseId = blockReportLeaseManager.removeLease(node);
+ BlockManagerFaultInjector.getInstance().
+ removeBlockReportLease(node, leaseId);
List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
if (zombies.isEmpty()) {
LOG.debug("processReport 0x{}: no zombie storages found.",
@@ -3845,4 +3878,8 @@ public class BlockManager {
clearQueues();
blocksMap.clear();
}
+
+ public BlockReportLeaseManager getBlockReportLeaseManager() {
+ return blockReportLeaseManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java
new file mode 100644
index 0000000..957c5c0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+
+/**
+ * Used to inject certain faults for testing.
+ */
+public class BlockManagerFaultInjector {
+ @VisibleForTesting
+ public static BlockManagerFaultInjector instance =
+ new BlockManagerFaultInjector();
+
+ @VisibleForTesting
+ public static BlockManagerFaultInjector getInstance() {
+ return instance;
+ }
+
+ @VisibleForTesting
+ public void incomingBlockReportRpc(DatanodeID nodeID,
+ BlockReportContext context) throws IOException {
+
+ }
+
+ @VisibleForTesting
+ public void requestBlockReportLease(DatanodeDescriptor node, long leaseId) {
+ }
+
+ @VisibleForTesting
+ public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
new file mode 100644
index 0000000..cd037f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * The BlockReportLeaseManager manages block report leases.<p/>
+ *
+ * DataNodes request BR leases from the NameNode by sending a heartbeat with
+ * the requestBlockReportLease field set. The NameNode may choose to respond
+ * with a non-zero lease ID. If so, that DataNode can send a block report with
+ * the given lease ID for the next few minutes. The NameNode will accept
+ * these full block reports.<p/>
+ *
+ * BR leases limit the number of incoming full block reports to the NameNode
+ * at any given time. For compatibility reasons, the NN will always accept
+ * block reports sent with a lease ID of 0 and queue them for processing
+ * immediately. Full block reports which were manually triggered will also
+ * have a lease ID of 0, bypassing the rate-limiting.<p/>
+ *
+ * Block report leases expire after a certain amount of time. This mechanism
+ * is in place so that a DN which dies while holding a lease does not
+ * permanently decrease the number of concurrent block reports which the NN is
+ * willing to accept.<p/>
+ *
+ * When considering which DNs to grant a BR lease, the NameNode gives priority
+ * to the DNs which have gone the longest without sending a full block
+ * report.<p/>
+ */
+class BlockReportLeaseManager {
+ static final Logger LOG =
+ LoggerFactory.getLogger(BlockReportLeaseManager.class);
+
+ private static class NodeData {
+ /**
+ * The UUID of the datanode.
+ */
+ final String datanodeUuid;
+
+ /**
+ * The lease ID, or 0 if there is no lease.
+ */
+ long leaseId;
+
+ /**
+ * The time when the lease was issued, or 0 if there is no lease.
+ */
+ long leaseTimeMs;
+
+ /**
+ * Previous element in the list.
+ */
+ NodeData prev;
+
+ /**
+ * Next element in the list.
+ */
+ NodeData next;
+
+ static NodeData ListHead(String name) {
+ NodeData node = new NodeData(name);
+ node.next = node;
+ node.prev = node;
+ return node;
+ }
+
+ NodeData(String datanodeUuid) {
+ this.datanodeUuid = datanodeUuid;
+ }
+
+ void removeSelf() {
+ if (this.prev != null) {
+ this.prev.next = this.next;
+ }
+ if (this.next != null) {
+ this.next.prev = this.prev;
+ }
+ this.next = null;
+ this.prev = null;
+ }
+
+ void addToEnd(NodeData node) {
+ Preconditions.checkState(node.next == null);
+ Preconditions.checkState(node.prev == null);
+ node.prev = this.prev;
+ node.next = this;
+ this.prev.next = node;
+ this.prev = node;
+ }
+
+ void addToBeginning(NodeData node) {
+ Preconditions.checkState(node.next == null);
+ Preconditions.checkState(node.prev == null);
+ node.next = this.next;
+ node.prev = this;
+ this.next.prev = node;
+ this.next = node;
+ }
+ }
+
+ /**
+ * List of datanodes which don't currently have block report leases.
+ */
+ private final NodeData deferredHead = NodeData.ListHead("deferredHead");
+
+ /**
+ * List of datanodes which currently have block report leases.
+ */
+ private final NodeData pendingHead = NodeData.ListHead("pendingHead");
+
+ /**
+ * Maps datanode UUIDs to NodeData.
+ */
+ private final HashMap<String, NodeData> nodes = new HashMap<>();
+
+ /**
+ * The current length of the pending list.
+ */
+ private int numPending = 0;
+
+ /**
+ * The maximum number of leases to hand out at any given time.
+ */
+ private final int maxPending;
+
+ /**
+ * The number of milliseconds after which a lease will expire.
+ */
+ private final long leaseExpiryMs;
+
+ /**
+ * The next ID we will use for a block report lease.
+ */
+ private long nextId = ThreadLocalRandom.current().nextLong();
+
+ BlockReportLeaseManager(Configuration conf) {
+ this(conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES,
+ DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT),
+ conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
+ DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT));
+ }
+
+ BlockReportLeaseManager(int maxPending, long leaseExpiryMs) {
+ Preconditions.checkArgument(maxPending >= 1,
+ "Cannot set the maximum number of block report leases to a " +
+ "value less than 1.");
+ this.maxPending = maxPending;
+ Preconditions.checkArgument(leaseExpiryMs >= 1,
+ "Cannot set full block report lease expiry period to a value " +
+ "less than 1.");
+ this.leaseExpiryMs = leaseExpiryMs;
+ }
+
+ /**
+ * Get the next block report lease ID. Any number is valid except 0.
+ */
+ private synchronized long getNextId() {
+ long id;
+ do {
+ id = nextId++;
+ } while (id == 0);
+ return id;
+ }
+
+ public synchronized void register(DatanodeDescriptor dn) {
+ registerNode(dn);
+ }
+
+ private synchronized NodeData registerNode(DatanodeDescriptor dn) {
+ if (nodes.containsKey(dn.getDatanodeUuid())) {
+ LOG.info("Can't register DN {} because it is already registered.",
+ dn.getDatanodeUuid());
+ return null;
+ }
+ NodeData node = new NodeData(dn.getDatanodeUuid());
+ deferredHead.addToBeginning(node);
+ nodes.put(dn.getDatanodeUuid(), node);
+ LOG.info("Registered DN {} ({}).", dn.getDatanodeUuid(), dn.getXferAddr());
+ return node;
+ }
+
+ private synchronized void remove(NodeData node) {
+ if (node.leaseId != 0) {
+ numPending--;
+ node.leaseId = 0;
+ node.leaseTimeMs = 0;
+ }
+ node.removeSelf();
+ }
+
+ public synchronized void unregister(DatanodeDescriptor dn) {
+ NodeData node = nodes.remove(dn.getDatanodeUuid());
+ if (node == null) {
+ LOG.info("Can't unregister DN {} because it is not currently " +
+ "registered.", dn.getDatanodeUuid());
+ return;
+ }
+ remove(node);
+ }
+
+ public synchronized long requestLease(DatanodeDescriptor dn) {
+ NodeData node = nodes.get(dn.getDatanodeUuid());
+ if (node == null) {
+ LOG.warn("DN {} ({}) requested a lease even though it wasn't yet " +
+ "registered. Registering now.", dn.getDatanodeUuid(),
+ dn.getXferAddr());
+ node = registerNode(dn);
+ }
+ if (node.leaseId != 0) {
+ // The DataNode wants a new lease, even though it already has one.
+ // This can happen if the DataNode is restarted in between requesting
+ // a lease and using it.
+ LOG.debug("Removing existing BR lease 0x{} for DN {} in order to " +
+ "issue a new one.", Long.toHexString(node.leaseId),
+ dn.getDatanodeUuid());
+ }
+ remove(node);
+ long monotonicNowMs = Time.monotonicNow();
+ pruneExpiredPending(monotonicNowMs);
+ if (numPending >= maxPending) {
+ if (LOG.isDebugEnabled()) {
+ StringBuilder allLeases = new StringBuilder();
+ String prefix = "";
+ for (NodeData cur = pendingHead.next; cur != pendingHead;
+ cur = cur.next) {
+ allLeases.append(prefix).append(cur.datanodeUuid);
+ prefix = ", ";
+ }
+ LOG.debug("Can't create a new BR lease for DN {}, because " +
+ "numPending equals maxPending at {}. Current leases: {}",
+ dn.getDatanodeUuid(), numPending, allLeases.toString());
+ }
+ return 0;
+ }
+ numPending++;
+ node.leaseId = getNextId();
+ node.leaseTimeMs = monotonicNowMs;
+ pendingHead.addToEnd(node);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created a new BR lease 0x{} for DN {}. numPending = {}",
+ Long.toHexString(node.leaseId), dn.getDatanodeUuid(), numPending);
+ }
+ return node.leaseId;
+ }
+
+ private synchronized boolean pruneIfExpired(long monotonicNowMs,
+ NodeData node) {
+ if (monotonicNowMs < node.leaseTimeMs + leaseExpiryMs) {
+ return false;
+ }
+ LOG.info("Removing expired block report lease 0x{} for DN {}.",
+ Long.toHexString(node.leaseId), node.datanodeUuid);
+ Preconditions.checkState(node.leaseId != 0);
+ remove(node);
+ deferredHead.addToBeginning(node);
+ return true;
+ }
+
+ private synchronized void pruneExpiredPending(long monotonicNowMs) {
+ NodeData cur = pendingHead.next;
+ while (cur != pendingHead) {
+ NodeData next = cur.next;
+ if (!pruneIfExpired(monotonicNowMs, cur)) {
+ return;
+ }
+ cur = next;
+ }
+ LOG.trace("No entries remaining in the pending list.");
+ }
+
+ public synchronized boolean checkLease(DatanodeDescriptor dn,
+ long monotonicNowMs, long id) {
+ if (id == 0) {
+ LOG.debug("Datanode {} is using BR lease id 0x0 to bypass " +
+ "rate-limiting.", dn.getDatanodeUuid());
+ return true;
+ }
+ NodeData node = nodes.get(dn.getDatanodeUuid());
+ if (node == null) {
+ LOG.info("BR lease 0x{} is not valid for unknown datanode {}",
+ Long.toHexString(id), dn.getDatanodeUuid());
+ return false;
+ }
+ if (node.leaseId == 0) {
+ LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
+ "is not in the pending set.",
+ Long.toHexString(id), dn.getDatanodeUuid());
+ return false;
+ }
+ if (pruneIfExpired(monotonicNowMs, node)) {
+ LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
+ "has expired.", Long.toHexString(id), dn.getDatanodeUuid());
+ return false;
+ }
+ if (id != node.leaseId) {
+ LOG.warn("BR lease 0x{} is not valid for DN {}. Expected BR lease 0x{}.",
+ Long.toHexString(id), dn.getDatanodeUuid(),
+ Long.toHexString(node.leaseId));
+ return false;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("BR lease 0x{} is valid for DN {}.",
+ Long.toHexString(id), dn.getDatanodeUuid());
+ }
+ return true;
+ }
+
+ public synchronized long removeLease(DatanodeDescriptor dn) {
+ NodeData node = nodes.get(dn.getDatanodeUuid());
+ if (node == null) {
+ LOG.info("Can't remove lease for unknown datanode {}",
+ dn.getDatanodeUuid());
+ return 0;
+ }
+ long id = node.leaseId;
+ if (id == 0) {
+ LOG.debug("DN {} has no lease to remove.", dn.getDatanodeUuid());
+ return 0;
+ }
+ remove(node);
+ deferredHead.addToEnd(node);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Removed BR lease 0x{} for DN {}. numPending = {}",
+ Long.toHexString(id), dn.getDatanodeUuid(), numPending);
+ }
+ return id;
+ }
+}
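The lease protocol documented in the class javadoc above boils down to a bounded set of outstanding leases plus an expiry timer. The following is a hypothetical, self-contained sketch, not the Hadoop class (datanodes are plain String UUIDs here rather than DatanodeDescriptor objects); it only mirrors the maxPending and leaseExpiryMs behavior of requestLease, checkLease and removeLease:

// Hypothetical, simplified illustration of the lease protocol above.
// Not the Hadoop class: datanodes are identified by plain String UUIDs.
import java.util.HashMap;
import java.util.Map;

class SimpleBrLeaseManager {
  private final int maxPending;      // cf. dfs.namenode.max.full.block.report.leases
  private final long leaseExpiryMs;  // cf. dfs.namenode.full.block.report.lease.length.ms
  private final Map<String, Long> leaseIds = new HashMap<>();
  private final Map<String, Long> leaseTimes = new HashMap<>();
  private long nextId = 1;

  SimpleBrLeaseManager(int maxPending, long leaseExpiryMs) {
    this.maxPending = maxPending;
    this.leaseExpiryMs = leaseExpiryMs;
  }

  /** Grant a lease if fewer than maxPending are outstanding; 0 means "try again later". */
  synchronized long requestLease(String dnUuid, long nowMs) {
    pruneExpired(nowMs);
    if (leaseIds.size() >= maxPending && !leaseIds.containsKey(dnUuid)) {
      return 0;
    }
    long id = nextId++;
    leaseIds.put(dnUuid, id);
    leaseTimes.put(dnUuid, nowMs);
    return id;
  }

  /** A lease ID of 0 always passes, for compatibility and manually triggered reports. */
  synchronized boolean checkLease(String dnUuid, long nowMs, long id) {
    if (id == 0) {
      return true;
    }
    pruneExpired(nowMs);
    Long held = leaseIds.get(dnUuid);
    return held != null && held == id;
  }

  /** Called once the last RPC of a full block report has been processed. */
  synchronized void removeLease(String dnUuid) {
    leaseIds.remove(dnUuid);
    leaseTimes.remove(dnUuid);
  }

  private void pruneExpired(long nowMs) {
    leaseTimes.entrySet().removeIf(e -> nowMs >= e.getValue() + leaseExpiryMs);
    leaseIds.keySet().retainAll(leaseTimes.keySet());
  }
}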
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 01f7972..58349cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -540,6 +540,7 @@ public class DatanodeManager {
blockManager.removeBlocksAssociatedTo(nodeInfo);
networktopology.remove(nodeInfo);
decrementVersionCount(nodeInfo.getSoftwareVersion());
+ blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + nodeInfo);
@@ -602,6 +603,7 @@ public class DatanodeManager {
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
+ blockManager.getBlockReportLeaseManager().register(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 63a0bb6..ea1abbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -29,6 +29,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
@@ -355,9 +357,10 @@ class BPServiceActor implements Runnable {
void triggerBlockReportForTests() {
synchronized (pendingIncrementalBRperStorage) {
scheduler.scheduleHeartbeat();
- long nextBlockReportTime = scheduler.scheduleBlockReport(0);
+ long oldBlockReportTime = scheduler.nextBlockReportTime;
+ scheduler.forceFullBlockReportNow();
pendingIncrementalBRperStorage.notifyAll();
- while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
+ while (oldBlockReportTime == scheduler.nextBlockReportTime) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
@@ -419,12 +422,7 @@ class BPServiceActor implements Runnable {
* @return DatanodeCommands returned by the NN. May be null.
* @throws IOException
*/
- List<DatanodeCommand> blockReport() throws IOException {
- // send block report if timer has expired.
- if (!scheduler.isBlockReportDue()) {
- return null;
- }
-
+ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
// Flush any block information that precedes the block report. Otherwise
@@ -460,7 +458,7 @@ class BPServiceActor implements Runnable {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports,
- new BlockReportContext(1, 0, reportId));
+ new BlockReportContext(1, 0, reportId, fullBrLeaseId));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
@@ -472,7 +470,8 @@ class BPServiceActor implements Runnable {
StorageBlockReport singleReport[] = { reports[r] };
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport,
- new BlockReportContext(reports.length, r, reportId));
+ new BlockReportContext(reports.length, r, reportId,
+ fullBrLeaseId));
numReportsSent++;
numRPCs++;
if (cmd != null) {
@@ -538,7 +537,8 @@ class BPServiceActor implements Runnable {
return cmd;
}
- HeartbeatResponse sendHeartBeat() throws IOException {
+ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
+ throws IOException {
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
@@ -557,7 +557,8 @@ class BPServiceActor implements Runnable {
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
- volumeFailureSummary);
+ volumeFailureSummary,
+ requestBlockReportLease);
}
//This must be called only by BPOfferService
@@ -625,8 +626,9 @@ class BPServiceActor implements Runnable {
LOG.info("For namenode " + nnAddr + " using"
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
- + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+ + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
+ "; heartBeatInterval=" + dnConf.heartBeatInterval);
+ long fullBlockReportLeaseId = 0;
//
// Now loop for a long time....
@@ -639,6 +641,7 @@ class BPServiceActor implements Runnable {
// Every so often, send heartbeat or block-report
//
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
+ HeartbeatResponse resp = null;
if (sendHeartbeat) {
//
// All heartbeat messages include following info:
@@ -647,10 +650,23 @@ class BPServiceActor implements Runnable {
// -- Total capacity
// -- Bytes remaining
//
+ boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
+ scheduler.isBlockReportDue(startTime);
scheduler.scheduleNextHeartbeat();
if (!dn.areHeartbeatsDisabledForTests()) {
- HeartbeatResponse resp = sendHeartBeat();
+ resp = sendHeartBeat(requestBlockReportLease);
assert resp != null;
+ if (resp.getFullBlockReportLeaseId() != 0) {
+ if (fullBlockReportLeaseId != 0) {
+ LOG.warn(nnAddr + " sent back a full block report lease " +
+ "ID of 0x" +
+ Long.toHexString(resp.getFullBlockReportLeaseId()) +
+ ", but we already have a lease ID of 0x" +
+ Long.toHexString(fullBlockReportLeaseId) + ". " +
+ "Overwriting old lease ID.");
+ }
+ fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
+ }
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
@@ -682,7 +698,16 @@ class BPServiceActor implements Runnable {
reportReceivedDeletedBlocks();
}
- List<DatanodeCommand> cmds = blockReport();
+ List<DatanodeCommand> cmds = null;
+ boolean forceFullBr =
+ scheduler.forceFullBlockReport.getAndSet(false);
+ if (forceFullBr) {
+ LOG.info("Forcing a full block report to " + nnAddr);
+ }
+ if ((fullBlockReportLeaseId != 0) || forceFullBr) {
+ cmds = blockReport(fullBlockReportLeaseId);
+ fullBlockReportLeaseId = 0;
+ }
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
DatanodeCommand cmd = cacheReport();
@@ -765,7 +790,7 @@ class BPServiceActor implements Runnable {
bpos.registrationSucceeded(this, bpRegistration);
// random short delay - helps scatter the BR from all DNs
- scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
+ scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
}
@@ -958,7 +983,7 @@ class BPServiceActor implements Runnable {
} else {
LOG.info(bpos.toString() + ": scheduling a full block report.");
synchronized(pendingIncrementalBRperStorage) {
- scheduler.scheduleBlockReport(0);
+ scheduler.forceFullBlockReportNow();
pendingIncrementalBRperStorage.notifyAll();
}
}
@@ -1011,6 +1036,9 @@ class BPServiceActor implements Runnable {
@VisibleForTesting
boolean resetBlockReportTime = true;
+ private final AtomicBoolean forceFullBlockReport =
+ new AtomicBoolean(false);
+
private final long heartbeatIntervalMs;
private final long blockReportIntervalMs;
@@ -1042,8 +1070,13 @@ class BPServiceActor implements Runnable {
return (nextHeartbeatTime - startTime <= 0);
}
- boolean isBlockReportDue() {
- return nextBlockReportTime - monotonicNow() <= 0;
+ boolean isBlockReportDue(long curTime) {
+ return nextBlockReportTime - curTime <= 0;
+ }
+
+ void forceFullBlockReportNow() {
+ forceFullBlockReport.set(true);
+ resetBlockReportTime = true;
}
/**
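The BPServiceActor changes above amount to a small state machine in the heartbeat loop: request a lease only when a full block report is due and no lease is held, send the report only once a non-zero lease ID (or a forced report) is present, and drop the lease ID after sending. A hypothetical, stripped-down sketch of that loop follows; the class, method and field names are illustrative only, not the actual BPServiceActor members:

// Hypothetical sketch of the DataNode-side loop; names are illustrative only.
class FullBlockReportLoop {
  private long fullBlockReportLeaseId = 0;      // 0 = no lease held
  private volatile boolean forceFullBr = false; // set when a report is manually triggered

  void heartbeatIteration(NameNodeStub nn, boolean blockReportDue) {
    // Only ask for a lease when a report is actually due and we do not hold one.
    boolean requestLease = (fullBlockReportLeaseId == 0) && blockReportDue;
    long granted = nn.sendHeartbeat(requestLease);
    if (granted != 0) {
      fullBlockReportLeaseId = granted;
    }
    boolean force = forceFullBr;
    forceFullBr = false;
    if (fullBlockReportLeaseId != 0 || force) {
      // A forced report may go out with lease ID 0, bypassing NN-side rate limiting.
      nn.blockReport(fullBlockReportLeaseId);
      fullBlockReportLeaseId = 0;
    }
  }

  /** Minimal stand-in for the DatanodeProtocol calls used above. */
  interface NameNodeStub {
    long sendHeartbeat(boolean requestFullBlockReportLease); // returns granted lease ID or 0
    void blockReport(long fullBrLeaseId);
  }
}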
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4b7fbc3..42b1b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -82,7 +82,7 @@ public class DNConf {
final long heartBeatInterval;
final long blockReportInterval;
final long blockReportSplitThreshold;
- final long initialBlockReportDelay;
+ final long initialBlockReportDelayMs;
final long cacheReportInterval;
final long dfsclientSlowIoWarningThresholdMs;
final long datanodeSlowIoWarningThresholdMs;
@@ -159,7 +159,7 @@ public class DNConf {
+ "greater than or equal to" + "dfs.blockreport.intervalMsec."
+ " Setting initial delay to 0 msec:");
}
- initialBlockReportDelay = initBRDelay;
+ initialBlockReportDelayMs = initBRDelay;
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d3d98fd..d3b32da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3976,7 +3976,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes,
- VolumeFailureSummary volumeFailureSummary) throws IOException {
+ VolumeFailureSummary volumeFailureSummary,
+ boolean requestFullBlockReportLease) throws IOException {
readLock();
try {
//get datanode commands
@@ -3985,13 +3986,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
-
+ long blockReportLeaseId = 0;
+ if (requestFullBlockReportLease) {
+ blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
+ }
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),
getFSImage().getLastAppliedOrWrittenTxId());
- return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
+ return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
+ blockReportLeaseId);
} finally {
readUnlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 4a146ff..52aaabd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@@ -1277,13 +1278,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
- int failedVolumes, VolumeFailureSummary volumeFailureSummary)
- throws IOException {
+ int failedVolumes, VolumeFailureSummary volumeFailureSummary,
+ boolean requestFullBlockReportLease) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
- failedVolumes, volumeFailureSummary);
+ failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
}
@Override // DatanodeProtocol
@@ -1309,6 +1310,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
blocks, context, (r == reports.length - 1));
metrics.incrStorageBlockReportOps();
}
+ BlockManagerFaultInjector.getInstance().
+ incomingBlockReportRpc(nodeReg, context);
if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() &&
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
index d0b0282..5bcd719 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -31,14 +31,33 @@ import org.apache.hadoop.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public class BlockReportContext {
+ /**
+ * The total number of RPCs contained in the block report.
+ */
private final int totalRpcs;
+
+ /**
+ * The index of this particular RPC.
+ */
private final int curRpc;
+
+ /**
+ * A 64-bit ID which identifies the block report as a whole.
+ */
private final long reportId;
- public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
+ /**
+ * The lease ID which this block report is using, or 0 if this block report is
+ * bypassing rate-limiting.
+ */
+ private final long leaseId;
+
+ public BlockReportContext(int totalRpcs, int curRpc,
+ long reportId, long leaseId) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
+ this.leaseId = leaseId;
}
public int getTotalRpcs() {
@@ -52,4 +71,8 @@ public class BlockReportContext {
public long getReportId() {
return reportId;
}
+
+ public long getLeaseId() {
+ return leaseId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index a3b6004..dfe0813 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -102,6 +102,8 @@ public interface DatanodeProtocol {
* @param xceiverCount number of active transceiver threads
* @param failedVolumes number of failed volumes
* @param volumeFailureSummary info about volume failures
+ * @param requestFullBlockReportLease whether to request a full block
+ * report lease.
* @throws IOException on error
*/
@Idempotent
@@ -112,7 +114,8 @@ public interface DatanodeProtocol {
int xmitsInProgress,
int xceiverCount,
int failedVolumes,
- VolumeFailureSummary volumeFailureSummary)
+ VolumeFailureSummary volumeFailureSummary,
+ boolean requestFullBlockReportLease)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
index d00179e..8d6384e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
@@ -34,12 +34,16 @@ public class HeartbeatResponse {
private final NNHAStatusHeartbeat haStatus;
private final RollingUpgradeStatus rollingUpdateStatus;
+
+ private final long fullBlockReportLeaseId;
public HeartbeatResponse(DatanodeCommand[] cmds,
- NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus) {
+ NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
+ long fullBlockReportLeaseId) {
commands = cmds;
this.haStatus = haStatus;
this.rollingUpdateStatus = rollingUpdateStatus;
+ this.fullBlockReportLeaseId = fullBlockReportLeaseId;
}
public DatanodeCommand[] getCommands() {
@@ -53,4 +57,8 @@ public class HeartbeatResponse {
public RollingUpgradeStatus getRollingUpdateStatus() {
return rollingUpdateStatus;
}
+
+ public long getFullBlockReportLeaseId() {
+ return fullBlockReportLeaseId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
index a102c82..2f7d334 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
- * A BlockCommand is an instruction to a datanode to register with the namenode.
+ * A RegisterCommand is an instruction to a datanode to register with the namenode.
* This command can't be combined with other commands in the same response.
* This is because after the datanode processes RegisterCommand, it will skip
* the rest of the DatanodeCommands in the same HeartbeatResponse.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 3083dc9..b87e753 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -195,6 +195,7 @@ message HeartbeatRequestProto {
optional uint64 cacheCapacity = 6 [ default = 0 ];
optional uint64 cacheUsed = 7 [default = 0 ];
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
+ optional bool requestFullBlockReportLease = 9 [ default = false ];
}
/**
@@ -214,6 +215,7 @@ message HeartbeatResponseProto {
repeated DatanodeCommandProto cmds = 1; // Returned commands can be null
required NNHAStatusHeartbeatProto haStatus = 2;
optional RollingUpgradeStatusProto rollingUpgradeStatus = 3;
+ optional uint64 fullBlockReportLeaseId = 4 [ default = 0 ];
}
/**
@@ -243,6 +245,10 @@ message BlockReportContextProto {
// The unique 64-bit ID of this block report
required int64 id = 3;
+
+ // The block report lease ID, or 0 if we are sending without a lease to
+ // bypass rate-limiting.
+ optional uint64 leaseId = 4 [ default = 0 ];
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7b579cb..fdb0bc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -578,6 +578,27 @@
</property>
<property>
+ <name>dfs.namenode.max.full.block.report.leases</name>
+ <value>6</value>
+ <description>The maximum number of leases for full block reports that the
+ NameNode will issue at any given time. This prevents the NameNode from
+ being flooded with full block reports that use up all the RPC handler
+ threads. This number should never be more than the number of RPC handler
+ threads or less than 1.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.full.block.report.lease.length.ms</name>
+ <value>300000</value>
+ <description>
+ The number of milliseconds that the NameNode will wait before invalidating
+ a full block report lease. This prevents a crashed DataNode from
+ permanently using up a full block report lease.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.directoryscan.interval</name>
<value>21600</value>
<description>Interval in seconds for Datanode to scan data directories and
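The two new keys described above can be tuned per cluster. The unit test added later in this commit sets them programmatically, and the same can be done in a short sketch like the one below; the values shown are arbitrary examples, not recommendations:

// Example only: arbitrary values, chosen to illustrate the two new keys.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class BrLeaseConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow at most 8 concurrent full block reports (default is 6).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 8);
    // Expire an unused lease after 10 minutes (default is 5 minutes).
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
        10L * 60L * 1000L);
  }
}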
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
index f0dab4c..9ead765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -221,7 +221,7 @@ public class TestBlockListAsLongs {
request.set(null);
nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
nn.blockReport(reg, "pool", sbr,
- new BlockReportContext(1, 0, System.nanoTime()));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L));
BlockReportRequestProto proto = request.get();
assertNotNull(proto);
assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -231,7 +231,7 @@ public class TestBlockListAsLongs {
request.set(null);
nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
nn.blockReport(reg, "pool", sbr,
- new BlockReportContext(1, 0, System.nanoTime()));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L));
proto = request.get();
assertNotNull(proto);
assertFalse(proto.getReports(0).getBlocksList().isEmpty());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java
new file mode 100644
index 0000000..fc5f9e7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang.mutable.MutableObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestBlockReportRateLimiting {
+  static final Log LOG = LogFactory.getLog(TestBlockReportRateLimiting.class);
+
+  private static void setFailure(AtomicReference<String> failure,
+      String what) {
+    failure.compareAndSet("", what);
+    LOG.error("Test error: " + what);
+  }
+
+  @After
+  public void restoreNormalBlockManagerFaultInjector() {
+    BlockManagerFaultInjector.instance = new BlockManagerFaultInjector();
+  }
+
+  @BeforeClass
+  public static void raiseBlockManagerLogLevels() {
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockReportLeaseManager.LOG, Level.ALL);
+  }
+
+  @Test(timeout=180000)
+  public void testRateLimitingDuringDataNodeStartup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1);
+    conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
+        20L * 60L * 1000L);
+
+    final Semaphore fbrSem = new Semaphore(0);
+    final HashSet<DatanodeID> expectedFbrDns = new HashSet<>();
+    final HashSet<DatanodeID> fbrDns = new HashSet<>();
+    final AtomicReference<String> failure = new AtomicReference<String>("");
+
+    final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
+      private int numLeases = 0;
+
+      @Override
+      public void incomingBlockReportRpc(DatanodeID nodeID,
+          BlockReportContext context) throws IOException {
+        LOG.info("Incoming full block report from " + nodeID +
+            ". Lease ID = 0x" + Long.toHexString(context.getLeaseId()));
+        if (context.getLeaseId() == 0) {
+          setFailure(failure, "Got unexpected rate-limiting-" +
+              "bypassing full block report RPC from " + nodeID);
+        }
+        fbrSem.acquireUninterruptibly();
+        synchronized (this) {
+          fbrDns.add(nodeID);
+          if (!expectedFbrDns.remove(nodeID)) {
+            setFailure(failure, "Got unexpected full block report " +
+                "RPC from " + nodeID + ". expectedFbrDns = " +
+                Joiner.on(", ").join(expectedFbrDns));
+          }
+          LOG.info("Proceeding with full block report from " +
+              nodeID + ". Lease ID = 0x" +
+              Long.toHexString(context.getLeaseId()));
+        }
+      }
+
+      @Override
+      public void requestBlockReportLease(DatanodeDescriptor node,
+          long leaseId) {
+        if (leaseId == 0) {
+          return;
+        }
+        synchronized (this) {
+          numLeases++;
+          expectedFbrDns.add(node);
+          LOG.info("requestBlockReportLease(node=" + node +
+              ", leaseId=0x" + Long.toHexString(leaseId) + "). " +
+              "expectedFbrDns = " + Joiner.on(", ").join(expectedFbrDns));
+          if (numLeases > 1) {
+            setFailure(failure, "More than 1 lease was issued at once.");
+          }
+        }
+      }
+
+      @Override
+      public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
+        LOG.info("removeBlockReportLease(node=" + node +
+            ", leaseId=0x" + Long.toHexString(leaseId) + ")");
+        synchronized (this) {
+          numLeases--;
+        }
+      }
+    };
+    BlockManagerFaultInjector.instance = injector;
+
+    final int NUM_DATANODES = 5;
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    for (int n = 1; n <= NUM_DATANODES; n++) {
+      LOG.info("Waiting for " + n + " datanode(s) to report in.");
+      fbrSem.release();
+      Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+      final int currentN = n;
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          synchronized (injector) {
+            if (fbrDns.size() > currentN) {
+              setFailure(failure, "Expected at most " + currentN +
+                  " datanodes to have sent a block report, but actually " +
+                  fbrDns.size() + " have.");
+            }
+            return (fbrDns.size() >= currentN);
+          }
+        }
+      }, 25, 50000);
+    }
+    cluster.shutdown();
+    Assert.assertEquals("", failure.get());
+  }
+
+  /**
+   * Start a 2-node cluster with only one block report lease. When the
+   * first datanode gets a lease, kill it. Then wait for the lease to
+   * expire, and the second datanode to send a full block report.
+   */
+  @Test(timeout=180000)
+  public void testLeaseExpiration() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1);
+    conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, 100L);
+
+    final Semaphore gotFbrSem = new Semaphore(0);
+    final AtomicReference<String> failure = new AtomicReference<String>("");
+    final AtomicReference<MiniDFSCluster> cluster =
+        new AtomicReference<>(null);
+    final BlockingQueue<Integer> datanodeToStop =
+        new ArrayBlockingQueue<Integer>(1);
+    final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
+      private String uuidToStop = "";
+
+      @Override
+      public void incomingBlockReportRpc(DatanodeID nodeID,
+          BlockReportContext context) throws IOException {
+        if (context.getLeaseId() == 0) {
+          setFailure(failure, "Got unexpected rate-limiting-" +
+              "bypassing full block report RPC from " + nodeID);
+        }
+        synchronized (this) {
+          if (uuidToStop.equals(nodeID.getDatanodeUuid())) {
+            throw new IOException("Injecting failure into block " +
+                "report RPC for " + nodeID);
+          }
+        }
+        gotFbrSem.release();
+      }
+
+      @Override
+      public void requestBlockReportLease(DatanodeDescriptor node,
+          long leaseId) {
+        if (leaseId == 0) {
+          return;
+        }
+        synchronized (this) {
+          if (uuidToStop.isEmpty()) {
+            MiniDFSCluster cl;
+            do {
+              cl = cluster.get();
+            } while (cl == null);
+            int datanodeIndexToStop = getDatanodeIndex(cl, node);
+            uuidToStop = node.getDatanodeUuid();
+            datanodeToStop.add(Integer.valueOf(datanodeIndexToStop));
+          }
+        }
+      }
+
+      private int getDatanodeIndex(MiniDFSCluster cl,
+          DatanodeDescriptor node) {
+        List<DataNode> datanodes = cl.getDataNodes();
+        for (int i = 0; i < datanodes.size(); i++) {
+          DataNode datanode = datanodes.get(i);
+          if (datanode.getDatanodeUuid().equals(node.getDatanodeUuid())) {
+            return i;
+          }
+        }
+        throw new RuntimeException("Failed to find UUID " +
+            node.getDatanodeUuid() + " in the list of datanodes.");
+      }
+
+      @Override
+      public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
+      }
+    };
+    BlockManagerFaultInjector.instance = injector;
+    cluster.set(new MiniDFSCluster.Builder(conf).numDataNodes(2).build());
+    cluster.get().waitActive();
+    int datanodeIndexToStop = datanodeToStop.take();
+    cluster.get().stopDataNode(datanodeIndexToStop);
+    gotFbrSem.acquire();
+    cluster.get().shutdown();
+    Assert.assertEquals("", failure.get());
+  }
+}
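
Note: the two keys the tests above tune are the knobs for the new rate limiter; judging from the constant names they correspond to dfs.namenode.max.full.block.report.leases and dfs.namenode.full.block.report.lease.length.ms, though those property strings are inferred from the constants rather than shown in this excerpt. A minimal sketch of tightening the limiter the same way outside a test, assuming only the two constants imported above:

    // Sketch: allow at most 2 concurrent full block reports and let an
    // unused lease expire after one minute. The constants are the same
    // ones the test imports from DFSConfigKeys.
    Configuration conf = new Configuration();
    conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 2);
    conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, 60L * 1000L);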
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index bf167a5..39bd5d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -59,6 +59,15 @@ public class TestDatanodeManager {
//The number of times the registration / removal of nodes should happen
final int NUM_ITERATIONS = 500;
+ private static DatanodeManager mockDatanodeManager(
+ FSNamesystem fsn, Configuration conf) throws IOException {
+ BlockManager bm = Mockito.mock(BlockManager.class);
+ BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
+ Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
+ DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
+ return dm;
+ }
+
/**
* This test sends a random sequence of node registrations and node removals
* to the DatanodeManager (of nodes with different IDs and versions), and
@@ -70,8 +79,7 @@ public class TestDatanodeManager {
//Create the DatanodeManager which will be tested
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
- DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
- fsn, new Configuration());
+ DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
//Seed the RNG with a known value so test failures are easier to reproduce
Random rng = new Random();
@@ -183,9 +191,8 @@ public class TestDatanodeManager {
TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class);
//create DatanodeManager
- DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
- fsn, conf);
-
+ DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
//storageID to register.
String storageID = "someStorageID-123";
@@ -258,7 +265,6 @@ public class TestDatanodeManager {
HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"));
}
-
/**
* Helper function that tests the DatanodeManagers SortedBlock function
* we invoke this function with and without topology scripts
@@ -281,8 +287,7 @@ public class TestDatanodeManager {
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
resourcePath.toString());
}
- DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
- fsn, conf);
+ DatanodeManager dm = mockDatanodeManager(fsn, conf);
// register 5 datanodes, each with different storage ID and type
DatanodeInfo[] locs = new DatanodeInfo[5];
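
Note on the new mockDatanodeManager() helper above: DatanodeManager now reaches the BlockReportLeaseManager through the BlockManager, so a bare Mockito mock of BlockManager would presumably hand back null from getBlockReportLeaseManager(); the helper avoids that by wiring a real lease manager behind the mock. The underlying pattern, as a sketch using only what the hunk itself shows:

    // A mock can return a real collaborator, so code under test that asks
    // the BlockManager for its lease manager still gets a working one.
    BlockManager bm = Mockito.mock(BlockManager.class);
    Mockito.when(bm.getBlockReportLeaseManager())
        .thenReturn(new BlockReportLeaseManager(new Configuration()));
    DatanodeManager dm = new DatanodeManager(bm, fsn, new Configuration());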
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index d73f63e..cea6865 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -115,7 +115,7 @@ public class TestNameNodePrunesMissingStorages {
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
- 0, null);
+ 0, null, true);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 64cc78b..f970b3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -143,7 +144,8 @@ public class TestBPOfferService {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
- Mockito.any(VolumeFailureSummary.class));
+ Mockito.any(VolumeFailureSummary.class),
+ Mockito.anyBoolean());
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
return mock;
}
@@ -164,7 +166,8 @@ public class TestBPOfferService {
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
heartbeatCounts[nnIdx]++;
return new HeartbeatResponse(new DatanodeCommand[0],
- mockHaStatuses[nnIdx], null);
+ mockHaStatuses[nnIdx], null,
+ ThreadLocalRandom.current().nextLong() | 1L);
}
}
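
A note on the ThreadLocalRandom.current().nextLong() | 1L idiom introduced in the mocked heartbeat responses: OR-ing in the low bit guarantees the value can never be zero, and a lease ID of zero is treated elsewhere in this patch as "no lease granted". A standalone illustration (not part of the patch):

    // Setting bit 0 makes the result odd, hence never 0, so the mocked
    // HeartbeatResponse always simulates a granted full-block-report lease.
    long leaseId = ThreadLocalRandom.current().nextLong() | 1L;
    assert leaseId != 0L;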
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index c65ef85..27d1cea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -126,7 +126,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
- new BlockReportContext(1, 0, System.nanoTime()));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L));
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);
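
Tests that call blockReport() directly now pass a fourth BlockReportContext argument, the lease ID; 0L appears throughout these call sites and, per the fault-injector test above, a zero lease ID marks a report that bypasses the new rate limiting. A small hypothetical helper (not in the patch) that such call sites could share:

    // Hypothetical convenience wrapper for a single-RPC block report;
    // passing 0L for leaseId bypasses rate limiting in these direct calls.
    private static BlockReportContext singleRpcContext(long leaseId) {
      return new BlockReportContext(1, 0, System.nanoTime(), leaseId);
    }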
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index f91c0bc..7552e10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -39,6 +39,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -162,11 +163,12 @@ public class TestBlockRecovery {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
- Mockito.any(VolumeFailureSummary.class)))
+ Mockito.any(VolumeFailureSummary.class),
+ Mockito.anyBoolean()))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
- null));
+ null, ThreadLocalRandom.current().nextLong() | 1L));
dn = new DataNode(conf, locations, null) {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index 0d7484c..b9b6512 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -57,7 +57,7 @@ public class TestBpServiceActorScheduler {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.isHeartbeatDue(now));
- assertTrue(scheduler.isBlockReportDue());
+ assertTrue(scheduler.isBlockReportDue(scheduler.monotonicNow()));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index bf80887..e784c7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
@@ -199,13 +200,13 @@ public class TestDatanodeProtocolRetryPolicy {
heartbeatResponse = new HeartbeatResponse(
new DatanodeCommand[]{RegisterCommand.REGISTER},
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
- null);
+ null, ThreadLocalRandom.current().nextLong() | 1L);
} else {
LOG.info("mockito heartbeatResponse " + i);
heartbeatResponse = new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
- null);
+ null, ThreadLocalRandom.current().nextLong() | 1L);
}
return heartbeatResponse;
}
@@ -217,7 +218,8 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
- Mockito.any(VolumeFailureSummary.class));
+ Mockito.any(VolumeFailureSummary.class),
+ Mockito.anyBoolean());
dn = new DataNode(conf, locations, null) {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 58932fb..cb4022e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
@@ -31,6 +32,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -159,11 +161,14 @@ public class TestFsDatasetCache {
throws IOException {
NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
fsImage.getLastAppliedOrWrittenTxId());
- HeartbeatResponse response = new HeartbeatResponse(cmds, ha, null);
+ HeartbeatResponse response =
+ new HeartbeatResponse(cmds, ha, null,
+ ThreadLocalRandom.current().nextLong() | 1L);
doReturn(response).when(spyNN).sendHeartbeat(
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
- anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any());
+ anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
+ anyBoolean());
}
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index b150b0d..67bbefe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -40,7 +40,7 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
- new BlockReportContext(reports.length, i, System.nanoTime()));
+ new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
i++;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index dca3c88..fd19ba6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -36,6 +36,6 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
- new BlockReportContext(1, 0, System.nanoTime()));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index ecb28dc..a6032c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -106,7 +106,7 @@ public class TestStorageReport {
any(DatanodeRegistration.class),
captor.capture(),
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
- Mockito.any(VolumeFailureSummary.class));
+ Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
StorageReport[] reports = captor.getValue();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 2964f9a..39894b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -968,7 +968,7 @@ public class NNThroughputBenchmark implements Tool {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
dataNodeProto.blockReport(dnRegistration, bpid, reports,
- new BlockReportContext(1, 0, System.nanoTime()));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L));
}
/**
@@ -981,7 +981,7 @@ public class NNThroughputBenchmark implements Tool {
StorageReport[] rep = { new StorageReport(storage, false,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
- 0L, 0L, 0, 0, 0, null).getCommands();
+ 0L, 0L, 0, 0, 0, null, true).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@@ -1030,7 +1030,7 @@ public class NNThroughputBenchmark implements Tool {
StorageReport[] rep = { new StorageReport(storage,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
- rep, 0L, 0L, 0, 0, 0, null).getCommands();
+ rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@@ -1213,7 +1213,7 @@ public class NNThroughputBenchmark implements Tool {
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
- new BlockReportContext(1, 0, System.nanoTime()));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L));
long end = Time.now();
return end-start;
}
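
The trailing boolean added to sendHeartbeat() here (and in the mocked calls earlier) appears to be the flag with which a DataNode asks the NameNode for a full-block-report lease on a heartbeat; the benchmark simply passes true on every beat. A hedged sketch of how a rate-limited caller might drive it, reusing the Scheduler method shown in the TestBpServiceActorScheduler hunk and assuming (not confirmed here) that a nonzero lease ID in the response means the lease was granted:

    // Ask for a lease only when a full block report is actually due.
    boolean requestLease = scheduler.isBlockReportDue(scheduler.monotonicNow());
    HeartbeatResponse resp = dataNodeProto.sendHeartbeat(dnRegistration, rep,
        0L, 0L, 0, 0, 0, null, requestLease);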
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12b5b06c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 4ca5eda..b314584 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -117,7 +117,7 @@ public class NameNodeAdapter {
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
- dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null);
+ dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
}
public static boolean setReplication(final FSNamesystem ns,