You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/02 21:21:25 UTC

[41/48] hadoop git commit: HDFS-11826. Federation Namenode Heartbeat. Contributed by Inigo Goiri.

HDFS-11826. Federation Namenode Heartbeat. Contributed by Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8d935b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8d935b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8d935b4

Branch: refs/heads/HDFS-10467
Commit: a8d935b45536db54e66f3f30e1a61966bb1cfaef
Parents: 02c53b9
Author: Inigo Goiri <in...@apache.org>
Authored: Tue Aug 1 14:40:27 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Sat Sep 2 14:20:09 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  14 +
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  38 ++
 .../resolver/NamenodeStatusReport.java          | 193 ++++++++++
 .../federation/router/FederationUtil.java       |  66 ++++
 .../router/NamenodeHeartbeatService.java        | 350 +++++++++++++++++++
 .../hdfs/server/federation/router/Router.java   | 112 ++++++
 .../src/main/resources/hdfs-default.xml         |  32 ++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |   8 +
 .../hdfs/server/federation/MockResolver.java    |   9 +-
 .../server/federation/RouterConfigBuilder.java  |  22 ++
 .../server/federation/RouterDFSCluster.java     |  43 +++
 .../router/TestNamenodeHeartbeat.java           | 168 +++++++++
 .../server/federation/router/TestRouter.java    |   3 +
 13 files changed, 1057 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index afb5bbf..d1c2b41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1144,6 +1144,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       FEDERATION_ROUTER_PREFIX + "rpc.enable";
   public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
 
+  // HDFS Router heartbeat
+  public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
+  public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
+      FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
+  public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
+      TimeUnit.SECONDS.toMillis(5);
+  public static final String DFS_ROUTER_MONITOR_NAMENODE =
+      FEDERATION_ROUTER_PREFIX + "monitor.namenode";
+  public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
+      FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
+  public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
+
   // HDFS Router NN client
   public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
       FEDERATION_ROUTER_PREFIX + "connection.pool-size";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 7776dc2..29936f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1241,6 +1241,44 @@ public class DFSUtil {
   }
 
   /**
+   * Map a logical namenode ID to its web address. Use the given nameservice if
+   * specified, or the configured one if none is given.
+   *
+   * @param conf Configuration
+   * @param nsId which nameservice nnId is a part of, optional
+   * @param nnId the namenode ID to get the web address for
+   * @return the web address, or the default HTTP address if none is configured
+   */
+  public static String getNamenodeWebAddr(final Configuration conf, String nsId,
+      String nnId) {
+
+    if (nsId == null) {
+      nsId = getOnlyNameServiceIdOrNull(conf);
+    }
+
+    String webAddrKey = DFSUtilClient.concatSuffixes(
+        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nsId, nnId);
+
+    String webAddr =
+        conf.get(webAddrKey, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT);
+    return webAddr;
+  }
+
+  /**
+   * Get all of the Web addresses of the individual NNs in a given nameservice.
+   *
+   * @param conf Configuration
+   * @param nsId the nameservice whose NNs addresses we want.
+   * @param defaultValue default address to return in case key is not found.
+   * @return A map from nnId -> Web address of each NN in the nameservice.
+   */
+  public static Map<String, InetSocketAddress> getWebAddressesForNameserviceId(
+      Configuration conf, String nsId, String defaultValue) {
+    return DFSUtilClient.getAddressesForNameserviceId(conf, nsId, defaultValue,
+        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+  }
+
+  /**
    * If the configuration refers to only a single nameservice, return the
    * name of that nameservice. If it refers to 0 or more than 1, return null.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
index 9259048..f8759e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
@@ -39,8 +39,29 @@ public class NamenodeStatusReport {
   private HAServiceState status = HAServiceState.STANDBY;
   private boolean safeMode = false;
 
+  /** Datanodes stats. */
+  private int liveDatanodes = -1;
+  private int deadDatanodes = -1;
+  /** Decommissioning datanodes. */
+  private int decomDatanodes = -1;
+  /** Live decommissioned datanodes. */
+  private int liveDecomDatanodes = -1;
+  /** Dead decommissioned datanodes. */
+  private int deadDecomDatanodes = -1;
+
+  /** Space stats. */
+  private long availableSpace = -1;
+  private long numOfFiles = -1;
+  private long numOfBlocks = -1;
+  private long numOfBlocksMissing = -1;
+  private long numOfBlocksPendingReplication = -1;
+  private long numOfBlocksUnderReplicated = -1;
+  private long numOfBlocksPendingDeletion = -1;
+  private long totalSpace = -1;
+
   /** If the fields are valid. */
   private boolean registrationValid = false;
+  private boolean statsValid = false;
   private boolean haStateValid = false;
 
   public NamenodeStatusReport(String ns, String nn, String rpc, String service,
@@ -54,6 +75,15 @@ public class NamenodeStatusReport {
   }
 
   /**
+   * If the statistics are valid.
+   *
+   * @return If the statistics are valid.
+   */
+  public boolean statsValid() {
+    return this.statsValid;
+  }
+
+  /**
    * If the registration is valid.
    *
    * @return If the registration is valid.
@@ -187,6 +217,169 @@ public class NamenodeStatusReport {
     return this.safeMode;
   }
 
+  /**
+   * Set the datanode information.
+   *
+   * @param numLive Number of live nodes.
+   * @param numDead Number of dead nodes.
+   * @param numDecom Number of decommissioning nodes.
+   * @param numLiveDecom Number of decommissioned live nodes.
+   * @param numDeadDecom Number of decommissioned dead nodes.
+   */
+  public void setDatanodeInfo(int numLive, int numDead, int numDecom,
+      int numLiveDecom, int numDeadDecom) {
+    this.liveDatanodes = numLive;
+    this.deadDatanodes = numDead;
+    this.decomDatanodes = numDecom;
+    this.liveDecomDatanodes = numLiveDecom;
+    this.deadDecomDatanodes = numDeadDecom;
+    this.statsValid = true;
+  }
+
+  /**
+   * Get the number of live nodes.
+   *
+   * @return The number of live nodes.
+   */
+  public int getNumLiveDatanodes() {
+    return this.liveDatanodes;
+  }
+
+  /**
+   * Get the number of dead nodes.
+   *
+   * @return The number of dead nodes.
+   */
+  public int getNumDeadDatanodes() {
+    return this.deadDatanodes;
+  }
+
+  /**
+   * Get the number of decommissioning nodes.
+   *
+   * @return The number of decommissioning nodes.
+   */
+  public int getNumDecommissioningDatanodes() {
+    return this.decomDatanodes;
+  }
+
+  /**
+   * Get the number of live decommissioned nodes.
+   *
+   * @return The number of live decommissioned nodes.
+   */
+  public int getNumDecomLiveDatanodes() {
+    return this.liveDecomDatanodes;
+  }
+
+  /**
+   * Get the number of dead decommissioned nodes.
+   *
+   * @return The number of dead decommissioned nodes.
+   */
+  public int getNumDecomDeadDatanodes() {
+    return this.deadDecomDatanodes;
+  }
+
+  /**
+   * Set the filesystem information.
+   *
+   * @param available Available capacity.
+   * @param total Total capacity.
+   * @param numFiles Number of files.
+   * @param numBlocks Total number of blocks.
+   * @param numBlocksMissing Number of missing blocks.
+   * @param numBlocksPendingReplication Number of blocks pending replication.
+   * @param numBlocksUnderReplicated Number of under-replicated blocks.
+   * @param numBlocksPendingDeletion Number of blocks pending deletion.
+   */
+  public void setNamesystemInfo(long available, long total,
+      long numFiles, long numBlocks, long numBlocksMissing,
+      long numBlocksPendingReplication, long numBlocksUnderReplicated,
+      long numBlocksPendingDeletion) {
+    this.totalSpace = total;
+    this.availableSpace = available;
+    this.numOfBlocks = numBlocks;
+    this.numOfBlocksMissing = numBlocksMissing;
+    this.numOfBlocksPendingReplication = numBlocksPendingReplication;
+    this.numOfBlocksUnderReplicated = numBlocksUnderReplicated;
+    this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
+    this.numOfFiles = numFiles;
+    this.statsValid = true;
+  }
+
+  /**
+   * Get the number of blocks.
+   *
+   * @return The number of blocks.
+   */
+  public long getNumBlocks() {
+    return this.numOfBlocks;
+  }
+
+  /**
+   * Get the number of files.
+   *
+   * @return The number of files.
+   */
+  public long getNumFiles() {
+    return this.numOfFiles;
+  }
+
+  /**
+   * Get the total space.
+   *
+   * @return The total space.
+   */
+  public long getTotalSpace() {
+    return this.totalSpace;
+  }
+
+  /**
+   * Get the available space.
+   *
+   * @return The available space.
+   */
+  public long getAvailableSpace() {
+    return this.availableSpace;
+  }
+
+  /**
+   * Get the number of missing blocks.
+   *
+   * @return Number of missing blocks.
+   */
+  public long getNumBlocksMissing() {
+    return this.numOfBlocksMissing;
+  }
+
+  /**
+   * Get the number of pending replication blocks.
+   *
+   * @return Number of pending replication blocks.
+   */
+  public long getNumOfBlocksPendingReplication() {
+    return this.numOfBlocksPendingReplication;
+  }
+
+  /**
+   * Get the number of under replicated blocks.
+   *
+   * @return Number of under replicated blocks.
+   */
+  public long getNumOfBlocksUnderReplicated() {
+    return this.numOfBlocksUnderReplicated;
+  }
+
+  /**
+   * Get the number of pending deletion blocks.
+   *
+   * @return Number of pending deletion blocks.
+   */
+  public long getNumOfBlocksPendingDeletion() {
+    return this.numOfBlocksPendingDeletion;
+  }
+
   @Override
   public String toString() {
     return String.format("%s-%s:%s",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index 0129a37..78c473a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -17,13 +17,22 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLConnection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +49,63 @@ public final class FederationUtil {
   }
 
   /**
+   * Get JMX data from a web endpoint.
+   *
+   * @param beanQuery JMX bean.
+   * @param webAddress Web address of the JMX endpoint.
+   * @return JSON with the JMX data
+   */
+  public static JSONArray getJmx(String beanQuery, String webAddress) {
+    JSONArray ret = null;
+    BufferedReader reader = null;
+    try {
+      String host = webAddress;
+      int port = -1;
+      if (webAddress.indexOf(":") > 0) {
+        String[] webAddressSplit = webAddress.split(":");
+        host = webAddressSplit[0];
+        port = Integer.parseInt(webAddressSplit[1]);
+      }
+      URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
+      URLConnection conn = jmxURL.openConnection();
+      conn.setConnectTimeout(5 * 1000);
+      conn.setReadTimeout(5 * 1000);
+      InputStream in = conn.getInputStream();
+      InputStreamReader isr = new InputStreamReader(in, "UTF-8");
+      reader = new BufferedReader(isr);
+
+      StringBuilder sb = new StringBuilder();
+      String line = null;
+      while ((line = reader.readLine()) != null) {
+        sb.append(line);
+      }
+      String jmxOutput = sb.toString();
+
+      // Parse JSON
+      JSONObject json = new JSONObject(jmxOutput);
+      ret = json.getJSONArray("beans");
+    } catch (IOException e) {
+      LOG.error("Cannot read JMX bean {} from server {}: {}",
+          beanQuery, webAddress, e.getMessage());
+    } catch (JSONException e) {
+      LOG.error("Cannot parse JMX output for {} from server {}: {}",
+          beanQuery, webAddress, e.getMessage());
+    } catch (Exception e) {
+      LOG.error("Cannot parse JMX output for {} from server {}: {}",
+          beanQuery, webAddress, e);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          LOG.error("Problem closing {}", webAddress, e);
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
    * Create an instance of an interface with a constructor using a context.
    *
    * @param conf Configuration for the class names.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
new file mode 100644
index 0000000..fe4f939
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link Router} periodically checks the state of a Namenode (usually on
+ * the same server) and reports its high availability (HA) state and
+ * load/space status to the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}.
+ * Note that this is an optional role as a Router can be independent of any
+ * subcluster.
+ * <p>
+ * For performance with Namenode HA, the Router uses the high availability state
+ * information in the State Store to forward the request to the Namenode that is
+ * most likely to be active.
+ * <p>
+ * Note that this service can be embedded into the Namenode itself to simplify
+ * the operation.
+ */
+public class NamenodeHeartbeatService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NamenodeHeartbeatService.class);
+
+
+  /** Configuration for the heartbeat. */
+  private Configuration conf;
+
+  /** Resolver where the Namenode status reports are registered. */
+  private final ActiveNamenodeResolver resolver;
+
+  /** Interface to the tracked NN. */
+  private final String nameserviceId;
+  private final String namenodeId;
+
+  /** Namenode HA target. */
+  private NNHAServiceTarget localTarget;
+  /** RPC address for the namenode. */
+  private String rpcAddress;
+  /** Service RPC address for the namenode. */
+  private String serviceAddress;
+  /** Lifeline RPC address for the namenode. */
+  private String lifelineAddress;
+  /** HTTP address for the namenode. */
+  private String webAddress;
+
+  /**
+   * Create a new Namenode status updater.
+   * @param resolver Namenode resolver service to handle NN registration.
+   * @param nsId Identifier of the nameservice.
+   * @param nnId Identifier of the namenode in HA.
+   */
+  public NamenodeHeartbeatService(
+      ActiveNamenodeResolver resolver, String nsId, String nnId) {
+    super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " +
+        nnId);
+
+    this.resolver = resolver;
+
+    this.nameserviceId = nsId;
+    this.namenodeId = nnId;
+
+  }
+
+  @Override
+  protected void serviceInit(Configuration configuration) throws Exception {
+
+    this.conf = configuration;
+
+    if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
+      this.localTarget = new NNHAServiceTarget(
+          conf, nameserviceId, namenodeId);
+    } else {
+      this.localTarget = null;
+    }
+
+    // Get the RPC address for the clients to connect
+    this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
+    LOG.info("{}-{} RPC address: {}",
+        nameserviceId, namenodeId, rpcAddress);
+
+    // Get the Service RPC address for monitoring
+    this.serviceAddress =
+        DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
+    if (this.serviceAddress == null) {
+      LOG.error("Cannot locate RPC service address for NN {}-{}, " +
+          "using RPC address {}", nameserviceId, namenodeId, this.rpcAddress);
+      this.serviceAddress = this.rpcAddress;
+    }
+    LOG.info("{}-{} Service RPC address: {}",
+        nameserviceId, namenodeId, serviceAddress);
+
+    // Get the Lifeline RPC address for faster monitoring
+    this.lifelineAddress =
+        DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId);
+    if (this.lifelineAddress == null) {
+      this.lifelineAddress = this.serviceAddress;
+    }
+    LOG.info("{}-{} Lifeline RPC address: {}",
+        nameserviceId, namenodeId, lifelineAddress);
+
+    // Get the Web address for UI
+    this.webAddress =
+        DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
+    LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress);
+
+    this.setIntervalMs(conf.getLong(
+        DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
+        DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
+
+
+    super.serviceInit(configuration);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    updateState();
+  }
+
+  /**
+   * Get the RPC address for a Namenode.
+   * @param conf Configuration.
+   * @param nsId Name service identifier.
+   * @param nnId Name node identifier.
+   * @return RPC address in format hostname:1234.
+   */
+  private static String getRpcAddress(
+      Configuration conf, String nsId, String nnId) {
+
+    // Get it from the regular RPC setting
+    String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+    String ret = conf.get(confKey);
+
+    if (nsId != null && nnId != null) {
+      // Get it for the proper nameservice and namenode
+      confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
+      ret = conf.get(confKey);
+
+      // If not available, get it from the map
+      if (ret == null) {
+        Map<String, InetSocketAddress> rpcAddresses =
+            DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
+        if (rpcAddresses.containsKey(nnId)) {
+          InetSocketAddress sockAddr = rpcAddresses.get(nnId);
+          InetAddress addr = sockAddr.getAddress();
+          ret = addr.getHostAddress() + ":" + sockAddr.getPort();
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Update the state of the Namenode.
+   */
+  private void updateState() {
+    NamenodeStatusReport report = getNamenodeStatusReport();
+    if (!report.registrationValid()) {
+      // Not operational
+      LOG.error("Namenode is not operational: {}", getNamenodeDesc());
+    } else if (report.haStateValid()) {
+      // block and HA status available
+      LOG.debug("Received service state: {} from HA namenode: {}",
+          report.getState(), getNamenodeDesc());
+    } else if (localTarget == null) {
+      // block info available, HA status not expected
+      LOG.debug(
+          "Reporting non-HA namenode as operational: " + getNamenodeDesc());
+    } else {
+      // block info available, HA status should be available, but was not
+      // fetched; do nothing and let the current state stand
+      return;
+    }
+    try {
+      if (!resolver.registerNamenode(report)) {
+        LOG.warn("Cannot register namenode {}", report);
+      }
+    } catch (IOException e) {
+      LOG.info("Cannot register namenode in the State Store");
+    } catch (Exception ex) {
+      LOG.error("Unhandled exception updating NN registration for {}",
+          getNamenodeDesc(), ex);
+    }
+  }
+
+  /**
+   * Get the status report for the Namenode monitored by this heartbeater.
+   * @return Namenode status report.
+   */
+  protected NamenodeStatusReport getNamenodeStatusReport() {
+    NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId,
+        namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress);
+
+    try {
+      LOG.debug("Probing NN at service address: {}", serviceAddress);
+
+      URI serviceURI = new URI("hdfs://" + serviceAddress);
+      // Read the filesystem info from RPC (required)
+      NamenodeProtocol nn = NameNodeProxies
+          .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
+          .getProxy();
+
+      if (nn != null) {
+        NamespaceInfo info = nn.versionRequest();
+        if (info != null) {
+          report.setNamespaceInfo(info);
+        }
+      }
+      if (!report.registrationValid()) {
+        return report;
+      }
+
+      // Check for safemode from the client protocol. Currently optional, but
+      // should be required at some point for QoS
+      try {
+        ClientProtocol client = NameNodeProxies
+            .createProxy(this.conf, serviceURI, ClientProtocol.class)
+            .getProxy();
+        if (client != null) {
+          boolean isSafeMode = client.setSafeMode(
+              SafeModeAction.SAFEMODE_GET, false);
+          report.setSafeMode(isSafeMode);
+        }
+      } catch (Exception e) {
+        LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
+      }
+
+      // Read the stats from JMX (optional)
+      updateJMXParameters(webAddress, report);
+
+      if (localTarget != null) {
+        // Try to get the HA status
+        try {
+          // Determine if NN is active
+          // TODO: dynamic timeout
+          HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000);
+          HAServiceStatus status = haProtocol.getServiceStatus();
+          report.setHAServiceState(status.getState());
+        } catch (Throwable e) {
+          // Failed to fetch HA status, ignoring failure
+          LOG.error("Cannot fetch HA status for {}: {}",
+              getNamenodeDesc(), e.getMessage(), e);
+        }
+      }
+    } catch(IOException e) {
+      LOG.error("Cannot communicate with {}: {}",
+          getNamenodeDesc(), e.getMessage());
+    } catch(Throwable e) {
+      // Generic error that we don't know about
+      LOG.error("Unexpected exception while communicating with {}: {}",
+          getNamenodeDesc(), e.getMessage(), e);
+    }
+    return report;
+  }
+
+  /**
+   * Get the description of the Namenode to monitor.
+   * @return Description of the Namenode to monitor.
+   */
+  public String getNamenodeDesc() {
+    if (namenodeId != null && !namenodeId.isEmpty()) {
+      return nameserviceId + "-" + namenodeId + ":" + serviceAddress;
+    } else {
+      return nameserviceId + ":" + serviceAddress;
+    }
+  }
+
+  /**
+   * Get the parameters for a Namenode from JMX and add them to the report.
+   * @param address Web interface of the Namenode to monitor.
+   * @param report Namenode status report to update with JMX data.
+   */
+  private void updateJMXParameters(
+      String address, NamenodeStatusReport report) {
+    try {
+      // TODO part of this should be moved to its own utility
+      String query = "Hadoop:service=NameNode,name=FSNamesystem*";
+      JSONArray aux = FederationUtil.getJmx(query, address);
+      if (aux != null) {
+        for (int i = 0; i < aux.length(); i++) {
+          JSONObject jsonObject = aux.getJSONObject(i);
+          String name = jsonObject.getString("name");
+          if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
+            report.setDatanodeInfo(
+                jsonObject.getInt("NumLiveDataNodes"),
+                jsonObject.getInt("NumDeadDataNodes"),
+                jsonObject.getInt("NumDecommissioningDataNodes"),
+                jsonObject.getInt("NumDecomLiveDataNodes"),
+                jsonObject.getInt("NumDecomDeadDataNodes"));
+          } else if (name.equals(
+              "Hadoop:service=NameNode,name=FSNamesystem")) {
+            report.setNamesystemInfo(
+                jsonObject.getLong("CapacityRemaining"),
+                jsonObject.getLong("CapacityTotal"),
+                jsonObject.getLong("FilesTotal"),
+                jsonObject.getLong("BlocksTotal"),
+                jsonObject.getLong("MissingBlocks"),
+                jsonObject.getLong("PendingReplicationBlocks"),
+                jsonObject.getLong("UnderReplicatedBlocks"),
+                jsonObject.getLong("PendingDeletionBlocks"));
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 019a5cd..cfddf20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -25,12 +25,16 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -85,6 +89,8 @@ public class Router extends CompositeService {
 
   /** Interface to identify the active NN for a nameservice or blockpool ID. */
   private ActiveNamenodeResolver namenodeResolver;
+  /** Updates the namenode status in the namenode resolver. */
+  private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
 
 
   /** Usage string for help message. */
@@ -133,6 +139,22 @@ public class Router extends CompositeService {
       this.setRpcServerAddress(rpcServer.getRpcAddress());
     }
 
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
+        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
+
+      // Create status updater for each monitored Namenode
+      this.namenodeHearbeatServices = createNamenodeHearbeatServices();
+      for (NamenodeHeartbeatService hearbeatService :
+          this.namenodeHearbeatServices) {
+        addService(hearbeatService);
+      }
+
+      if (this.namenodeHearbeatServices.isEmpty()) {
+        LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
+      }
+    }
+
     super.serviceInit(conf);
   }
 
@@ -243,6 +265,96 @@ public class Router extends CompositeService {
   }
 
   /////////////////////////////////////////////////////////
+  // Namenode heartbeat monitors
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Create each of the services that will monitor a Namenode. This covers the
+   * local Namenode (if enabled) and the Namenodes listed by the admin. The
+   * services are indexed by Namenode description, so a Namenode that is
+   * specified more than once is only monitored once.
+   *
+   * @return Collection of heartbeat services; empty if there is nothing to
+   *         monitor.
+   */
+  protected Collection<NamenodeHeartbeatService>
+      createNamenodeHearbeatServices() {
+
+    Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
+
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
+        DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) {
+      // Create a local heartbeat service
+      NamenodeHeartbeatService localHeartbeatService =
+          createLocalNamenodeHearbeatService();
+      if (localHeartbeatService != null) {
+        String nnDesc = localHeartbeatService.getNamenodeDesc();
+        ret.put(nnDesc, localHeartbeatService);
+      }
+    }
+
+    // Create heartbeat services for a list specified by the admin. Entries
+    // are trimmed and blank ones are skipped, so an empty value or a stray
+    // comma does not register a monitor for a nameservice called "".
+    Collection<String> namenodes = this.conf.getTrimmedStringCollection(
+        DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
+    for (String namenode : namenodes) {
+      if (namenode.isEmpty()) {
+        continue;
+      }
+      // Accepted formats: "nsId" or "nsId.nnId"
+      String[] namenodeSplit = namenode.split("\\.");
+      String nsId = null;
+      String nnId = null;
+      if (namenodeSplit.length == 2) {
+        nsId = namenodeSplit[0];
+        nnId = namenodeSplit[1];
+      } else if (namenodeSplit.length == 1) {
+        nsId = namenode;
+      } else {
+        LOG.error("Wrong Namenode to monitor: {}", namenode);
+      }
+      if (nsId != null) {
+        NamenodeHeartbeatService heartbeatService =
+            createNamenodeHearbeatService(nsId, nnId);
+        if (heartbeatService != null) {
+          ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
+        }
+      }
+    }
+
+    return ret.values();
+  }
+
+  /**
+   * Create a new status updater for the local Namenode.
+   *
+   * @return Updater of the status for the local Namenode; null if the
+   *         nameservice is HA and the identifier of the local Namenode
+   *         cannot be found.
+   */
+  protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() {
+    // Detect NN running in this machine
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    String nnId = null;
+    if (HAUtil.isHAEnabled(conf, nsId)) {
+      nnId = HAUtil.getNameNodeId(conf, nsId);
+      if (nnId == null) {
+        // Without the namenode id there is no way to tell which of the HA
+        // Namenodes is the local one, so do not create a monitor for it.
+        LOG.error("Cannot find namenode id for local {}", nsId);
+        return null;
+      }
+    }
+
+    return createNamenodeHearbeatService(nsId, nnId);
+  }
+
+  /**
+   * Build the heartbeat service that tracks a single Namenode.
+   *
+   * @param nsId Identifier of the nameservice to monitor.
+   * @param nnId Identifier of the namenode (HA) to monitor.
+   * @return Updater of the status for the specified Namenode.
+   */
+  protected NamenodeHeartbeatService createNamenodeHearbeatService(
+      String nsId, String nnId) {
+
+    LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId);
+    return new NamenodeHeartbeatService(namenodeResolver, nsId, nnId);
+  }
+
+  /////////////////////////////////////////////////////////
   // Submodule getters
   /////////////////////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 57a0cd8..6f00261 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4770,4 +4770,36 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.heartbeat.enable</name>
+    <value>true</value>
+    <description>
+      Enables the Router to heartbeat into the State Store.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.heartbeat.interval</name>
+    <value>5000</value>
+    <description>
+      How often the Router should heartbeat into the State Store in milliseconds.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.monitor.namenode</name>
+    <value></value>
+    <description>
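+      Comma-separated list of the namenodes to monitor and heartbeat. Each
+      entry is a nameservice identifier, optionally followed by a dot and a
+      namenode identifier (e.g., ns0.nn1).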
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.monitor.localnamenode.enable</name>
+    <value>true</value>
+    <description>
+      If the Router should monitor the namenode in the local machine.
+    </description>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 0345cf5..da91006 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -582,6 +582,14 @@ public class MiniDFSCluster implements AutoCloseable {
     public void setStartOpt(StartupOption startOpt) {
       this.startOpt = startOpt;
     }
+
+    /**
+     * Get the nameservice identifier stored in this entry.
+     * @return Nameservice identifier.
+     */
+    public String getNameserviceId() {
+      return this.nameserviceId;
+    }
+
+    /**
+     * Get the namenode identifier stored in this entry.
+     * @return Namenode identifier.
+     */
+    public String getNamenodeId() {
+      return this.nnId;
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index 2875750..87427fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -56,9 +56,16 @@ public class MockResolver
   private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
   private String defaultNamespace = null;
 
+  public MockResolver() {
+    this.cleanRegistrations();
+  }
+
+  public MockResolver(Configuration conf) {
+    this();
+  }
 
   public MockResolver(Configuration conf, StateStoreService store) {
-    this.cleanRegistrations();
+    this();
   }
 
   public void addLocation(String mount, String nsId, String location) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 39fcf7a..21555c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -28,6 +28,8 @@ public class RouterConfigBuilder {
   private Configuration conf;
 
   private boolean enableRpcServer = false;
+  private boolean enableHeartbeat = false;
+  private boolean enableLocalHeartbeat = false;
 
   public RouterConfigBuilder(Configuration configuration) {
     this.conf = configuration;
@@ -39,6 +41,13 @@ public class RouterConfigBuilder {
 
+  /**
+   * Enable every service handled by this builder: the RPC server, the
+   * Namenode heartbeat and the monitoring of the local Namenode.
+   * @return This builder.
+   */
   public RouterConfigBuilder all() {
     this.enableRpcServer = true;
+    this.enableHeartbeat = true;
+    this.enableLocalHeartbeat = true;
+    return this;
+  }
+
+  /**
+   * Choose if the local Namenode should be monitored.
+   * @param enable Value to store for the local heartbeat flag.
+   * @return This builder.
+   */
+  public RouterConfigBuilder enableLocalHeartbeat(boolean enable) {
+    this.enableLocalHeartbeat = enable;
     return this;
   }
 
@@ -47,12 +56,25 @@ public class RouterConfigBuilder {
     return this;
   }
 
+  /**
+   * Choose if the Namenode heartbeat should be enabled.
+   * @param enable Value to store for the heartbeat flag.
+   * @return This builder.
+   */
+  public RouterConfigBuilder heartbeat(boolean enable) {
+    this.enableHeartbeat = enable;
+    return this;
+  }
+
+  /**
+   * Enable the RPC server; shorthand for rpc(true).
+   * @return This builder.
+   */
   public RouterConfigBuilder rpc() {
     return this.rpc(true);
   }
 
+  /**
+   * Enable the Namenode heartbeat; shorthand for heartbeat(true).
+   * @return This builder.
+   */
+  public RouterConfigBuilder heartbeat() {
+    return this.heartbeat(true);
+  }
+
+  /**
+   * Write the selected flags into the wrapped configuration. The
+   * configuration passed to the constructor is modified in place.
+   * @return The wrapped configuration with the RPC, heartbeat and local
+   *         Namenode monitoring flags set.
+   */
   public Configuration build() {
     conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
+    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
+        this.enableHeartbeat);
+    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
+        this.enableLocalHeartbeat);
     return conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
index 4031b7f..0830c19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
@@ -754,6 +755,48 @@ public class RouterDFSCluster {
   }
 
   /**
+   * Transition to active every namenode of the cluster that matches the given
+   * nameservice and namenode identifiers. Failures are logged and not
+   * propagated to the caller.
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier.
+   */
+  public void switchToActive(String nsId, String nnId) {
+    try {
+      final int count = cluster.getNumNameNodes();
+      final NameNodeInfo[] infos = cluster.getNameNodeInfos();
+      for (int index = 0; index < count; index++) {
+        final NameNodeInfo info = infos[index];
+        if (!info.getNameserviceId().equals(nsId)) {
+          continue;
+        }
+        if (!info.getNamenodeId().equals(nnId)) {
+          continue;
+        }
+        cluster.transitionToActive(index);
+      }
+    } catch (Throwable t) {
+      LOG.error("Cannot transition to active", t);
+    }
+  }
+
+  /**
+   * Transition to standby every namenode of the cluster that matches the
+   * given nameservice and namenode identifiers. Failures are logged and not
+   * propagated to the caller.
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier.
+   */
+  public void switchToStandby(String nsId, String nnId) {
+    try {
+      final int count = cluster.getNumNameNodes();
+      final NameNodeInfo[] infos = cluster.getNameNodeInfos();
+      for (int index = 0; index < count; index++) {
+        final NameNodeInfo info = infos[index];
+        if (!info.getNameserviceId().equals(nsId)) {
+          continue;
+        }
+        if (!info.getNamenodeId().equals(nnId)) {
+          continue;
+        }
+        cluster.transitionToStandby(index);
+      }
+    } catch (Throwable t) {
+      LOG.error("Cannot transition to standby", t);
+    }
+  }
+
+  /**
    * Stop the federated HDFS cluster.
    */
   public void shutdown() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
new file mode 100644
index 0000000..877fb02
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test the service that heartbeats the state of the namenodes to the State
+ * Store.
+ */
+public class TestNamenodeHeartbeat {
+
+  /** Cluster shared by all the tests; created in globalSetUp(). */
+  private static RouterDFSCluster cluster;
+  /** Mock resolver that receives the heartbeats. */
+  private static ActiveNamenodeResolver namenodeResolver;
+  /** One heartbeat service per namenode of the shared cluster. */
+  private static List<NamenodeHeartbeatService> services;
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Start the shared cluster and one heartbeat service for each of its
+   * namenodes, all of them reporting into the same mock resolver.
+   *
+   * @throws Exception If the cluster or the services cannot be started.
+   */
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+
+    cluster = new RouterDFSCluster(true, 2);
+
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Mock locator that records the heartbeats
+    List<String> nss = cluster.getNameservices();
+    String ns = nss.get(0);
+    Configuration conf = cluster.generateNamenodeConfiguration(ns);
+    namenodeResolver = new MockResolver(conf);
+    namenodeResolver.setRouterId("testrouter");
+
+    // Create one heartbeat service per NN
+    services = new ArrayList<>();
+    for (NamenodeContext nn : cluster.getNamenodes()) {
+      String nsId = nn.getNameserviceId();
+      String nnId = nn.getNamenodeId();
+      NamenodeHeartbeatService service = new NamenodeHeartbeatService(
+          namenodeResolver, nsId, nnId);
+      service.init(conf);
+      service.start();
+      services.add(service);
+    }
+  }
+
+  /**
+   * Stop the heartbeat services and then the shared cluster.
+   *
+   * @throws IOException If a heartbeat service cannot be closed.
+   */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    // Stop the monitors before the namenodes they poll go away. The null
+    // checks keep a failure in globalSetUp() from being masked by a
+    // NullPointerException raised here, and the finally block makes sure the
+    // cluster is shut down even if a service fails to close.
+    try {
+      if (services != null) {
+        for (NamenodeHeartbeatService service: services) {
+          service.stop();
+          service.close();
+        }
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Check the service life cycle of a heartbeat service: INITED after init,
+   * STARTED after start and STOPPED after stop.
+   *
+   * @throws IOException If the heartbeat service cannot be closed.
+   */
+  @Test
+  public void testNamenodeHeartbeatService() throws IOException {
+
+    // Only used to generate a namenode configuration; it is never started
+    RouterDFSCluster testCluster = new RouterDFSCluster(true, 1);
+    Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration(
+        NAMESERVICES[0]);
+    NamenodeHeartbeatService server = new NamenodeHeartbeatService(
+        namenodeResolver, NAMESERVICES[0], NAMENODES[0]);
+    server.init(heartbeatConfig);
+    assertEquals(STATE.INITED, server.getServiceState());
+    server.start();
+    assertEquals(STATE.STARTED, server.getServiceState());
+    server.stop();
+    assertEquals(STATE.STOPPED, server.getServiceState());
+    server.close();
+  }
+
+  /**
+   * Check the order of the namenodes returned by the resolver for each
+   * nameservice, first with NAMENODES[0] active everywhere and then after
+   * failing over the first nameservice to NAMENODES[1]. The namenode that was
+   * switched to active is expected in the first position.
+   *
+   * @throws InterruptedException If the wait for the heartbeats is
+   *           interrupted.
+   * @throws IOException If the resolver cannot be queried.
+   */
+  @Test
+  public void testHearbeat() throws InterruptedException, IOException {
+
+    // Set NAMENODE1 to active for all nameservices
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+
+    // Wait for heartbeats to record
+    Thread.sleep(5000);
+
+    // Verify the locator has matching NN entries for each NS
+    for (String ns : cluster.getNameservices()) {
+      List<? extends FederationNamenodeContext> nns =
+          namenodeResolver.getNamenodesForNameserviceId(ns);
+
+      // Active
+      FederationNamenodeContext active = nns.get(0);
+      assertEquals(NAMENODES[0], active.getNamenodeId());
+
+      // Standby
+      FederationNamenodeContext standby = nns.get(1);
+      assertEquals(NAMENODES[1], standby.getNamenodeId());
+    }
+
+    // Switch active NNs in 1/2 nameservices
+    List<String> nss = cluster.getNameservices();
+    String failoverNS = nss.get(0);
+    String normalNs = nss.get(1);
+
+    cluster.switchToStandby(failoverNS, NAMENODES[0]);
+    cluster.switchToActive(failoverNS, NAMENODES[1]);
+
+    // Wait for heartbeats to record
+    Thread.sleep(5000);
+
+    // Verify the locator has recorded the failover for the failover NS
+    List<? extends FederationNamenodeContext> failoverNSs =
+        namenodeResolver.getNamenodesForNameserviceId(failoverNS);
+    // Active
+    FederationNamenodeContext active = failoverNSs.get(0);
+    assertEquals(NAMENODES[1], active.getNamenodeId());
+
+    // Standby
+    FederationNamenodeContext standby = failoverNSs.get(1);
+    assertEquals(NAMENODES[0], standby.getNamenodeId());
+
+    // Verify the locator has the same records for the other ns
+    List<? extends FederationNamenodeContext> normalNss =
+        namenodeResolver.getNamenodesForNameserviceId(normalNs);
+    // Active
+    active = normalNss.get(0);
+    assertEquals(NAMENODES[0], active.getNamenodeId());
+    // Standby
+    standby = normalNss.get(1);
+    assertEquals(NAMENODES[1], standby.getNamenodeId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8d935b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
index d8afb39..2074d3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -98,6 +98,9 @@ public class TestRouter {
     // Rpc only
     testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
 
+    // Heartbeat only
+    testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
+
     // Run with all services
     testRouterStartup(new RouterConfigBuilder(conf).all().build());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org