Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/03/05 00:43:34 UTC

hadoop git commit: HDFS-9239. DataNode Lifeline Protocol: an alternative protocol for reporting DataNode liveness. Contributed by Chris Nauroth.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 8e08861a1 -> 2759689d7


HDFS-9239. DataNode Lifeline Protocol: an alternative protocol for reporting DataNode liveness. Contributed by Chris Nauroth.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2759689d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2759689d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2759689d

Branch: refs/heads/trunk
Commit: 2759689d7d23001f007cb0dbe2521de90734dd5c
Parents: 8e08861
Author: Chris Nauroth <cn...@apache.org>
Authored: Fri Mar 4 15:29:50 2016 -0800
Committer: Chris Nauroth <cn...@apache.org>
Committed: Fri Mar 4 15:29:50 2016 -0800

----------------------------------------------------------------------
 .../hadoop-common/src/site/markdown/Metrics.md  |   2 +
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   1 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  10 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  35 +++
 ...eLifelineProtocolClientSideTranslatorPB.java | 113 +++++++
 .../protocolPB/DatanodeLifelineProtocolPB.java  |  40 +++
 ...eLifelineProtocolServerSideTranslatorPB.java |  71 +++++
 .../server/blockmanagement/DatanodeManager.java |  41 +++
 .../blockmanagement/HeartbeatManager.java       |  16 +-
 .../hdfs/server/datanode/BPOfferService.java    |  13 +-
 .../hdfs/server/datanode/BPServiceActor.java    | 211 ++++++++++++-
 .../hdfs/server/datanode/BlockPoolManager.java  |  41 ++-
 .../hadoop/hdfs/server/datanode/DNConf.java     |  25 ++
 .../hadoop/hdfs/server/datanode/DataNode.java   |  14 +
 .../datanode/metrics/DataNodeMetrics.java       |   5 +
 .../hdfs/server/namenode/FSNamesystem.java      |  31 ++
 .../hdfs/server/namenode/NameNodeRpcServer.java |  36 ++-
 .../protocol/DatanodeLifelineProtocol.java      |  42 +++
 .../hdfs/server/protocol/NamenodeProtocols.java |   1 +
 .../main/proto/DatanodeLifelineProtocol.proto   |  43 +++
 .../src/main/resources/hdfs-default.xml         |  48 ++-
 .../server/datanode/TestBPOfferService.java     |   4 +-
 .../server/datanode/TestBlockPoolManager.java   |   4 +-
 .../datanode/TestBpServiceActorScheduler.java   |  17 +-
 .../server/datanode/TestDataNodeLifeline.java   | 300 +++++++++++++++++++
 .../server/datanode/TestDatanodeRegister.java   |   2 +-
 26 files changed, 1128 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index f43e725..b660b16 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -290,6 +290,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `ReplaceBlockOpAvgTime` | Average time of block replace operations in milliseconds |
 | `HeartbeatsNumOps` | Total number of heartbeats |
 | `HeartbeatsAvgTime` | Average heartbeat time in milliseconds |
+| `LifelinesNumOps` | Total number of lifeline messages |
+| `LifelinesAvgTime` | Average lifeline message processing time in milliseconds |
 | `BlockReportsNumOps` | Total number of block report operations |
 | `BlockReportsAvgTime` | Average time of block report operations in milliseconds |
 | `IncrementalBlockReportsNumOps` | Total number of incremental block report operations |
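
These two counters are published through the DataNode's standard metrics system, so they are visible over JMX alongside the neighboring heartbeat counters. Below is a minimal sketch (not part of this patch) of reading them remotely; the JMX port and the "DataNodeActivity" bean name pattern are assumptions based on a typical DataNode deployment.

    import java.util.Set;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class LifelineMetricsProbe {
      public static void main(String[] args) throws Exception {
        // Assumes the DataNode JVM exposes remote JMX on localhost:8006.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:8006/jmxrmi");
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
          MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
          Set<ObjectName> names = mbsc.queryNames(
              new ObjectName("Hadoop:service=DataNode,name=DataNodeActivity*"),
              null);
          for (ObjectName name : names) {
            System.out.println("LifelinesNumOps  = "
                + mbsc.getAttribute(name, "LifelinesNumOps"));
            System.out.println("LifelinesAvgTime = "
                + mbsc.getAttribute(name, "LifelinesAvgTime"));
          }
        }
      }
    }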

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 83c706f..32c060d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -343,6 +343,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                 <includes>
                   <include>HdfsServer.proto</include>
                   <include>DatanodeProtocol.proto</include>
+                  <include>DatanodeLifelineProtocol.proto</include>
                   <include>HAZKInfo.proto</include>
                   <include>InterDatanodeProtocol.proto</include>
                   <include>JournalProtocol.proto</include>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9c06e29..5eaada4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -512,6 +512,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
   public static final long    DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
+  public static final String  DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY =
+      "dfs.datanode.lifeline.interval.seconds";
   public static final String  DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
   public static final long    DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L;
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
@@ -522,8 +524,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
   public static final String  DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
   public static final int     DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
-  public static final int     DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT = 1;
-  public static final String  DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY = "dfs.namenode.lifeline.handler.count";
+  public static final String  DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY =
+      "dfs.namenode.lifeline.handler.ratio";
+  public static final float   DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT =
+      0.1f;
+  public static final String  DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY =
+      "dfs.namenode.lifeline.handler.count";
   public static final String  DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
   public static final int     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_HTTP_POLICY_KEY = "dfs.http.policy";
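
The new keys work together: dfs.datanode.lifeline.interval.seconds has no compiled-in default (DNConf, later in this patch, derives 3 times the heartbeat interval), and the NameNode sizes its lifeline handler pool from the ratio key unless an explicit handler count is set. A small illustrative sketch of that sizing rule, with an assumed main handler count of 64:

    public class LifelineHandlerCountSketch {
      public static void main(String[] args) {
        int handlerCount = 64;    // dfs.namenode.handler.count (illustrative)
        float ratio = 0.1f;       // dfs.namenode.lifeline.handler.ratio default
        // Mirrors NameNodeRpcServer later in this patch: when
        // dfs.namenode.lifeline.handler.count is unset or <= 0, size the
        // lifeline RPC server as a fraction of the main pool, minimum 1.
        int lifelineHandlers = Math.max((int) (handlerCount * ratio), 1);
        System.out.println(lifelineHandlers);  // prints 6
      }
    }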

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 1a0d192..2148c75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
@@ -570,6 +571,40 @@ public class DFSUtil {
   }
 
   /**
+   * Returns the lifeline RPC server addresses of the NameNodes, keyed by
+   * nameservice ID and then NameNode ID, as read from the configuration.
+   *
+   * @param conf configuration
+   * @return map of nameservice ID to map of NameNode ID to lifeline address
+   * @throws IOException on error
+   */
+  public static Map<String, Map<String, InetSocketAddress>>
+      getNNLifelineRpcAddressesForCluster(Configuration conf)
+      throws IOException {
+
+    Collection<String> parentNameServices = conf.getTrimmedStringCollection(
+        DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
+
+    if (parentNameServices.isEmpty()) {
+      parentNameServices = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_NAMESERVICES);
+    } else {
+      // Ensure that the internal service is indeed in the list of all available
+      // nameservices.
+      Set<String> availableNameServices = Sets.newHashSet(conf
+          .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
+      for (String nsId : parentNameServices) {
+        if (!availableNameServices.contains(nsId)) {
+          throw new IOException("Unknown nameservice: " + nsId);
+        }
+      }
+    }
+
+    return DFSUtilClient.getAddressesForNsIds(conf, parentNameServices, null,
+        DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY);
+  }
+
+  /**
    * Map a logical namenode ID to its lifeline address.  Use the given
    * nameservice if specified, or the configured one if none is given.
    *
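
A usage sketch (not part of this patch) for the new method: it returns lifeline addresses keyed by nameservice ID and then NameNode ID, mirroring getNNServiceRpcAddressesForCluster. Entries may be missing or null for NameNodes that do not configure dfs.namenode.lifeline.rpc-address, since the lifeline protocol is opt-in.

    import java.net.InetSocketAddress;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class LifelineAddressDump {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        Map<String, Map<String, InetSocketAddress>> byNs =
            DFSUtil.getNNLifelineRpcAddressesForCluster(conf);
        for (Map.Entry<String, Map<String, InetSocketAddress>> ns :
            byNs.entrySet()) {
          for (Map.Entry<String, InetSocketAddress> nn :
              ns.getValue().entrySet()) {
            System.out.println(ns.getKey() + "/" + nn.getKey()
                + " -> " + nn.getValue());
          }
        }
      }
    }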

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..5c323eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link DatanodeLifelineProtocol} interfaces to the RPC server implementing
+ * {@link DatanodeLifelineProtocolPB}.
+ */
+@InterfaceAudience.Private
+public class DatanodeLifelineProtocolClientSideTranslatorPB implements
+    ProtocolMetaInterface, DatanodeLifelineProtocol, Closeable {
+
+  /** RpcController is not used and hence is set to null. */
+  private static final RpcController NULL_CONTROLLER = null;
+
+  private final DatanodeLifelineProtocolPB rpcProxy;
+
+  public DatanodeLifelineProtocolClientSideTranslatorPB(
+      InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, DatanodeLifelineProtocolPB.class,
+        ProtobufRpcEngine.class);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
+  }
+
+  private static DatanodeLifelineProtocolPB createNamenode(
+      InetSocketAddress nameNodeAddr, Configuration conf,
+      UserGroupInformation ugi) throws IOException {
+    return RPC.getProxy(DatanodeLifelineProtocolPB.class,
+        RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), nameNodeAddr,
+        ugi, conf,
+        NetUtils.getSocketFactory(conf, DatanodeLifelineProtocolPB.class));
+  }
+
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public void sendLifeline(DatanodeRegistration registration,
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary) throws IOException {
+    HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
+        .setRegistration(PBHelper.convert(registration))
+        .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
+        .setFailedVolumes(failedVolumes);
+    builder.addAllReports(PBHelperClient.convertStorageReports(reports));
+    if (cacheCapacity != 0) {
+      builder.setCacheCapacity(cacheCapacity);
+    }
+    if (cacheUsed != 0) {
+      builder.setCacheUsed(cacheUsed);
+    }
+    if (volumeFailureSummary != null) {
+      builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
+          volumeFailureSummary));
+    }
+    try {
+      rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build());
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  @Override // ProtocolMetaInterface
+  public boolean isMethodSupported(String methodName)
+      throws IOException {
+    return RpcClientUtil.isMethodSupported(rpcProxy,
+        DatanodeLifelineProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(DatanodeLifelineProtocolPB.class), methodName);
+  }
+}
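
A hypothetical caller of this translator might look like the sketch below; in the patch itself the only caller is the DataNode's LifelineSender, which obtains the proxy through DataNode#connectToLifelineNN. The helper name and the zeroed statistics are illustrative only.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
    import org.apache.hadoop.hdfs.server.protocol.StorageReport;

    public class LifelineSendSketch {
      static void sendOneLifeline(Configuration conf,
          InetSocketAddress lifelineAddr, DatanodeRegistration reg,
          StorageReport[] reports) throws IOException {
        DatanodeLifelineProtocolClientSideTranslatorPB proxy =
            new DatanodeLifelineProtocolClientSideTranslatorPB(
                lifelineAddr, conf);
        try {
          // Zeroed cache/transfer stats and no volume failure summary keep
          // the sketch minimal; the real sender reports live values.
          proxy.sendLifeline(reg, reports, 0, 0, 0, 0, 0, null);
        } finally {
          proxy.close();  // stops the underlying RPC proxy
        }
      }
    }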

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java
new file mode 100644
index 0000000..a17a6b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolPB.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used by a DataNode to send lifeline messages to a NameNode.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@ProtocolInfo(
+    protocolName =
+        "org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface DatanodeLifelineProtocolPB extends
+    DatanodeLifelineProtocolService.BlockingInterface {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..8311993
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolServerSideTranslatorPB.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.LifelineResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Implementation for protobuf service that forwards requests
+ * received on {@link DatanodeLifelineProtocolPB} to the
+ * {@link DatanodeLifelineProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public class DatanodeLifelineProtocolServerSideTranslatorPB implements
+    DatanodeLifelineProtocolPB {
+
+  private static final LifelineResponseProto VOID_LIFELINE_RESPONSE_PROTO =
+      LifelineResponseProto.newBuilder().build();
+
+  private final DatanodeLifelineProtocol impl;
+
+  public DatanodeLifelineProtocolServerSideTranslatorPB(
+      DatanodeLifelineProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public LifelineResponseProto sendLifeline(RpcController controller,
+      HeartbeatRequestProto request) throws ServiceException {
+    try {
+      final StorageReport[] report = PBHelperClient.convertStorageReports(
+          request.getReportsList());
+      VolumeFailureSummary volumeFailureSummary =
+          request.hasVolumeFailureSummary() ?
+              PBHelper.convertVolumeFailureSummary(
+                  request.getVolumeFailureSummary()) : null;
+      impl.sendLifeline(PBHelper.convert(request.getRegistration()), report,
+          request.getCacheCapacity(), request.getCacheUsed(),
+          request.getXmitsInProgress(), request.getXceiverCount(),
+          request.getFailedVolumes(), volumeFailureSummary);
+      return VOID_LIFELINE_RESPONSE_PROTO;
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 999c1fa..3072fc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1496,6 +1496,47 @@ public class DatanodeManager {
   }
 
   /**
+   * Handles a lifeline message sent by a DataNode.
+   *
+   * @param nodeReg registration info for DataNode sending the lifeline
+   * @param reports storage reports from DataNode
+   * @param blockPoolId block pool ID
+   * @param cacheCapacity cache capacity at DataNode
+   * @param cacheUsed cache used at DataNode
+   * @param xceiverCount estimated count of transfer threads running at DataNode
+   * @param maxTransfers remaining block transfer capacity at DataNode
+   * @param failedVolumes count of failed volumes at DataNode
+   * @param volumeFailureSummary info on failed volumes at DataNode
+   * @throws IOException if there is an error
+   */
+  public void handleLifeline(DatanodeRegistration nodeReg,
+      StorageReport[] reports, String blockPoolId, long cacheCapacity,
+      long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
+    }
+    DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
+    if (nodeinfo == null) {
+      // This is null if the DataNode has not yet registered.  We expect this
+      // will never happen, because the DataNode has logic to prevent sending
+      // lifeline messages until after initial registration is successful.
+      // Lifeline message handling can't send commands back to the DataNode to
+      // tell it to register, so simply exit.
+      return;
+    }
+    if (nodeinfo.isDisallowed()) {
+      // This is highly unlikely, because heartbeat handling is much more
+      // frequent and likely would have already sent the disallowed error.
+      // Lifeline messages are not intended to send any kind of control response
+      // back to the DataNode, so simply exit.
+      return;
+    }
+    heartbeatManager.updateLifeline(nodeinfo, reports, cacheCapacity, cacheUsed,
+        xceiverCount, failedVolumes, volumeFailureSummary);
+  }
+
+  /**
    * Convert a CachedBlockList into a DatanodeCommand with a list of blocks.
    *
    * @param list       The {@link CachedBlocksList}.  This function 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index b8d3043..cec4a1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -240,6 +240,20 @@ class HeartbeatManager implements DatanodeStatistics {
     stats.add(node);
   }
 
+  synchronized void updateLifeline(final DatanodeDescriptor node,
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xceiverCount, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary) {
+    stats.subtract(node);
+    // This intentionally calls updateHeartbeatState instead of
+    // updateHeartbeat, because we don't want to modify the
+    // heartbeatedSinceRegistration flag.  Arrival of a lifeline message does
+    // not count as arrival of the first heartbeat.
+    node.updateHeartbeatState(reports, cacheCapacity, cacheUsed,
+        xceiverCount, failedVolumes, volumeFailureSummary);
+    stats.add(node);
+  }
+
   synchronized void startDecommission(final DatanodeDescriptor node) {
     if (!node.isAlive()) {
       LOG.info("Dead node {} is decommissioned immediately.", node);
@@ -416,4 +430,4 @@ class HeartbeatManager implements DatanodeStatistics {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 206e89a..00102eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -120,17 +120,22 @@ class BPOfferService {
     mWriteLock.unlock();
   }
 
-  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
+  BPOfferService(List<InetSocketAddress> nnAddrs,
+      List<InetSocketAddress> lifelineNnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
+    Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
+        "Must pass same number of NN addresses and lifeline addresses.");
     this.dn = dn;
 
-    for (InetSocketAddress addr : nnAddrs) {
-      this.bpServices.add(new BPServiceActor(addr, this));
+    for (int i = 0; i < nnAddrs.size(); ++i) {
+      this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
+          lifelineNnAddrs.get(i), this));
     }
   }
 
-  void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
+  void refreshNNList(ArrayList<InetSocketAddress> addrs,
+      ArrayList<InetSocketAddress> lifelineAddrs) throws IOException {
     Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
     for (BPServiceActor actor : bpServices) {
       oldAddrs.add(actor.getNNSocketAddress());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index d3d46be..7184a49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -28,6 +29,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -103,14 +106,20 @@ class BPServiceActor implements Runnable {
   final LinkedList<BPServiceActorAction> bpThreadQueue 
       = new LinkedList<BPServiceActorAction>();
 
-  BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
+  BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
+      BPOfferService bpos) {
     this.bpos = bpos;
     this.dn = bpos.getDataNode();
     this.nnAddr = nnAddr;
+    this.lifelineSender = lifelineNnAddr != null ?
+        new LifelineSender(lifelineNnAddr) : null;
+    this.initialRegistrationComplete = lifelineNnAddr != null ?
+        new CountDownLatch(1) : null;
     this.dnConf = dn.getDnConf();
     this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
-    scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
+    scheduler = new Scheduler(dnConf.heartBeatInterval,
+        dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -138,6 +147,9 @@ class BPServiceActor implements Runnable {
     return nnAddr;
   }
 
+  private final CountDownLatch initialRegistrationComplete;
+  private final LifelineSender lifelineSender;
+
   /**
    * Used to inject a spy NN in the unit tests.
    */
@@ -152,6 +164,20 @@ class BPServiceActor implements Runnable {
   }
 
   /**
+   * Used to inject a spy NN in the unit tests.
+   */
+  @VisibleForTesting
+  void setLifelineNameNode(
+      DatanodeLifelineProtocolClientSideTranslatorPB dnLifelineProtocol) {
+    lifelineSender.lifelineNamenode = dnLifelineProtocol;
+  }
+
+  @VisibleForTesting
+  DatanodeLifelineProtocolClientSideTranslatorPB getLifelineNameNodeProxy() {
+    return lifelineSender.lifelineNamenode;
+  }
+
+  /**
    * Perform the first part of the handshake with the NameNode.
    * This calls <code>versionRequest</code> to determine the NN's
    * namespace and version info. It automatically retries until
@@ -420,29 +446,39 @@ class BPServiceActor implements Runnable {
       //Thread is started already
       return;
     }
-    bpThread = new Thread(this, formatThreadName());
+    bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr));
     bpThread.setDaemon(true); // needed for JUnit testing
     bpThread.start();
+
+    if (lifelineSender != null) {
+      lifelineSender.start();
+    }
   }
   
-  private String formatThreadName() {
+  private String formatThreadName(String action, InetSocketAddress addr) {
     Collection<StorageLocation> dataDirs =
         DataNode.getStorageLocations(dn.getConf());
-    return "DataNode: [" + dataDirs.toString() + "] " +
-      " heartbeating to " + nnAddr;
+    return "DataNode: [" + dataDirs.toString() + "]  " +
+        action + " to " + addr;
   }
   
   //This must be called only by blockPoolManager.
   void stop() {
     shouldServiceRun = false;
+    if (lifelineSender != null) {
+      lifelineSender.stop();
+    }
     if (bpThread != null) {
-        bpThread.interrupt();
+      bpThread.interrupt();
     }
   }
   
   //This must be called only by blockPoolManager
   void join() {
     try {
+      if (lifelineSender != null) {
+        lifelineSender.join();
+      }
       if (bpThread != null) {
         bpThread.join();
       }
@@ -454,6 +490,7 @@ class BPServiceActor implements Runnable {
     
     shouldServiceRun = false;
     IOUtils.cleanup(null, bpNamenode);
+    IOUtils.cleanup(null, lifelineSender);
     bpos.shutdownActor(this);
   }
 
@@ -480,7 +517,9 @@ class BPServiceActor implements Runnable {
         + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
         + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
         + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
-        + "; heartBeatInterval=" + dnConf.heartBeatInterval);
+        + "; heartBeatInterval=" + dnConf.heartBeatInterval
+        + (lifelineSender != null ?
+            "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
     long fullBlockReportLeaseId = 0;
 
     //
@@ -684,6 +723,9 @@ class BPServiceActor implements Runnable {
       }
 
       runningState = RunningState.RUNNING;
+      if (initialRegistrationComplete != null) {
+        initialRegistrationComplete.countDown();
+      }
 
       while (shouldRun()) {
         try {
@@ -797,6 +839,135 @@ class BPServiceActor implements Runnable {
     return scheduler;
   }
 
+  private final class LifelineSender implements Runnable, Closeable {
+
+    private final InetSocketAddress lifelineNnAddr;
+    private Thread lifelineThread;
+    private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
+
+    public LifelineSender(InetSocketAddress lifelineNnAddr) {
+      this.lifelineNnAddr = lifelineNnAddr;
+    }
+
+    @Override
+    public void close() {
+      stop();
+      try {
+        join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      IOUtils.cleanup(null, lifelineNamenode);
+    }
+
+    @Override
+    public void run() {
+      // The lifeline RPC depends on registration with the NameNode, so wait for
+      // initial registration to complete.
+      while (shouldRun()) {
+        try {
+          initialRegistrationComplete.await();
+          break;
+        } catch (InterruptedException e) {
+          // The only way thread interruption can happen while waiting on this
+          // latch is if the state of the actor has been updated to signal
+          // shutdown.  The next loop's call to shouldRun() will return false,
+          // and the thread will finish.
+          Thread.currentThread().interrupt();
+        }
+      }
+
+      // After initial NameNode registration has completed, execute the main
+      // loop for sending periodic lifeline RPCs if needed.  This is done in a
+      // second loop to avoid a pointless wait on the above latch in every
+      // iteration of the main loop.
+      while (shouldRun()) {
+        try {
+          if (lifelineNamenode == null) {
+            lifelineNamenode = dn.connectToLifelineNN(lifelineNnAddr);
+          }
+          sendLifelineIfDue();
+          Thread.sleep(scheduler.getLifelineWaitTime());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (IOException e) {
+          LOG.warn("IOException in LifelineSender for " + BPServiceActor.this,
+              e);
+        }
+      }
+
+      LOG.info("LifelineSender for " + BPServiceActor.this + " exiting.");
+    }
+
+    public void start() {
+      lifelineThread = new Thread(this, formatThreadName("lifeline",
+          lifelineNnAddr));
+      lifelineThread.setDaemon(true);
+      lifelineThread.setUncaughtExceptionHandler(
+          new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread thread, Throwable t) {
+              LOG.error(thread + " terminating on unexpected exception", t);
+            }
+          });
+      lifelineThread.start();
+    }
+
+    public void stop() {
+      if (lifelineThread != null) {
+        lifelineThread.interrupt();
+      }
+    }
+
+    public void join() throws InterruptedException {
+      if (lifelineThread != null) {
+        lifelineThread.join();
+      }
+    }
+
+    private void sendLifelineIfDue() throws IOException {
+      long startTime = scheduler.monotonicNow();
+      if (!scheduler.isLifelineDue(startTime)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
+              + ", because it is not due.");
+        }
+        return;
+      }
+      if (dn.areHeartbeatsDisabledForTests()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping sending lifeline for " + BPServiceActor.this
+              + ", because heartbeats are disabled for tests.");
+        }
+        return;
+      }
+      sendLifeline();
+      dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime);
+      scheduler.scheduleNextLifeline(scheduler.monotonicNow());
+    }
+
+    private void sendLifeline() throws IOException {
+      StorageReport[] reports =
+          dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending lifeline with " + reports.length + " storage " +
+                  " reports from service actor: " + BPServiceActor.this);
+      }
+      VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
+          .getVolumeFailureSummary();
+      int numFailedVolumes = volumeFailureSummary != null ?
+          volumeFailureSummary.getFailedStorageLocations().length : 0;
+      lifelineNamenode.sendLifeline(bpRegistration,
+                                    reports,
+                                    dn.getFSDataset().getCacheCapacity(),
+                                    dn.getFSDataset().getCacheUsed(),
+                                    dn.getXmitsInProgress(),
+                                    dn.getXceiverCount(),
+                                    numFailedVolumes,
+                                    volumeFailureSummary);
+    }
+  }
+
   /**
    * Utility class that wraps the timestamp computations for scheduling
    * heartbeats and block reports.
@@ -812,16 +983,22 @@ class BPServiceActor implements Runnable {
     volatile long nextHeartbeatTime = monotonicNow();
 
     @VisibleForTesting
+    volatile long nextLifelineTime = monotonicNow();
+
+    @VisibleForTesting
     boolean resetBlockReportTime = true;
 
     private final AtomicBoolean forceFullBlockReport =
         new AtomicBoolean(false);
 
     private final long heartbeatIntervalMs;
+    private final long lifelineIntervalMs;
     private final long blockReportIntervalMs;
 
-    Scheduler(long heartbeatIntervalMs, long blockReportIntervalMs) {
+    Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
+        long blockReportIntervalMs) {
       this.heartbeatIntervalMs = heartbeatIntervalMs;
+      this.lifelineIntervalMs = lifelineIntervalMs;
       this.blockReportIntervalMs = blockReportIntervalMs;
     }
 
@@ -835,19 +1012,31 @@ class BPServiceActor implements Runnable {
     //    Blockreport.
     long scheduleHeartbeat() {
       nextHeartbeatTime = monotonicNow();
+      scheduleNextLifeline(nextHeartbeatTime);
       return nextHeartbeatTime;
     }
 
     long scheduleNextHeartbeat() {
       // Numerical overflow is possible here and is okay.
       nextHeartbeatTime = monotonicNow() + heartbeatIntervalMs;
+      scheduleNextLifeline(nextHeartbeatTime);
       return nextHeartbeatTime;
     }
 
+    long scheduleNextLifeline(long baseTime) {
+      // Numerical overflow is possible here and is okay.
+      nextLifelineTime = baseTime + lifelineIntervalMs;
+      return nextLifelineTime;
+    }
+
     boolean isHeartbeatDue(long startTime) {
       return (nextHeartbeatTime - startTime <= 0);
     }
 
+    boolean isLifelineDue(long startTime) {
+      return (nextLifelineTime - startTime <= 0);
+    }
+
     boolean isBlockReportDue(long curTime) {
       return nextBlockReportTime - curTime <= 0;
     }
@@ -903,6 +1092,10 @@ class BPServiceActor implements Runnable {
       return nextHeartbeatTime - monotonicNow();
     }
 
+    long getLifelineWaitTime() {
+      return nextLifelineTime - monotonicNow();
+    }
+
     /**
      * Wrapped for testing.
      * @return
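
The Scheduler changes are the heart of the feature: every successful heartbeat reschedules the next lifeline to nextHeartbeatTime + lifelineIntervalMs, so a lifeline only becomes due once heartbeats have stalled for longer than the lifeline interval. A standalone sketch of that arithmetic (illustrative numbers, not the patch's class):

    public class LifelineScheduleSketch {
      public static void main(String[] args) {
        long heartbeatMs = 3_000L;  // dfs.heartbeat.interval default
        long lifelineMs = 9_000L;   // DNConf default: 3 * heartbeat interval
        long now = 0L;
        long nextLifeline = now;

        // Healthy case: each heartbeat pushes the lifeline past itself,
        // mirroring scheduleNextHeartbeat() + scheduleNextLifeline(base).
        for (int i = 0; i < 3; i++) {
          long nextHeartbeat = now + heartbeatMs;
          nextLifeline = nextHeartbeat + lifelineMs;
          now = nextHeartbeat;      // heartbeat goes out on time
        }
        // isLifelineDue(now): false, so LifelineSender stays quiet.
        System.out.println("due while healthy? " + (nextLifeline - now <= 0));

        // Stalled case: heartbeats stop and the clock runs past the deadline.
        now += lifelineMs + 1;
        // isLifelineDue(now): true, so LifelineSender sends a lifeline.
        System.out.println("due after stall?  " + (nextLifeline - now <= 0));
      }
    }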

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
index 08b2fb0..e94bbb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
@@ -151,14 +151,18 @@ class BlockPoolManager {
 
     Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
             .getNNServiceRpcAddressesForCluster(conf);
+    Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil
+            .getNNLifelineRpcAddressesForCluster(conf);
 
     synchronized (refreshNamenodesLock) {
-      doRefreshNamenodes(newAddressMap);
+      doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
     }
   }
   
   private void doRefreshNamenodes(
-      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
+      Map<String, Map<String, InetSocketAddress>> addrMap,
+      Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
+      throws IOException {
     assert Thread.holdsLock(refreshNamenodesLock);
 
     Set<String> toRefresh = Sets.newLinkedHashSet();
@@ -195,9 +199,19 @@ class BlockPoolManager {
             Joiner.on(",").useForNull("<default>").join(toAdd));
       
         for (String nsToAdd : toAdd) {
+          Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
+          Map<String, InetSocketAddress> nnIdToLifelineAddr =
+              lifelineAddrMap.get(nsToAdd);
           ArrayList<InetSocketAddress> addrs =
-            Lists.newArrayList(addrMap.get(nsToAdd).values());
-          BPOfferService bpos = createBPOS(addrs);
+              Lists.newArrayListWithCapacity(nnIdToAddr.size());
+          ArrayList<InetSocketAddress> lifelineAddrs =
+              Lists.newArrayListWithCapacity(nnIdToAddr.size());
+          for (String nnId : nnIdToAddr.keySet()) {
+            addrs.add(nnIdToAddr.get(nnId));
+            lifelineAddrs.add(nnIdToLifelineAddr != null ?
+                nnIdToLifelineAddr.get(nnId) : null);
+          }
+          BPOfferService bpos = createBPOS(addrs, lifelineAddrs);
           bpByNameserviceId.put(nsToAdd, bpos);
           offerServices.add(bpos);
         }
@@ -227,9 +241,19 @@ class BlockPoolManager {
       
       for (String nsToRefresh : toRefresh) {
         BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
+        Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
+        Map<String, InetSocketAddress> nnIdToLifelineAddr =
+            lifelineAddrMap.get(nsToRefresh);
         ArrayList<InetSocketAddress> addrs =
-          Lists.newArrayList(addrMap.get(nsToRefresh).values());
-        bpos.refreshNNList(addrs);
+            Lists.newArrayListWithCapacity(nnIdToAddr.size());
+        ArrayList<InetSocketAddress> lifelineAddrs =
+            Lists.newArrayListWithCapacity(nnIdToAddr.size());
+        for (String nnId : nnIdToAddr.keySet()) {
+          addrs.add(nnIdToAddr.get(nnId));
+          lifelineAddrs.add(nnIdToLifelineAddr != null ?
+              nnIdToLifelineAddr.get(nnId) : null);
+        }
+        bpos.refreshNNList(addrs, lifelineAddrs);
       }
     }
   }
@@ -237,7 +261,8 @@ class BlockPoolManager {
   /**
    * Extracted out for test purposes.
    */
-  protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
-    return new BPOfferService(nnAddrs, dn);
+  protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
+      List<InetSocketAddress> lifelineNnAddrs) {
+    return new BPOfferService(nnAddrs, lifelineNnAddrs, dn);
   }
 }
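
The two address lists handed to BPOfferService are built in lockstep over the same nnId iteration, so index i in both lists always refers to the same NameNode, with null meaning "no lifeline endpoint configured". A self-contained sketch of that invariant (host names and the 8050 lifeline port are made up):

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class PairedAddrListsSketch {
      public static void main(String[] args) {
        // nn1 configures a lifeline endpoint, nn2 does not.
        Map<String, InetSocketAddress> nnIdToAddr = new LinkedHashMap<>();
        nnIdToAddr.put("nn1", new InetSocketAddress("nn1.example.com", 8020));
        nnIdToAddr.put("nn2", new InetSocketAddress("nn2.example.com", 8020));
        Map<String, InetSocketAddress> nnIdToLifelineAddr = new LinkedHashMap<>();
        nnIdToLifelineAddr.put("nn1",
            new InetSocketAddress("nn1.example.com", 8050));

        List<InetSocketAddress> addrs = new ArrayList<>(nnIdToAddr.size());
        List<InetSocketAddress> lifelineAddrs =
            new ArrayList<>(nnIdToAddr.size());
        for (String nnId : nnIdToAddr.keySet()) {
          addrs.add(nnIdToAddr.get(nnId));
          lifelineAddrs.add(nnIdToLifelineAddr.get(nnId));  // null for nn2
        }
        // BPOfferService checks the sizes match; a null lifeline slot simply
        // means BPServiceActor starts no LifelineSender for that NameNode.
        System.out.println(addrs);
        System.out.println(lifelineAddrs);
      }
    }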

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 0fa6f6e..5cff2d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHO
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -87,6 +88,7 @@ public class DNConf {
 
   final long readaheadLength;
   final long heartBeatInterval;
+  private final long lifelineIntervalMs;
   final long blockReportInterval;
   final long blockReportSplitThreshold;
   final long ibrInterval;
@@ -185,6 +187,20 @@ public class DNConf {
     
     heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
+    long confLifelineIntervalMs =
+        conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
+        3 * conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
+            DFS_HEARTBEAT_INTERVAL_DEFAULT)) * 1000L;
+    if (confLifelineIntervalMs <= heartBeatInterval) {
+      confLifelineIntervalMs = 3 * heartBeatInterval;
+      DataNode.LOG.warn(
+          String.format("%s must be set to a value greater than %s.  " +
+              "Resetting value to 3 * %s, which is %d milliseconds.",
+              DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
+              DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_KEY,
+              confLifelineIntervalMs));
+    }
+    lifelineIntervalMs = confLifelineIntervalMs;
     
     // do we need to sync block file contents to disk when blockfile is closed?
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
@@ -338,4 +354,13 @@ public class DNConf {
   public long getBpReadyTimeout() {
     return bpReadyTimeout;
   }
+
+  /**
+   * Returns the interval in milliseconds between sending lifeline messages.
+   *
+   * @return interval in milliseconds between sending lifeline messages
+   */
+  public long getLifelineIntervalMs() {
+    return lifelineIntervalMs;
+  }
 }
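
Worked example of the guard above: with the default dfs.heartbeat.interval of 3 seconds, leaving dfs.datanode.lifeline.interval.seconds unset yields a 9000 ms lifeline interval, and any configured value at or below the heartbeat interval is rejected and reset the same way, since a lifeline that fires as often as the heartbeat it backs up would add load without adding information.

    public class LifelineIntervalGuardSketch {
      public static void main(String[] args) {
        long heartBeatIntervalMs = 3 * 1000L;  // dfs.heartbeat.interval = 3s
        long lifelineMs = 2 * 1000L;           // misconfigured: <= heartbeat
        if (lifelineMs <= heartBeatIntervalMs) {
          lifelineMs = 3 * heartBeatIntervalMs;  // reset, as DNConf does
        }
        System.out.println(lifelineMs + " ms");  // 9000 ms
      }
    }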

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3e2a25d..2362610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
@@ -1647,6 +1648,19 @@ public class DataNode extends ReconfigurableBase
     return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
   }
 
+  /**
+   * Connect to the NN for the lifeline protocol. This is separated out for
+   * easier testing.
+   *
+   * @param lifelineNnAddr address of lifeline RPC server
+   * @return lifeline RPC proxy
+   */
+  DatanodeLifelineProtocolClientSideTranslatorPB connectToLifelineNN(
+      InetSocketAddress lifelineNnAddr) throws IOException {
+    return new DatanodeLifelineProtocolClientSideTranslatorPB(lifelineNnAddr,
+        conf);
+  }
+
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
       DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
       final boolean connectToDnViaHostname) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index a0f25da..aa518fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -107,6 +107,7 @@ public class DataNodeMetrics {
   @Metric MutableRate copyBlockOp;
   @Metric MutableRate replaceBlockOp;
   @Metric MutableRate heartbeats;
+  @Metric MutableRate lifelines;
   @Metric MutableRate blockReports;
   @Metric MutableRate incrementalBlockReports;
   @Metric MutableRate cacheReports;
@@ -199,6 +200,10 @@ public class DataNodeMetrics {
     heartbeats.add(latency);
   }
 
+  public void addLifeline(long latency) {
+    lifelines.add(latency);
+  }
+
   public void addBlockReport(long latency) {
     blockReports.add(latency);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 907a0ea..a5b9dc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3626,6 +3626,37 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
+   * Handles a lifeline message sent by a DataNode.  This method updates contact
+   * information and statistics for the DataNode, so that it doesn't time out.
+   * Unlike a heartbeat, this method does not dispatch any commands back to the
+   * DataNode for local execution.  This method also cannot request a lease for
+   * sending a full block report.  Lifeline messages are used only as a fallback
+   * in case something prevents successful delivery of heartbeat messages.
+   * Therefore, the implementation of this method must remain lightweight
+   * compared to heartbeat handling.  It should avoid lock contention and
+   * expensive computation.
+   *
+   * @param nodeReg registration info for DataNode sending the lifeline
+   * @param reports storage reports from DataNode
+   * @param cacheCapacity cache capacity at DataNode
+   * @param cacheUsed cache used at DataNode
+   * @param xceiverCount estimated count of transfer threads running at DataNode
+   * @param xmitsInProgress count of transfers running at DataNode
+   * @param failedVolumes count of failed volumes at DataNode
+   * @param volumeFailureSummary info on failed volumes at DataNode
+   * @throws IOException if there is an error
+   */
+  void handleLifeline(DatanodeRegistration nodeReg, StorageReport[] reports,
+      long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress,
+      int failedVolumes, VolumeFailureSummary volumeFailureSummary)
+      throws IOException {
+    int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;
+    blockManager.getDatanodeManager().handleLifeline(nodeReg, reports,
+        getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer,
+        failedVolumes, volumeFailureSummary);
+  }
+
+  /**
    * Returns whether or not there were available resources at the last check of
    * resources.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 275e210..0c4a440 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
@@ -111,11 +112,14 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeLifelineProtocolProtos.DatanodeLifelineProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
@@ -256,6 +260,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
     BlockingService dnProtoPbService = DatanodeProtocolService
         .newReflectiveBlockingService(dnProtoPbTranslator);
 
+    DatanodeLifelineProtocolServerSideTranslatorPB lifelineProtoPbTranslator =
+        new DatanodeLifelineProtocolServerSideTranslatorPB(this);
+    BlockingService lifelineProtoPbService = DatanodeLifelineProtocolService
+        .newReflectiveBlockingService(lifelineProtoPbTranslator);
+
     NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = 
         new NamenodeProtocolServerSideTranslatorPB(this);
     BlockingService NNPbService = NamenodeProtocolService
@@ -371,9 +380,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
           lifelineRpcAddr.getPort());
 
       int lifelineHandlerCount = conf.getInt(
-          DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY,
-          DFS_NAMENODE_LIFELINE_HANDLER_COUNT_DEFAULT);
-
+          DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 0);
+      if (lifelineHandlerCount <= 0) {
+        float lifelineHandlerRatio = conf.getFloat(
+            DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY,
+            DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT);
+        lifelineHandlerCount = Math.max(
+            (int)(handlerCount * lifelineHandlerRatio), 1);
+      }
       lifelineRpcServer = new RPC.Builder(conf)
           .setProtocol(HAServiceProtocolPB.class)
           .setInstance(haPbService)
@@ -384,6 +398,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
           .setSecretManager(namesystem.getDelegationTokenSecretManager())
           .build();
 
+      DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class,
+          lifelineProtoPbService, lifelineRpcServer);
+
       // Update the address with the correct port
       InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
       lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
@@ -1509,6 +1526,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
     return namesystem.getNamespaceInfo();
   }
 
+  @Override // DatanodeLifelineProtocol
+  public void sendLifeline(DatanodeRegistration nodeReg, StorageReport[] report,
+      long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
+      int xceiverCount, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary) throws IOException {
+    checkNNStartup();
+    verifyRequest(nodeReg);
+    namesystem.handleLifeline(nodeReg, report, dnCacheCapacity, dnCacheUsed,
+        xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary);
+  }
+
   /** 
    * Verifies the given registration.
    * 

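To make the sizing fallback above concrete, here is the arithmetic it produces under a few illustrative settings (0.10 is the default ratio shipped in this patch):

    // Worked examples of the fallback above; handler counts are illustrative.
    //   dfs.namenode.handler.count = 100, lifeline handler count unset:
    //     lifelineHandlerCount = Math.max((int) (100 * 0.10f), 1) = 10
    //   dfs.namenode.handler.count = 5, lifeline handler count unset:
    //     lifelineHandlerCount = Math.max((int) (5 * 0.10f), 1) = 1
    //   dfs.namenode.lifeline.handler.count = 4 (explicitly set):
    //     lifelineHandlerCount = 4 (the ratio is never consulted)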
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java
new file mode 100644
index 0000000..b30e60b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeLifelineProtocol.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used by a DataNode to send lifeline messages to a NameNode.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@InterfaceAudience.Private
+public interface DatanodeLifelineProtocol {
+
+  @Idempotent
+  void sendLifeline(DatanodeRegistration registration, StorageReport[] reports,
+      long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress,
+      int xceiverCount, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary) throws IOException;
+}

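A minimal caller sketch against the interface above, to make the contract concrete. The helper below is hypothetical, and the zero/null arguments are placeholders for values a real DataNode gathers for its heartbeats:

    // Hypothetical helper; 'lifeline' is any DatanodeLifelineProtocol proxy.
    // All parameter types are from org.apache.hadoop.hdfs.server.protocol.
    void reportLiveness(DatanodeLifelineProtocol lifeline,
        DatanodeRegistration registration, StorageReport[] reports)
        throws IOException {
      // Fire-and-forget from the DataNode's perspective: there is no response
      // payload to process and no commands to execute.
      lifeline.sendLifeline(registration, reports,
          0L,    // dnCacheCapacity (placeholder)
          0L,    // dnCacheUsed (placeholder)
          0,     // xmitsInProgress (placeholder)
          0,     // xceiverCount (placeholder)
          0,     // failedVolumes (placeholder)
          null); // volumeFailureSummary; assumed null when no volumes failed
    }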
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
index 4a3d83d..d874e8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocol;
 public interface NamenodeProtocols
   extends ClientProtocol,
           DatanodeProtocol,
+          DatanodeLifelineProtocol,
           NamenodeProtocol,
           RefreshAuthorizationPolicyProtocol,
           ReconfigurationProtocol,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto
new file mode 100644
index 0000000..b6ab756
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeLifelineProtocol.proto
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "DatanodeLifelineProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs.datanodelifeline;
+
+import "DatanodeProtocol.proto";
+
+// The lifeline protocol does not use a new request message type. Instead, it
+// reuses the existing heartbeat request message.
+
+// Unlike heartbeats, the response is empty. There is no command dispatch.
+message LifelineResponseProto {
+}
+
+service DatanodeLifelineProtocolService {
+  rpc sendLifeline(hadoop.hdfs.datanode.HeartbeatRequestProto)
+      returns(LifelineResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index ea25a91..d837bd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -705,6 +705,22 @@
 </property>
 
 <property>
+  <name>dfs.datanode.lifeline.interval.seconds</name>
+  <value></value>
+  <description>
+    Sets the interval in seconds between sending DataNode Lifeline Protocol
+    messages from the DataNode to the NameNode.  The value must be greater than
+    the value of dfs.heartbeat.interval.  If this property is not defined, then
+    the default behavior is to calculate the interval as 3x the value of
+    dfs.heartbeat.interval.  Note that the DataNode postpones sending a
+    lifeline message whenever a heartbeat has been delivered recently enough,
+    so with timely heartbeat processing it is possible that no lifeline
+    messages are sent at all.  This property has no effect if
+    dfs.namenode.lifeline.rpc-address is not defined.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.handler.count</name>
   <value>10</value>
   <description>The number of Namenode RPC server threads that listen to
@@ -725,14 +741,34 @@
 </property>
 
 <property>
+  <name>dfs.namenode.lifeline.handler.ratio</name>
+  <value>0.10</value>
+  <description>
+    A ratio applied to the value of dfs.namenode.handler.count, which
+    determines the number of RPC handler threads the NameNode runs for the
+    lifeline RPC server.  For example, if dfs.namenode.handler.count is 100 and
+    dfs.namenode.lifeline.handler.ratio is 0.10, then the NameNode starts
+    100 * 0.10 = 10 threads for handling the lifeline RPC server.  It is common
+    to tune the value of dfs.namenode.handler.count as a function of the number
+    of DataNodes in a cluster.  Using this property allows for the lifeline RPC
+    server handler threads to be tuned automatically without needing to touch a
+    separate property.  Lifeline message processing is lightweight, so it is
+    expected to require many fewer threads than the main NameNode RPC server.
+    This property is not used if dfs.namenode.lifeline.handler.count is defined,
+    which sets an absolute thread count.  This property has no effect if
+    dfs.namenode.lifeline.rpc-address is not defined.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.lifeline.handler.count</name>
-  <value>1</value>
+  <value></value>
   <description>
-    Sets number of RPC server threads the NameNode runs for handling the
-    lifeline RPC server.  The default value is 1, because this RPC server
-    handles only HA health check requests from ZKFC.  These are lightweight
-    requests that run single-threaded from the ZKFC client side.  This property
-    has no effect if dfs.namenode.lifeline.rpc-address is not defined.
+    Sets an absolute number of RPC server threads the NameNode runs for handling
+    the DataNode Lifeline Protocol and HA health check requests from ZKFC.  If
+    this property is defined, then it overrides the behavior of
+    dfs.namenode.lifeline.handler.ratio.  By default, it is not defined.  This
+    property has no effect if dfs.namenode.lifeline.rpc-address is not defined.
   </description>
 </property>
 

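Pulling the new keys together, a hedged configuration sketch (the address and values are placeholders; the lifeline server stays disabled unless the rpc-address key is set):

    // Hypothetical operator configuration; address and values are placeholders.
    // Assumes imports of org.apache.hadoop.hdfs.HdfsConfiguration and
    // org.apache.hadoop.hdfs.DFSConfigKeys.
    Configuration conf = new HdfsConfiguration();
    // Opt in to the lifeline RPC server by giving it an address.
    conf.set(DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
        "nn.example.com:8050");
    // Optional: override the default 3x-heartbeat lifeline interval.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, 20);
    // Optional: size lifeline handlers as a fraction of the main handlers...
    conf.setFloat(DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY, 0.05f);
    // ...or pin an absolute count, which takes precedence over the ratio.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY, 2);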
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 1421f0f..95a103e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -396,7 +397,8 @@ public class TestBPOfferService {
           Mockito.eq(new InetSocketAddress(port)));
     }
 
-    return new BPOfferService(Lists.newArrayList(nnMap.keySet()), mockDn);
+    return new BPOfferService(Lists.newArrayList(nnMap.keySet()),
+        Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
   }
 
   private void waitForInitialization(final BPOfferService bpos)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
index 27e99db..48006dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
@@ -51,7 +51,8 @@ public class TestBlockPoolManager {
     bpm = new BlockPoolManager(mockDN){
 
       @Override
-      protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
+      protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
+          List<InetSocketAddress> lifelineNnAddrs) {
         final int idx = mockIdx++;
         doLog("create #" + idx);
         final BPOfferService bpos = Mockito.mock(BPOfferService.class);
@@ -66,6 +67,7 @@ public class TestBlockPoolManager {
                   return null;
                 }
               }).when(bpos).refreshNNList(
+                  Mockito.<ArrayList<InetSocketAddress>>any(),
                   Mockito.<ArrayList<InetSocketAddress>>any());
         } catch (IOException e) {
           throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index efdd87c..76885e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -49,6 +49,7 @@ public class TestBpServiceActorScheduler {
   public Timeout timeout = new Timeout(300000);
 
   private static final long HEARTBEAT_INTERVAL_MS = 5000;      // 5 seconds
+  private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
   private static final long BLOCK_REPORT_INTERVAL_MS = 10000;  // 10 seconds
   private final Random random = new Random(System.nanoTime());
 
@@ -166,9 +167,23 @@ public class TestBpServiceActorScheduler {
     }
   }
 
+  @Test
+  public void testScheduleLifeline() {
+    for (final long now : getTimestamps()) {
+      Scheduler scheduler = makeMockScheduler(now);
+      scheduler.scheduleNextLifeline(now);
+      assertFalse(scheduler.isLifelineDue(now));
+      assertThat(scheduler.getLifelineWaitTime(), is(LIFELINE_INTERVAL_MS));
+      scheduler.scheduleNextLifeline(now - LIFELINE_INTERVAL_MS);
+      assertTrue(scheduler.isLifelineDue(now));
+      assertThat(scheduler.getLifelineWaitTime(), is(0L));
+    }
+  }
+
   private Scheduler makeMockScheduler(long now) {
     LOG.info("Using now = " + now);
-    Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
+    Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS,
+        LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS));
     doReturn(now).when(mockScheduler).monotonicNow();
     mockScheduler.nextBlockReportTime = now;
     mockScheduler.nextHeartbeatTime = now;

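The Scheduler methods exercised in this test imply a simple sender loop on the DataNode side. A minimal sketch, where shouldRun and sendLifeline() are hypothetical stand-ins for BPServiceActor internals:

    // Hedged sketch of a lifeline sender loop; shouldRun and sendLifeline()
    // are hypothetical stand-ins, not the committed BPServiceActor code.
    while (shouldRun) {
      if (scheduler.isLifelineDue(scheduler.monotonicNow())) {
        sendLifeline();  // the lightweight RPC; failures logged, not fatal
        scheduler.scheduleNextLifeline(scheduler.monotonicNow());
      }
      // Sleep until the next lifeline is due rather than busy-waiting.
      Thread.sleep(scheduler.getLifelineWaitTime());
    }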
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
new file mode 100644
index 0000000..fd66115
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.apache.log4j.Level;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Test suite covering lifeline protocol handling in the DataNode.
+ */
+public class TestDataNodeLifeline {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestDataNodeLifeline.class);
+
+  static {
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+  }
+
+  @Rule
+  public Timeout timeout = new Timeout(60000);
+
+  private MiniDFSCluster cluster;
+  private HdfsConfiguration conf;
+  private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;
+  private DataNodeMetrics metrics;
+  private DatanodeProtocolClientSideTranslatorPB namenode;
+  private FSNamesystem namesystem;
+
+  @Before
+  public void setup() throws Exception {
+    // Configure cluster with the lifeline RPC server enabled, and shorten
+    // heartbeat timings so a DataNode would quickly go stale or dead.
+    conf = new HdfsConfiguration();
+    conf.setInt(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY, 2);
+    conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
+    conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 6 * 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    namesystem = cluster.getNameNode().getNamesystem();
+
+    // Set up spies on RPC proxies so that we can inject failures.
+    DataNode dn = cluster.getDataNodes().get(0);
+    metrics = dn.getMetrics();
+    assertNotNull(metrics);
+    List<BPOfferService> allBpos = dn.getAllBpOs();
+    assertNotNull(allBpos);
+    assertEquals(1, allBpos.size());
+
+    BPOfferService bpos = allBpos.get(0);
+    List<BPServiceActor> allBpsa = bpos.getBPServiceActors();
+    assertNotNull(allBpsa);
+    assertEquals(1, allBpsa.size());
+
+    final BPServiceActor bpsa = allBpsa.get(0);
+    assertNotNull(bpsa);
+
+    // Lifeline RPC proxy is created on a separate thread, so poll until found.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            if (bpsa.getLifelineNameNodeProxy() != null) {
+              lifelineNamenode = spy(bpsa.getLifelineNameNodeProxy());
+              bpsa.setLifelineNameNode(lifelineNamenode);
+            }
+            return lifelineNamenode != null;
+          }
+        }, 100, 10000);
+
+    assertNotNull(bpsa.getNameNodeProxy());
+    namenode = spy(bpsa.getNameNodeProxy());
+    bpsa.setNameNode(namenode);
+  }
+
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+      GenericTestUtils.assertNoThreadsMatching(".*lifeline.*");
+    }
+  }
+
+  @Test
+  public void testSendLifelineIfHeartbeatBlocked() throws Exception {
+    // Run the test for the duration of sending 10 lifeline RPC messages.
+    int numLifelines = 10;
+    CountDownLatch lifelinesSent = new CountDownLatch(numLifelines);
+
+    // Intercept heartbeat to inject an artificial delay, until all expected
+    // lifeline RPC messages have been sent.
+    doAnswer(new LatchAwaitingAnswer<HeartbeatResponse>(lifelinesSent))
+        .when(namenode).sendHeartbeat(
+            any(DatanodeRegistration.class),
+            any(StorageReport[].class),
+            anyLong(),
+            anyLong(),
+            anyInt(),
+            anyInt(),
+            anyInt(),
+            any(VolumeFailureSummary.class),
+            anyBoolean());
+
+    // Intercept lifeline to trigger latch count-down on each call.
+    doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
+        .when(lifelineNamenode).sendLifeline(
+            any(DatanodeRegistration.class),
+            any(StorageReport[].class),
+            anyLong(),
+            anyLong(),
+            anyInt(),
+            anyInt(),
+            anyInt(),
+            any(VolumeFailureSummary.class));
+
+    // While waiting on the latch for the expected number of lifeline messages,
+    // poll DataNode tracking information.  Thanks to the lifeline, we expect
+    // that the DataNode always stays alive, and never goes stale or dead.
+    while (!lifelinesSent.await(1, SECONDS)) {
+      assertEquals("Expect DataNode to be kept alive by lifeline.", 1,
+          namesystem.getNumLiveDataNodes());
+      assertEquals("Expect DataNode not marked dead due to lifeline.", 0,
+          namesystem.getNumDeadDataNodes());
+      assertEquals("Expect DataNode not marked stale due to lifeline.", 0,
+          namesystem.getNumStaleDataNodes());
+    }
+
+    // Verify that we did in fact call the lifeline RPC.
+    verify(lifelineNamenode, atLeastOnce()).sendLifeline(
+        any(DatanodeRegistration.class),
+        any(StorageReport[].class),
+        anyLong(),
+        anyLong(),
+        anyInt(),
+        anyInt(),
+        anyInt(),
+        any(VolumeFailureSummary.class));
+
+    // Also verify lifeline call through metrics.  We expect at least
+    // numLifelines, guaranteed by waiting on the latch.  There is a small
+    // possibility of extra lifeline calls depending on timing, so we allow
+    // slack in the assertion.
+    assertTrue("Expect metrics to count at least " + numLifelines + " calls.",
+        getLongCounter("LifelinesNumOps", getMetrics(metrics.name())) >=
+            numLifelines);
+  }
+
+  @Test
+  public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception {
+    // Run the test for the duration of sending 10 heartbeat RPC messages.
+    int numHeartbeats = 10;
+    CountDownLatch heartbeatsSent = new CountDownLatch(numHeartbeats);
+
+    // Intercept heartbeat to trigger latch count-down on each call.
+    doAnswer(new LatchCountingAnswer<HeartbeatResponse>(heartbeatsSent))
+        .when(namenode).sendHeartbeat(
+            any(DatanodeRegistration.class),
+            any(StorageReport[].class),
+            anyLong(),
+            anyLong(),
+            anyInt(),
+            anyInt(),
+            anyInt(),
+            any(VolumeFailureSummary.class),
+            anyBoolean());
+
+    // While waiting on the latch for the expected number of heartbeat messages,
+    // poll DataNode tracking information.  We expect that the DataNode always
+    // stays alive, and never goes stale or dead.
+    while (!heartbeatsSent.await(1, SECONDS)) {
+      assertEquals("Expect DataNode to be kept alive by lifeline.", 1,
+          namesystem.getNumLiveDataNodes());
+      assertEquals("Expect DataNode not marked dead due to lifeline.", 0,
+          namesystem.getNumDeadDataNodes());
+      assertEquals("Expect DataNode not marked stale due to lifeline.", 0,
+          namesystem.getNumStaleDataNodes());
+    }
+
+    // Verify that we did not call the lifeline RPC.
+    verify(lifelineNamenode, never()).sendLifeline(
+        any(DatanodeRegistration.class),
+        any(StorageReport[].class),
+        anyLong(),
+        anyLong(),
+        anyInt(),
+        anyInt(),
+        anyInt(),
+        any(VolumeFailureSummary.class));
+
+    // Also verify no lifeline calls through metrics.
+    assertEquals("Expect metrics to count no lifeline calls.", 0,
+        getLongCounter("LifelinesNumOps", getMetrics(metrics.name())));
+  }
+
+  /**
+   * Waits on a {@link CountDownLatch} before calling through to the method.
+   */
+  private final class LatchAwaitingAnswer<T> implements Answer<T> {
+    private final CountDownLatch latch;
+
+    public LatchAwaitingAnswer(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T answer(InvocationOnMock invocation)
+        throws Throwable {
+      LOG.info("Awaiting, remaining latch count is {}.", latch.getCount());
+      latch.await();
+      return (T)invocation.callRealMethod();
+    }
+  }
+
+  /**
+   * Counts down a {@link CountDownLatch} after each call through to the method.
+   */
+  private final class LatchCountingAnswer<T> implements Answer<T> {
+    private final CountDownLatch latch;
+
+    public LatchCountingAnswer(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T answer(InvocationOnMock invocation)
+        throws Throwable {
+      T result = (T)invocation.callRealMethod();
+      latch.countDown();
+      LOG.info("Countdown, remaining latch count is {}.", latch.getCount());
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2759689d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
index 216ff3d..f2a5d08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
@@ -61,7 +61,7 @@ public class TestDatanodeRegister {
     BPOfferService mockBPOS = mock(BPOfferService.class);
     doReturn(mockDN).when(mockBPOS).getDataNode();
     
-    actor = new BPServiceActor(INVALID_ADDR, mockBPOS);
+    actor = new BPServiceActor(INVALID_ADDR, null, mockBPOS);
 
     fakeNsInfo = mock(NamespaceInfo.class);
    // Return a good software version.