You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/05 05:24:11 UTC
[09/10] hadoop git commit: HDFS-9239. DataNode Lifeline Protocol: an
alternative protocol for reporting DataNode liveness. Contributed by Chris
Nauroth.
HDFS-9239. DataNode Lifeline Protocol: an alternative protocol for reporting DataNode liveness. Contributed by Chris Nauroth.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2759689d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2759689d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2759689d
Branch: refs/heads/HDFS-1312
Commit: 2759689d7d23001f007cb0dbe2521de90734dd5c
Parents: 8e08861
Author: Chris Nauroth <cn...@apache.org>
Authored: Fri Mar 4 15:29:50 2016 -0800
Committer: Chris Nauroth <cn...@apache.org>
Committed: Fri Mar 4 15:29:50 2016 -0800
----------------------------------------------------------------------
.../hadoop-common/src/site/markdown/Metrics.md | 2 +
hadoop-hdfs-project/hadoop-hdfs/pom.xml | 1 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 10 +-
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 35 +++
...eLifelineProtocolClientSideTranslatorPB.java | 113 +++++++
.../protocolPB/DatanodeLifelineProtocolPB.java | 40 +++
...eLifelineProtocolServerSideTranslatorPB.java | 71 +++++
.../server/blockmanagement/DatanodeManager.java | 41 +++
.../blockmanagement/HeartbeatManager.java | 16 +-
.../hdfs/server/datanode/BPOfferService.java | 13 +-
.../hdfs/server/datanode/BPServiceActor.java | 211 ++++++++++++-
.../hdfs/server/datanode/BlockPoolManager.java | 41 ++-
.../hadoop/hdfs/server/datanode/DNConf.java | 25 ++
.../hadoop/hdfs/server/datanode/DataNode.java | 14 +
.../datanode/metrics/DataNodeMetrics.java | 5 +
.../hdfs/server/namenode/FSNamesystem.java | 31 ++
.../hdfs/server/namenode/NameNodeRpcServer.java | 36 ++-
.../protocol/DatanodeLifelineProtocol.java | 42 +++
.../hdfs/server/protocol/NamenodeProtocols.java | 1 +
.../main/proto/DatanodeLifelineProtocol.proto | 43 +++
.../src/main/resources/hdfs-default.xml | 48 ++-
.../server/datanode/TestBPOfferService.java | 4 +-
.../server/datanode/TestBlockPoolManager.java | 4 +-
.../datanode/TestBpServiceActorScheduler.java | 17 +-
.../server/datanode/TestDataNodeLifeline.java | 300 +++++++++++++++++++
.../server/datanode/TestDatanodeRegister.java | 2 +-
26 files changed, 1128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index f43e725..b660b16 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -290,6 +290,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `ReplaceBlockOpAvgTime` | Average time of block replace operations in milliseconds |
| `HeartbeatsNumOps` | Total number of heartbeats |
| `HeartbeatsAvgTime` | Average heartbeat time in milliseconds |
+| `LifelinesNumOps` | Total number of lifeline messages |
+| `LifelinesAvgTime` | Average lifeline message processing time in milliseconds |
| `BlockReportsNumOps` | Total number of block report operations |
| `BlockReportsAvgTime` | Average time of block report operations in milliseconds |
| `IncrementalBlockReportsNumOps` | Total number of incremental block report operations |
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 83c706f..32c060d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -343,6 +343,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<includes>
<include>HdfsServer.proto</include>
<include>DatanodeProtocol.proto</include>
+ <include>DatanodeLifelineProtocol.proto</include>
<include>HAZKInfo.proto</include>
<include>InterDatanodeProtocol.proto</include>
<include>JournalProtocol.proto</include>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9c06e29..5eaada4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -512,6 +512,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
+ public static final String DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY =
+ "dfs.datanode.lifeline.interval.seconds";
public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L;
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
@@ -522,8 +524,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
public static final String DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
- public static final int DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1;
- public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = "dfs.namenode.lifeline.handler.count";
+ public static final String DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY =
+ "dfs.namenode.lifeline.handler.ratio";
+ public static final float DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT =
+ 0.1f;
+ public static final String DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY =
+ "dfs.namenode.lifeline.handler.count";
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 1a0d192..2148c75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
@@ -570,6 +571,40 @@ public class DFSUtil {
}
/**
+ * Returns lifeline RPC addresses of the namenodes of each nameservice: the
+ * internal nameservices if any are configured, otherwise all nameservices.
+ *
+ * @param conf configuration
+ * @return map from nameservice ID to that nameservice's namenode addresses
+ * @throws IOException on error, e.g. an internal nameservice that is unknown
+ */
+ public static Map<String, Map<String, InetSocketAddress>>
+ getNNLifelineRpcAddressesForCluster(Configuration conf)
+ throws IOException {
+
+ Collection<String> parentNameServices = conf.getTrimmedStringCollection(
+ DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
+
+ if (parentNameServices.isEmpty()) {
+ parentNameServices = conf.getTrimmedStringCollection(
+ DFSConfigKeys.DFS_NAMESERVICES);
+ } else {
+ // Every internal nameservice must also be listed among all configured
+ // nameservices; fail fast with an IOException otherwise.
+ Set<String> availableNameServices = Sets.newHashSet(conf
+ .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
+ for (String nsId : parentNameServices) {
+ if (!availableNameServices.contains(nsId)) {
+ throw new IOException("Unknown nameservice: " + nsId);
+ }
+ }
+ }
+
+ return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null,
+ DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
+ }
+
+ /**
* Map a logical namenode ID to its lifeline address. Use the given
* nameservice if specified, or the configured one if none is given.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..5c323eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Client-side translator that converts calls made on the
+ * {@link DatanodeLifelineProtocol} interface into protobuf requests sent to
+ * the RPC server implementing {@link DatanodeLifelineProtocolPB}.
+ */
+@InterfaceAudience.Private
+public class DatanodeLifelineProtocolClientSideTranslatorPB implements
+ ProtocolMetaInterface, DatanodeLifelineProtocol, Closeable {
+
+ /** RpcController is not used and hence is set to null. */
+ private static final RpcController NULL_CONTROLLER = null;
+
+ private final DatanodeLifelineProtocolPB rpcProxy;
+
+ public DatanodeLifelineProtocolClientSideTranslatorPB(
+ InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, DatanodeLifelineProtocolPB.class,
+ ProtobufRpcEngine.class);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
+ }
+
+ private static DatanodeLifelineProtocolPB createNamenode(
+ InetSocketAddress nameNodeAddr, Configuration conf,
+ UserGroupInformation ugi) throws IOException {
+ return RPC.getProxy(DatanodeLifelineProtocolPB.class,
+ RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), nameNodeAddr,
+ ugi, conf,
+ NetUtils.getSocketFactory(conf, DatanodeLifelineProtocolPB.class));
+ }
+
+ @Override
+ public void close() throws IOException {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ @Override
+ public void sendLifeline(DatanodeRegistration registration,
+ StorageReport[] reports, long cacheCapacity, long cacheUsed,
+ int xmitsInProgress, int xceiverCount, int failedVolumes,
+ VolumeFailureSummary volumeFailureSummary) throws IOException {
+ HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
+ .setRegistration(PBHelper.convert(registration))
+ .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
+ .setFailedVolumes(failedVolumes);
+ builder.addAllReports(PBHelperClient.convertStorageReports(reports));
+ if (cacheCapacity != 0) {
+ builder.setCacheCapacity(cacheCapacity);
+ }
+ if (cacheUsed != 0) {
+ builder.setCacheUsed(cacheUsed);
+ }
+ if (volumeFailureSummary != null) {
+ builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
+ volumeFailureSummary));
+ }
+ try {
+ rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build());
+ } catch (ServiceException se) {
+ throw ProtobufHelper.getRemoteException(se);
+ }
+ }
+
+ @Override // ProtocolMetaInterface
+ public boolean isMethodSupported(String methodName)
+ throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy,
+ DatanodeLifelineProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), methodName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java
new file mode 100644
index 0000000..a17a6b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protobuf RPC protocol used by a DataNode to send lifeline messages to a NameNode.
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@ProtocolInfo(
+ protocolName =
+ "org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Private
+public interface DatanodeLifelineProtocolPB extends
+ DatanodeLifelineProtocolService.BlockingInterface {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..8311993
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.LifelineResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Server-side translator: converts protobuf requests received on
+ * {@link DatanodeLifelineProtocolPB} and forwards them to the
+ * {@link DatanodeLifelineProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class DatanodeLifelineProtocolServerSideTranslatorPB implements
+ DatanodeLifelineProtocolPB {
+
+ private static final LifelineResponseProto VOID_LIFELINE_RESPONSE_PROTO =
+ LifelineResponseProto.newBuilder().build();
+
+ private final DatanodeLifelineProtocol impl;
+
+ public DatanodeLifelineProtocolServerSideTranslatorPB(
+ DatanodeLifelineProtocol impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public LifelineResponseProto sendLifeline(RpcController controller,
+ HeartbeatRequestProto request) throws ServiceException {
+ try {
+ final StorageReport[] report = PBHelperClient.convertStorageReports(
+ request.getReportsList());
+ VolumeFailureSummary volumeFailureSummary =
+ request.hasVolumeFailureSummary() ?
+ PBHelper.convertVolumeFailureSummary(
+ request.getVolumeFailureSummary()) : null;
+ impl.sendLifeline(PBHelper.convert(request.getRegistration()), report,
+ request.getCacheCapacity(), request.getCacheUsed(),
+ request.getXmitsInProgress(), request.getXceiverCount(),
+ request.getFailedVolumes(), volumeFailureSummary);
+ return VOID_LIFELINE_RESPONSE_PROTO;
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 999c1fa..3072fc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1496,6 +1496,47 @@ public class DatanodeManager {
}
/**
+ * Handles a lifeline message sent by a DataNode by refreshing its reported stats.
+ *
+ * @param nodeReg registration info for DataNode sending the lifeline
+ * @param reports storage reports from DataNode
+ * @param blockPoolId block pool ID (accepted but not used by this method)
+ * @param cacheCapacity cache capacity at DataNode
+ * @param cacheUsed cache used at DataNode
+ * @param xceiverCount estimated count of transfer threads running at DataNode
+ * @param maxTransfers count of transfers running at DataNode (not used here)
+ * @param failedVolumes count of failed volumes at DataNode
+ * @param volumeFailureSummary info on failed volumes at DataNode
+ * @throws IOException if there is an error
+ */
+ public void handleLifeline(DatanodeRegistration nodeReg,
+ StorageReport[] reports, String blockPoolId, long cacheCapacity,
+ long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes,
+ VolumeFailureSummary volumeFailureSummary) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
+ }
+ DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
+ if (nodeinfo == null) {
+ // This is null if the DataNode has not yet registered. We expect this
+ // will never happen, because the DataNode has logic to prevent sending
+ // lifeline messages until after initial registration is successful.
+ // Lifeline message handling can't send commands back to the DataNode to
+ // tell it to register, so simply exit.
+ return;
+ }
+ if (nodeinfo.isDisallowed()) {
+ // This is highly unlikely, because heartbeat handling is much more
+ // frequent and likely would have already sent the disallowed error.
+ // Lifeline messages are not intended to send any kind of control response
+ // back to the DataNode, so simply exit.
+ return;
+ }
+ heartbeatManager.updateLifeline(nodeinfo, reports, cacheCapacity, cacheUsed,
+ xceiverCount, failedVolumes, volumeFailureSummary);
+ }
+
+ /**
* Convert a CachedBlockList into a DatanodeCommand with a list of blocks.
*
* @param list The {@link CachedBlocksList}. This function
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index b8d3043..cec4a1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -240,6 +240,20 @@ class HeartbeatManager implements DatanodeStatistics {
stats.add(node);
}
+ synchronized void updateLifeline(final DatanodeDescriptor node,
+ StorageReport[] reports, long cacheCapacity, long cacheUsed,
+ int xceiverCount, int failedVolumes,
+ VolumeFailureSummary volumeFailureSummary) {
+ stats.subtract(node);
+ // The node's stats are subtracted before and re-added after the update. We
+ // deliberately call updateHeartbeatState, not updateHeartbeat, so that the
+ // heartbeatedSinceRegistration flag is left untouched: a lifeline message
+ // does not count as arrival of the first heartbeat.
+ node.updateHeartbeatState(reports, cacheCapacity, cacheUsed,
+ xceiverCount, failedVolumes, volumeFailureSummary);
+ stats.add(node);
+ }
+
synchronized void startDecommission(final DatanodeDescriptor node) {
if (!node.isAlive()) {
LOG.info("Dead node {} is decommissioned immediately.", node);
@@ -416,4 +430,4 @@ class HeartbeatManager implements DatanodeStatistics {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 206e89a..00102eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -120,17 +120,22 @@ class BPOfferService {
mWriteLock.unlock();
}
- BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
+ BPOfferService(List<InetSocketAddress> nnAddrs,
+ List<InetSocketAddress> lifelineNnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
+ Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
+ "Must pass same number of NN addresses and lifeline addresses.");
this.dn = dn;
- for (InetSocketAddress addr : nnAddrs) {
- this.bpServices.add(new BPServiceActor(addr, this));
+ for (int i = 0; i < nnAddrs.size(); ++i) {
+ this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
+ lifelineNnAddrs.get(i), this));
}
}
- void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
+ void refreshNNList(ArrayList<InetSocketAddress> addrs,
+ ArrayList<InetSocketAddress> lifelineAddrs) throws IOException {
Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
for (BPServiceActor actor : bpServices) {
oldAddrs.add(actor.getNNSocketAddress());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index d3d46be..7184a49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.util.Time.monotonicNow;
+import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -28,6 +29,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -103,14 +106,20 @@ class BPServiceActor implements Runnable {
final LinkedList<BPServiceActorAction> bpThreadQueue
= new LinkedList<BPServiceActorAction>();
- BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
+ BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
+ BPOfferService bpos) {
this.bpos = bpos;
this.dn = bpos.getDataNode();
this.nnAddr = nnAddr;
+ this.lifelineSender = lifelineNnAddr != null ?
+ new LifelineSender(lifelineNnAddr) : null;
+ this.initialRegistrationComplete = lifelineNnAddr != null ?
+ new CountDownLatch(1) : null;
this.dnConf = dn.getDnConf();
this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
prevBlockReportId = ThreadLocalRandom.current().nextLong();
- scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
+ scheduler = new Scheduler(dnConf.heartBeatInterval,
+ dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
}
public DatanodeRegistration getBpRegistration() {
@@ -138,6 +147,9 @@ class BPServiceActor implements Runnable {
return nnAddr;
}
+ private final CountDownLatch initialRegistrationComplete;
+ private final LifelineSender lifelineSender;
+
/**
* Used to inject a spy NN in the unit tests.
*/
@@ -152,6 +164,20 @@ class BPServiceActor implements Runnable {
}
/**
+ * Used to inject a spy lifeline NN in unit tests; needs a lifeline address.
+ */
+ @VisibleForTesting
+ void setLifelineNameNode(
+ DatanodeLifelineProtocolClientSideTranslatorPB dnLifelineProtocol) {
+ lifelineSender.lifelineNamenode = dnLifelineProtocol;
+ }
+
+ @VisibleForTesting
+ DatanodeLifelineProtocolClientSideTranslatorPB getLifelineNameNodeProxy() {
+ return lifelineSender.lifelineNamenode;
+ }
+
+ /**
* Perform the first part of the handshake with the NameNode.
* This calls <code>versionRequest</code> to determine the NN's
* namespace and version info. It automatically retries until
@@ -420,29 +446,39 @@ class BPServiceActor implements Runnable {
//Thread is started already
return;
}
- bpThread = new Thread(this, formatThreadName());
+ bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr));
bpThread.setDaemon(true); // needed for JUnit testing
bpThread.start();
+
+ if (lifelineSender != null) {
+ lifelineSender.start();
+ }
}
- private String formatThreadName() {
+ private String formatThreadName(String action, InetSocketAddress addr) {
Collection<StorageLocation> dataDirs =
DataNode.getStorageLocations(dn.getConf());
- return "DataNode: [" + dataDirs.toString() + "] " +
- " heartbeating to " + nnAddr;
+ return "DataNode: [" + dataDirs.toString() + "] " +
+ action + " to " + addr;
}
//This must be called only by blockPoolManager.
void stop() {
shouldServiceRun = false;
+ if (lifelineSender != null) {
+ lifelineSender.stop();
+ }
if (bpThread != null) {
- bpThread.interrupt();
+ bpThread.interrupt();
}
}
//This must be called only by blockPoolManager
void join() {
try {
+ if (lifelineSender != null) {
+ lifelineSender.join();
+ }
if (bpThread != null) {
bpThread.join();
}
@@ -454,6 +490,7 @@ class BPServiceActor implements Runnable {
shouldServiceRun = false;
IOUtils.cleanup(null, bpNamenode);
+ IOUtils.cleanup(null, lifelineSender);
bpos.shutdownActor(this);
}
@@ -480,7 +517,9 @@ class BPServiceActor implements Runnable {
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+ " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
- + "; heartBeatInterval=" + dnConf.heartBeatInterval);
+ + "; heartBeatInterval=" + dnConf.heartBeatInterval
+ + (lifelineSender != null ?
+ "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
long fullBlockReportLeaseId = 0;
//
@@ -684,6 +723,9 @@ class BPServiceActor implements Runnable {
}
runningState = RunningState.RUNNING;
+ if (initialRegistrationComplete != null) {
+ initialRegistrationComplete.countDown();
+ }
while (shouldRun()) {
try {
@@ -797,6 +839,135 @@ class BPServiceActor implements Runnable {
return scheduler;
}
+ private final class LifelineSender implements Runnable, Closeable {
+ // Sends periodic lifeline RPCs from a dedicated daemon thread once registered.
+ private final InetSocketAddress lifelineNnAddr;
+ private Thread lifelineThread;
+ private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
+
+ public LifelineSender(InetSocketAddress lifelineNnAddr) {
+ this.lifelineNnAddr = lifelineNnAddr;
+ }
+
+ @Override
+ public void close() {
+ stop();
+ try {
+ join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ IOUtils.cleanup(null, lifelineNamenode);
+ }
+
+ @Override
+ public void run() {
+ // The lifeline RPC depends on registration with the NameNode, so wait for
+ // initial registration to complete.
+ while (shouldRun()) {
+ try {
+ initialRegistrationComplete.await();
+ break;
+ } catch (InterruptedException e) {
+ // The only way thread interruption can happen while waiting on this
+ // latch is if the state of the actor has been updated to signal
+ // shutdown. The next loop's call to shouldRun() will return false,
+ // and the thread will finish.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // After initial NameNode registration has completed, execute the main
+ // loop for sending periodic lifeline RPCs if needed. This is done in a
+ // second loop to avoid a pointless wait on the above latch in every
+ // iteration of the main loop.
+ while (shouldRun()) {
+ try {
+ if (lifelineNamenode == null) {
+ lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
+ }
+ sendLifelineIfDue();
+ Thread.sleep(scheduler.getLifelineWaitTime());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
+ e);
+ }
+ }
+
+ LOG.info("LifelineSender for " + BPServiceActor.this + " exiting.");
+ }
+
+ public void start() {
+ lifelineThread = new Thread(this, formatThreadName("lifeline",
+ lifelineNnAddr));
+ lifelineThread.setDaemon(true);
+ lifelineThread.setUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread thread, Throwable t) {
+ LOG.error(thread + " terminating on unexpected exception", t);
+ }
+ });
+ lifelineThread.start();
+ }
+
+ public void stop() {
+ if (lifelineThread != null) {
+ lifelineThread.interrupt();
+ }
+ }
+
+ public void join() throws InterruptedException {
+ if (lifelineThread != null) {
+ lifelineThread.join();
+ }
+ }
+
+ private void sendLifelineIfDue() throws IOException {
+ long startTime = scheduler.monotonicNow();
+ if (!scheduler.isLifelineDue(startTime)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
+ + ", because it is not due.");
+ }
+ return;
+ }
+ if (dn.areHeartbeatsDisabledForTests()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
+ + ", because heartbeats are disabled for tests.");
+ }
+ return;
+ }
+ sendLifeline();
+ dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime);
+ scheduler.scheduleNextLifeline(scheduler.monotonicNow());
+ }
+
+ private void sendLifeline() throws IOException {
+ StorageReport[] reports =
+ dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending lifeline with " + reports.length + " storage" +
+ " reports from service actor: " + BPServiceActor.this);
+ }
+ VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
+ .getVolumeFailureSummary();
+ int numFailedVolumes = volumeFailureSummary != null ?
+ volumeFailureSummary.getFailedStorageLocations().length : 0;
+ lifelineNamenode.sendLifeline(bpRegistration,
+ reports,
+ dn.getFSDataset().getCacheCapacity(),
+ dn.getFSDataset().getCacheUsed(),
+ dn.getXmitsInProgress(),
+ dn.getXceiverCount(),
+ numFailedVolumes,
+ volumeFailureSummary);
+ }
+ }
+
/**
* Utility class that wraps the timestamp computations for scheduling
* heartbeats and block reports.
@@ -812,16 +983,22 @@ class BPServiceActor implements Runnable {
volatile long nextHeartbeatTime = monotonicNow();
@VisibleForTesting
+ volatile long nextLifelineTime = monotonicNow();
+
+ @VisibleForTesting
boolean resetBlockReportTime = true;
private final AtomicBoolean forceFullBlockReport =
new AtomicBoolean(false);
private final long heartbeatIntervalMs;
+ private final long lifelineIntervalMs;
private final long blockReportIntervalMs;
- Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
+ Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
+ long blockReportIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
+ this.lifelineIntervalMs = lifelineIntervalMs;
this.blockReportIntervalMs = blockReportIntervalMs;
}
@@ -835,19 +1012,31 @@ class BPServiceActor implements Runnable {
// Blockreport.
long scheduleHeartbeat() {
nextHeartbeatTime = monotonicNow();
+ scheduleNextLifeline(nextHeartbeatTime);
return nextHeartbeatTime;
}
long scheduleNextHeartbeat() {
// Numerical overflow is possible here and is okay.
nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs;
+ scheduleNextLifeline(nextHeartbeatTime);
return nextHeartbeatTime;
}
+ long scheduleNextLifeline(long baseTime) {
+ // Numerical overflow is possible here and is okay.
+ nextLifelineTime = baseTime + lifelineIntervalMs;
+ return nextLifelineTime;
+ }
+
boolean isHeartbeatDue(long startTime) {
return (nextHeartbeatTime - startTime <= 0);
}
+ boolean isLifelineDue(long startTime) {
+ return (nextLifelineTime - startTime <= 0);
+ }
+
boolean isBlockReportDue(long curTime) {
return nextBlockReportTime - curTime <= 0;
}
@@ -903,6 +1092,10 @@ class BPServiceActor implements Runnable {
return nextHeartbeatTime - monotonicNow();
}
+ long getLifelineWaitTime() {
+ return Math.max(0, nextLifelineTime - monotonicNow()); // sleep() rejects < 0
+ }
+
/**
* Wrapped for testing.
* @return
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
index 08b2fb0..e94bbb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
@@ -151,14 +151,18 @@ class BlockPoolManager {
Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
.getNNServiceRpcAddressesForCluster(conf);
+ Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil
+ .getNNLifelineRpcAddressesForCluster(conf);
synchronized (refreshNamenodesLock) {
- doRefreshNamenodes(newAddressMap);
+ doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
}
}
private void doRefreshNamenodes(
- Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
+ Map<String, Map<String, InetSocketAddress>> addrMap,
+ Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
+ throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set<String> toRefresh = Sets.newLinkedHashSet();
@@ -195,9 +199,19 @@ class BlockPoolManager {
Joiner.on(",").useForNull("<default>").join(toAdd));
for (String nsToAdd : toAdd) {
+ Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
+ Map<String, InetSocketAddress> nnIdToLifelineAddr =
+ lifelineAddrMap.get(nsToAdd);
ArrayList<InetSocketAddress> addrs =
- Lists.newArrayList(addrMap.get(nsToAdd).values());
- BPOfferService bpos = createBPOS(addrs);
+ Lists.newArrayListWithCapacity(nnIdToAddr.size());
+ ArrayList<InetSocketAddress> lifelineAddrs =
+ Lists.newArrayListWithCapacity(nnIdToAddr.size());
+ for (String nnId : nnIdToAddr.keySet()) {
+ addrs.add(nnIdToAddr.get(nnId));
+ lifelineAddrs.add(nnIdToLifelineAddr != null ?
+ nnIdToLifelineAddr.get(nnId) : null);
+ }
+ BPOfferService bpos = createBPOS(addrs, lifelineAddrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
@@ -227,9 +241,19 @@ class BlockPoolManager {
for (String nsToRefresh : toRefresh) {
BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
+ Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
+ Map<String, InetSocketAddress> nnIdToLifelineAddr =
+ lifelineAddrMap.get(nsToRefresh);
ArrayList<InetSocketAddress> addrs =
- Lists.newArrayList(addrMap.get(nsToRefresh).values());
- bpos.refreshNNList(addrs);
+ Lists.newArrayListWithCapacity(nnIdToAddr.size());
+ ArrayList<InetSocketAddress> lifelineAddrs =
+ Lists.newArrayListWithCapacity(nnIdToAddr.size());
+ for (String nnId : nnIdToAddr.keySet()) {
+ addrs.add(nnIdToAddr.get(nnId));
+ lifelineAddrs.add(nnIdToLifelineAddr != null ?
+ nnIdToLifelineAddr.get(nnId) : null);
+ }
+ bpos.refreshNNList(addrs, lifelineAddrs);
}
}
}
@@ -237,7 +261,8 @@ class BlockPoolManager {
/**
* Extracted out for test purposes.
*/
- protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
- return new BPOfferService(nnAddrs, dn);
+ protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
+ List<InetSocketAddress> lifelineNnAddrs) {
+ return new BPOfferService(nnAddrs, lifelineNnAddrs, dn);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 0fa6f6e..5cff2d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -87,6 +88,7 @@ public class DNConf {
final long readaheadLength;
final long heartBeatInterval;
+ private final long lifelineIntervalMs;
final long blockReportInterval;
final long blockReportSplitThreshold;
final long ibrInterval;
@@ -185,6 +187,20 @@ public class DNConf {
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
+ long confLifelineIntervalMs =
+ conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
+ 3 * conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
+ DFS_HEARTBEAT_INTERVAL_DEFAULT)) * 1000L;
+ if (confLifelineIntervalMs <= heartBeatInterval) {
+ confLifelineIntervalMs = 3 * heartBeatInterval;
+ DataNode.LOG.warn(
+ String.format("%s must be set to a value greater than %s. " +
+ "Resetting value to 3 * %s, which is %d milliseconds.",
+ DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
+ DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_KEY,
+ confLifelineIntervalMs));
+ }
+ lifelineIntervalMs = confLifelineIntervalMs;
// do we need to sync block file contents to disk when blockfile is closed?
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
@@ -338,4 +354,13 @@ public class DNConf {
public long getBpReadyTimeout() {
return bpReadyTimeout;
}
+
+ /**
+ * Returns the interval in milliseconds between sending lifeline messages.
+ *
+ * @return interval in milliseconds between sending lifeline messages
+ */
+ public long getLifelineIntervalMs() {
+ return lifelineIntervalMs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3e2a25d..2362610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
@@ -1647,6 +1648,19 @@ public class DataNode extends ReconfigurableBase
return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
}
+ /**
+ * Connect to the NN for the lifeline protocol. This is separated out for
+ * easier testing.
+ *
+ * @param lifelineNnAddr address of lifeline RPC server
+ * @return lifeline RPC proxy
+ */
+ DatanodeLifelineProtocolClientSideTranslatorPB connectToLifelineNN(
+ InetSocketAddress lifelineNnAddr) throws IOException {
+ return new DatanodeLifelineProtocolClientSideTranslatorPB(lifelineNnAddr,
+ conf);
+ }
+
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
final boolean connectToDnViaHostname) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index a0f25da..aa518fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -107,6 +107,7 @@ public class DataNodeMetrics {
@Metric MutableRate copyBlockOp;
@Metric MutableRate replaceBlockOp;
@Metric MutableRate heartbeats;
+ @Metric MutableRate lifelines;
@Metric MutableRate blockReports;
@Metric MutableRate incrementalBlockReports;
@Metric MutableRate cacheReports;
@@ -199,6 +200,10 @@ public class DataNodeMetrics {
heartbeats.add(latency);
}
+ public void addLifeline(long latency) {
+ lifelines.add(latency);
+ }
+
public void addBlockReport(long latency) {
blockReports.add(latency);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 907a0ea..a5b9dc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3626,6 +3626,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
+ * Handles a lifeline message sent by a DataNode. This method updates contact
+ * information and statistics for the DataNode, so that it doesn't time out.
+ * Unlike a heartbeat, this method does not dispatch any commands back to the
+ * DataNode for local execution. This method also cannot request a lease for
+ * sending a full block report. Lifeline messages are used only as a fallback
+ * in case something prevents successful delivery of heartbeat messages.
+ * Therefore, the implementation of this method must remain lightweight
+ * compared to heartbeat handling. It should avoid lock contention and
+ * expensive computation.
+ *
+ * @param nodeReg registration info for DataNode sending the lifeline
+ * @param reports storage reports from DataNode
+ * @param cacheCapacity cache capacity at DataNode
+ * @param cacheUsed cache used at DataNode
+ * @param xceiverCount estimated count of transfer threads running at DataNode
+ * @param xmitsInProgress count of transfers running at DataNode
+ * @param failedVolumes count of failed volumes at DataNode
+ * @param volumeFailureSummary info on failed volumes at DataNode
+ * @throws IOException if there is an error
+ */
+ void handleLifeline(DatanodeRegistration nodeReg, StorageReport[] reports,
+ long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress,
+ int failedVolumes, VolumeFailureSummary volumeFailureSummary)
+ throws IOException {
+ int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;
+ blockManager.getDatanodeManager().handleLifeline(nodeReg, reports,
+ getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer,
+ failedVolumes, volumeFailureSummary);
+ }
+
+ /**
* Returns whether or not there were available resources at the last check of
* resources.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 275e210..0c4a440 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
@@ -111,11 +112,14 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
@@ -256,6 +260,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
BlockingService dnProtoPbService = DatanodeProtocolService
.newReflectiveBlockingService(dnProtoPbTranslator);
+ DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator =
+ new DatanodeLifelineProtocolServerSideTranslatorPB(this);
+ BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService
+ .newReflectiveBlockingService(lifelineProtoPbTranslator);
+
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService NNPbService = NamenodeProtocolService
@@ -371,9 +380,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
lifelineRpcAddr.getPort());
int lifelineHandlerCount = conf.getInt(
- DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY,
- DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT);
-
+ DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0);
+ if (lifelineHandlerCount <= 0) {
+ float lifelineHandlerRatio = conf.getFloat(
+ DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY,
+ DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT);
+ lifelineHandlerCount = Math.max(
+ (int)(handlerCount * lifelineHandlerRatio), 1);
+ }
lifelineRpcServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
@@ -384,6 +398,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
+ DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class,
+ lifelineProtoPbService, lifelineRpcServer);
+
// Update the address with the correct port
InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
@@ -1509,6 +1526,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
return namesystem.getNamespaceInfo();
}
+ @Override // DatanodeLifelineProtocol
+ public void sendLifeline(DatanodeRegistration nodeReg, StorageReport[] report,
+ long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
+ int xceiverCount, int failedVolumes,
+ VolumeFailureSummary volumeFailureSummary) throws IOException {
+ checkNNStartup();
+ verifyRequest(nodeReg);
+ namesystem.handleLifeline(nodeReg, report, dnCacheCapacity, dnCacheUsed,
+ xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary);
+ }
+
/**
* Verifies the given registration.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java
new file mode 100644
index 0000000..b30e60b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used by a DataNode to send lifeline messages to a NameNode.
+ */
+@KerberosInfo(
+ serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+ clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@InterfaceAudience.Private
+public interface DatanodeLifelineProtocol {
+
+ @Idempotent
+ void sendLifeline(DatanodeRegistration registration, StorageReport[] reports,
+ long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
+ int xceiverCount, int failedVolumes,
+ VolumeFailureSummary volumeFailureSummary) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
index 4a3d83d..d874e8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocol;
public interface NamenodeProtocols
extends ClientProtocol,
DatanodeProtocol,
+ DatanodeLifelineProtocol,
NamenodeProtocol,
RefreshAuthorizationPolicyProtocol,
ReconfigurationProtocol,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto
new file mode 100644
index 0000000..b6ab756
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "DatanodeLifelineProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs.datanodelifeline;
+
+import "DatanodeProtocol.proto";
+
+// The lifeline protocol does not use a new request message type. Instead, it
+// reuses the existing heartbeat request message.
+
+// Unlike heartbeats, the response is empty. There is no command dispatch.
+message LifelineResponseProto {
+}
+
+service DatanodeLifelineProtocolService {
+ rpc sendLifeline(hadoop.hdfs.datanode.HeartbeatRequestProto)
+ returns(LifelineResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ea25a91..d837bd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -705,6 +705,22 @@
</property>
<property>
+ <name>dfs.datanode.lifeline.interval.seconds</name>
+ <value></value>
+ <description>
+ Sets the interval in seconds between sending DataNode Lifeline Protocol
+ messages from the DataNode to the NameNode. The value must be greater than
+ the value of dfs.heartbeat.interval. If this property is not defined, then
+ the default behavior is to calculate the interval as 3x the value of
+ dfs.heartbeat.interval. Note that normal heartbeat processing may cause the
+ DataNode to postpone sending lifeline messages if they are not required.
+ Under normal operations with speedy heartbeat processing, it is possible
+ that no lifeline messages will need to be sent at all. This property has no
+ effect if dfs.namenode.lifeline.rpc-address is not defined.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.handler.count</name>
<value>10</value>
<description>The number of Namenode RPC server threads that listen to
@@ -725,14 +741,34 @@
</property>
<property>
+ <name>dfs.namenode.lifeline.handler.ratio</name>
+ <value>0.10</value>
+ <description>
+ A ratio applied to the value of dfs.namenode.handler.count, which then
+ provides the number of RPC server threads the NameNode runs for handling the
+ lifeline RPC server. For example, if dfs.namenode.handler.count is 100, and
+ dfs.namenode.lifeline.handler.ratio is 0.10, then the NameNode starts
+ 100 * 0.10 = 10 threads for handling the lifeline RPC server. It is common
+ to tune the value of dfs.namenode.handler.count as a function of the number
+ of DataNodes in a cluster. Using this property allows for the lifeline RPC
+ server handler threads to be tuned automatically without needing to touch a
+ separate property. Lifeline message processing is lightweight, so it is
+ expected to require far fewer threads than the main NameNode RPC server.
+ This property is not used if dfs.namenode.lifeline.handler.count is defined,
+ which sets an absolute thread count. This property has no effect if
+ dfs.namenode.lifeline.rpc-address is not defined.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.lifeline.handler.count</name>
- <value>1</value>
+ <value></value>
<description>
- Sets number of RPC server threads the NameNode runs for handling the
- lifeline RPC server. The default value is 1, because this RPC server
- handles only HA health check requests from ZKFC. These are lightweight
- requests that run single-threaded from the ZKFC client side. This property
- has no effect if dfs.namenode.lifeline.rpc-address is not defined.
+ Sets an absolute number of RPC server threads the NameNode runs for handling
+ the DataNode Lifeline Protocol and HA health check requests from ZKFC. If
+ this property is defined, then it overrides the behavior of
+ dfs.namenode.lifeline.handler.ratio. By default, it is not defined. This
+ property has no effect if dfs.namenode.lifeline.rpc-address is not defined.
</description>
</property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 1421f0f..95a103e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -396,7 +397,8 @@ public class TestBPOfferService {
Mockito.eq(new InetSocketAddress(port)));
}
- return new BPOfferService(Lists.newArrayList(nnMap.keySet()), mockDn);
+ return new BPOfferService(Lists.newArrayList(nnMap.keySet()),
+ Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
}
private void waitForInitialization(final BPOfferService bpos)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
index 27e99db..48006dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
@@ -51,7 +51,8 @@ public class TestBlockPoolManager {
bpm = new BlockPoolManager(mockDN){
@Override
- protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+ protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
+ List<InetSocketAddress> lifelineNnAddrs) {
final int idx = mockIdx++;
doLog("create #" + idx);
final BPOfferService bpos = Mockito.mock(BPOfferService.class);
@@ -66,6 +67,7 @@ public class TestBlockPoolManager {
return null;
}
}).when(bpos).refreshNNList(
+ Mockito.<ArrayList<InetSocketAddress>>any(),
Mockito.<ArrayList<InetSocketAddress>>any());
} catch (IOException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index efdd87c..76885e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -49,6 +49,7 @@ public class TestBpServiceActorScheduler {
public Timeout timeout = new Timeout(300000);
private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
+ private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
private final Random random = new Random(System.nanoTime());
@@ -166,9 +167,23 @@ public class TestBpServiceActorScheduler {
}
}
+  @Test
+  public void testScheduleLifeline() {
+    for (final long ts : getTimestamps()) {
+      final Scheduler sched = makeMockScheduler(ts);
+      sched.scheduleNextLifeline(ts); // next lifeline is one interval away
+      assertFalse(sched.isLifelineDue(ts));
+      assertThat(sched.getLifelineWaitTime(), is(LIFELINE_INTERVAL_MS));
+      sched.scheduleNextLifeline(ts - LIFELINE_INTERVAL_MS); // due right now
+      assertTrue(sched.isLifelineDue(ts));
+      assertThat(sched.getLifelineWaitTime(), is(0L));
+    }
+  }
+
private Scheduler makeMockScheduler(long now) {
LOG.info("Using now = " + now);
- Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+ Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
+ LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.nextHeartbeatTime = now;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
new file mode 100644
index 0000000..fd66115
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.apache.log4j.Level;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Test suite covering lifeline protocol handling in the DataNode.
+ *
+ * Each test starts a single-DataNode MiniDFSCluster with the lifeline RPC
+ * server enabled and short heartbeat/stale intervals, then wraps the
+ * DataNode's heartbeat and lifeline RPC proxies in Mockito spies so that
+ * calls can be delayed, counted and verified.
+ */
+public class TestDataNodeLifeline {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestDataNodeLifeline.class);
+
+  static {
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+  }
+
+  @Rule
+  public Timeout timeout = new Timeout(60000);
+
+  private MiniDFSCluster cluster;
+  private HdfsConfiguration conf;
+  /** Spy on the DataNode's lifeline RPC proxy, installed by setup(). */
+  private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
+  private DataNodeMetrics metrics;
+  /** Spy on the DataNode's heartbeat RPC proxy, installed by setup(). */
+  private DatanodeProtocolClientSideTranslatorPB namenode;
+  private FSNamesystem namesystem;
+
+  /**
+   * Starts the cluster and replaces the single BPServiceActor's NameNode and
+   * lifeline RPC proxies with spies.
+   */
+  @Before
+  public void setup() throws Exception {
+    // Configure cluster with lifeline RPC server enabled, and down-tune
+    // heartbeat timings to try to force quick dead/stale DataNodes.
+    conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, 2);
+    conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
+    conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 6 * 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    namesystem = cluster.getNameNode().getNamesystem();
+
+    // Set up spies on RPC proxies so that we can inject failures.
+    DataNode dn = cluster.getDataNodes().get(0);
+    metrics = dn.getMetrics();
+    assertNotNull(metrics);
+    List<BPOfferService> allBpos = dn.getAllBpOs();
+    assertNotNull(allBpos);
+    assertEquals(1, allBpos.size());
+
+    BPOfferService bpos = allBpos.get(0);
+    List<BPServiceActor> allBpsa = bpos.getBPServiceActors();
+    assertNotNull(allBpsa);
+    assertEquals(1, allBpsa.size());
+
+    final BPServiceActor bpsa = allBpsa.get(0);
+    assertNotNull(bpsa);
+
+    // Lifeline RPC proxy gets created on separate thread, so poll until found.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        if (bpsa.getLifelineNameNodeProxy() != null) {
+          lifelineNamenode = spy(bpsa.getLifelineNameNodeProxy());
+          bpsa.setLifelineNameNode(lifelineNamenode);
+        }
+        return lifelineNamenode != null;
+      }
+    }, 100, 10000);
+
+    assertNotNull(bpsa.getNameNodeProxy());
+    namenode = spy(bpsa.getNameNodeProxy());
+    bpsa.setNameNode(namenode);
+  }
+
+  /** Shuts down the cluster and asserts no ".*lifeline.*" thread remains. */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      GenericTestUtils.assertNoThreadsMatching(".*lifeline.*");
+    }
+  }
+
+  /**
+   * Blocks every heartbeat until the expected number of lifelines has been
+   * sent, and checks that the NameNode keeps the DataNode live meanwhile.
+   */
+  @Test
+  public void testSendLifelineIfHeartbeatBlocked() throws Exception {
+    // Run the test for the duration of sending 10 lifeline RPC messages.
+    final int numLifelines = 10;
+    CountDownLatch lifelinesSent = new CountDownLatch(numLifelines);
+
+    // Intercept heartbeat to inject an artificial delay, until all expected
+    // lifeline RPC messages have been sent.
+    doAnswer(new LatchAwaitingAnswer<HeartbeatResponse>(lifelinesSent))
+        .when(namenode).sendHeartbeat(
+            any(DatanodeRegistration.class), any(StorageReport[].class),
+            anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
+            any(VolumeFailureSummary.class), anyBoolean());
+
+    // Intercept lifeline to trigger latch count-down on each call.
+    doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
+        .when(lifelineNamenode).sendLifeline(
+            any(DatanodeRegistration.class), any(StorageReport[].class),
+            anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
+            any(VolumeFailureSummary.class));
+
+    // While waiting on the latch for the expected number of lifeline messages,
+    // poll DataNode tracking information. Thanks to the lifeline, we expect
+    // that the DataNode always stays alive, and never goes stale or dead.
+    while (!lifelinesSent.await(1, SECONDS)) {
+      assertEquals("Expect DataNode to be kept alive by lifeline.", 1,
+          namesystem.getNumLiveDataNodes());
+      assertEquals("Expect DataNode not marked dead due to lifeline.", 0,
+          namesystem.getNumDeadDataNodes());
+      assertEquals("Expect DataNode not marked stale due to lifeline.", 0,
+          namesystem.getNumStaleDataNodes());
+    }
+
+    // Verify that we did in fact call the lifeline RPC.
+    verify(lifelineNamenode, atLeastOnce()).sendLifeline(
+        any(DatanodeRegistration.class), any(StorageReport[].class),
+        anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
+        any(VolumeFailureSummary.class));
+
+    // Also verify lifeline calls through metrics. LifelinesNumOps looks like
+    // a rate (latency) metric, so it is presumably recorded after the RPC
+    // returns to the DataNode, i.e. possibly after the latch count-down in
+    // LatchCountingAnswer. Poll instead of reading the counter once. Extra
+    // lifeline calls are possible depending on timing, so allow slack.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return getLongCounter("LifelinesNumOps",
+            getMetrics(metrics.name())) >= numLifelines;
+      }
+    }, 100, 10000);
+  }
+
+  /**
+   * Lets heartbeats through normally and checks that no lifeline is sent
+   * while the expected number of heartbeats keeps the DataNode live, both via
+   * the lifeline spy and via the LifelinesNumOps metric.
+   */
+  @Test
+  public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception {
+    // Run the test for the duration of sending 10 heartbeat RPC messages.
+    int numHeartbeats = 10;
+    CountDownLatch heartbeatsSent = new CountDownLatch(numHeartbeats);
+
+    // Intercept heartbeat to trigger latch count-down on each call.
+    doAnswer(new LatchCountingAnswer<HeartbeatResponse>(heartbeatsSent))
+        .when(namenode).sendHeartbeat(
+            any(DatanodeRegistration.class), any(StorageReport[].class),
+            anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
+            any(VolumeFailureSummary.class), anyBoolean());
+
+    // While waiting on the latch for the expected number of heartbeat messages,
+    // poll DataNode tracking information. We expect that the DataNode always
+    // stays alive, and never goes stale or dead.
+    while (!heartbeatsSent.await(1, SECONDS)) {
+      assertEquals("Expect DataNode to be kept alive by heartbeat.", 1,
+          namesystem.getNumLiveDataNodes());
+      assertEquals("Expect DataNode not marked dead.", 0,
+          namesystem.getNumDeadDataNodes());
+      assertEquals("Expect DataNode not marked stale.", 0,
+          namesystem.getNumStaleDataNodes());
+    }
+
+    // Verify that we did not call the lifeline RPC.
+    verify(lifelineNamenode, never()).sendLifeline(
+        any(DatanodeRegistration.class), any(StorageReport[].class),
+        anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
+        any(VolumeFailureSummary.class));
+
+    // Also verify no lifeline calls through metrics.
+    assertEquals("Expect metrics to count no lifeline calls.", 0,
+        getLongCounter("LifelinesNumOps", getMetrics(metrics.name())));
+  }
+
+  /**
+   * Waits on a {@link CountDownLatch} before calling through to the method.
+   * The wait has no timeout: the calling thread stays blocked until the latch
+   * reaches zero or the thread is interrupted.
+   */
+  private static final class LatchAwaitingAnswer<T> implements Answer<T> {
+    private final CountDownLatch latch;
+
+    public LatchAwaitingAnswer(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T answer(InvocationOnMock invocation) throws Throwable {
+      LOG.info("Awaiting, remaining latch count is {}.", latch.getCount());
+      latch.await();
+      return (T)invocation.callRealMethod();
+    }
+  }
+
+  /**
+   * Counts down a {@link CountDownLatch} after each call through to the
+   * method. If the real method throws, the latch is not counted down.
+   */
+  private static final class LatchCountingAnswer<T> implements Answer<T> {
+    private final CountDownLatch latch;
+
+    public LatchCountingAnswer(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T answer(InvocationOnMock invocation) throws Throwable {
+      T result = (T)invocation.callRealMethod();
+      latch.countDown();
+      LOG.info("Countdown, remaining latch count is {}.", latch.getCount());
+      return result;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
index 216ff3d..f2a5d08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
@@ -61,7 +61,7 @@ public class TestDatanodeRegister {
BPOfferService mockBPOS = mock(BPOfferService.class);
doReturn(mockDN).when(mockBPOS).getDataNode();
- actor = new BPServiceActor(INVALID_ADDR, mockBPOS);
+ actor = new BPServiceActor(INVALID_ADDR, null, mockBPOS);
fakeNsInfo = mock(NamespaceInfo.class);
// Return a a good software version.