Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/07 20:20:36 UTC
[33/40] hadoop git commit: HDFS-11826. Federation Namenode Heartbeat. Contributed by Inigo Goiri.
HDFS-11826. Federation Namenode Heartbeat. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae5e1352
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae5e1352
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae5e1352
Branch: refs/heads/HDFS-10467
Commit: ae5e13522e30cb10796c3b627ebd7fec9597a88c
Parents: b43bb80
Author: Inigo Goiri <in...@apache.org>
Authored: Tue Aug 1 14:40:27 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Thu Sep 7 13:19:51 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 +
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 38 ++
.../resolver/NamenodeStatusReport.java | 193 ++++++++++
.../federation/router/FederationUtil.java | 66 ++++
.../router/NamenodeHeartbeatService.java | 350 +++++++++++++++++++
.../hdfs/server/federation/router/Router.java | 112 ++++++
.../src/main/resources/hdfs-default.xml | 32 ++
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 8 +
.../hdfs/server/federation/MockResolver.java | 9 +-
.../server/federation/RouterConfigBuilder.java | 22 ++
.../server/federation/RouterDFSCluster.java | 43 +++
.../router/TestNamenodeHeartbeat.java | 168 +++++++++
.../server/federation/router/TestRouter.java | 3 +
13 files changed, 1057 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b9c7408..1ad34d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1147,6 +1147,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
+ // HDFS Router heartbeat
+ public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
+ FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
+ public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
+ public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
+ FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
+ public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
+ TimeUnit.SECONDS.toMillis(5);
+ public static final String DFS_ROUTER_MONITOR_NAMENODE =
+ FEDERATION_ROUTER_PREFIX + "monitor.namenode";
+ public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
+ FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
+ public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
+
// HDFS Router NN client
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
FEDERATION_ROUTER_PREFIX + "connection.pool-size";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 7776dc2..29936f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1241,6 +1241,44 @@ public class DFSUtil {
}
/**
+ * Map a logical namenode ID to its web address. Use the given nameservice if
+ * specified, or the configured one if none is given.
+ *
+ * @param conf Configuration
+ * @param nsId which nameservice nnId is a part of, optional
+ * @param nnId the namenode ID to get the web address for
+ * @return the web address, or the default HTTP address if none is configured
+ */
+ public static String getNamenodeWebAddr(final Configuration conf, String nsId,
+ String nnId) {
+
+ if (nsId == null) {
+ nsId = getOnlyNameServiceIdOrNull(conf);
+ }
+
+ String webAddrKey = DFSUtilClient.concatSuffixes(
+ DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nsId, nnId);
+
+ String webAddr =
+ conf.get(webAddrKey, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT);
+ return webAddr;
+ }
+
+ /**
+ * Get all of the Web addresses of the individual NNs in a given nameservice.
+ *
+ * @param conf Configuration
+ * @param nsId the nameservice whose NNs addresses we want.
+ * @param defaultValue default address to return in case key is not found.
+ * @return A map from nnId -> Web address of each NN in the nameservice.
+ */
+ public static Map<String, InetSocketAddress> getWebAddressesForNameserviceId(
+ Configuration conf, String nsId, String defaultValue) {
+ return DFSUtilClient.getAddressesForNameserviceId(conf, nsId, defaultValue,
+ DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ }
+
+ /**
* If the configuration refers to only a single nameservice, return the
* name of that nameservice. If it refers to 0 or more than 1, return null.
*/
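
A minimal sketch of how the two new helpers could be used, assuming example nameservice/namenode names and addresses:

Configuration conf = new HdfsConfiguration();
conf.set("dfs.nameservices", "ns1");
conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
conf.set("dfs.namenode.http-address.ns1.nn1", "nn1.example.com:9870");
conf.set("dfs.namenode.http-address.ns1.nn2", "nn2.example.com:9870");

// Single namenode: "nn1.example.com:9870"
String webAddr = DFSUtil.getNamenodeWebAddr(conf, "ns1", "nn1");

// All namenodes of the nameservice, keyed by namenode ID ("nn1", "nn2")
Map<String, InetSocketAddress> webAddrs =
    DFSUtil.getWebAddressesForNameserviceId(conf, "ns1", null);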
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
index 9259048..f8759e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
@@ -39,8 +39,29 @@ public class NamenodeStatusReport {
private HAServiceState status = HAServiceState.STANDBY;
private boolean safeMode = false;
+ /** Datanodes stats. */
+ private int liveDatanodes = -1;
+ private int deadDatanodes = -1;
+ /** Decommissioning datanodes. */
+ private int decomDatanodes = -1;
+ /** Live decommissioned datanodes. */
+ private int liveDecomDatanodes = -1;
+ /** Dead decommissioned datanodes. */
+ private int deadDecomDatanodes = -1;
+
+ /** Space stats. */
+ private long availableSpace = -1;
+ private long numOfFiles = -1;
+ private long numOfBlocks = -1;
+ private long numOfBlocksMissing = -1;
+ private long numOfBlocksPendingReplication = -1;
+ private long numOfBlocksUnderReplicated = -1;
+ private long numOfBlocksPendingDeletion = -1;
+ private long totalSpace = -1;
+
/** If the fields are valid. */
private boolean registrationValid = false;
+ private boolean statsValid = false;
private boolean haStateValid = false;
public NamenodeStatusReport(String ns, String nn, String rpc, String service,
@@ -54,6 +75,15 @@ public class NamenodeStatusReport {
}
/**
+ * If the statistics are valid.
+ *
+ * @return If the statistics are valid.
+ */
+ public boolean statsValid() {
+ return this.statsValid;
+ }
+
+ /**
* If the registration is valid.
*
* @return If the registration is valid.
@@ -187,6 +217,169 @@ public class NamenodeStatusReport {
return this.safeMode;
}
+ /**
+ * Set the datanode information.
+ *
+ * @param numLive Number of live nodes.
+ * @param numDead Number of dead nodes.
+ * @param numDecom Number of decommissioning nodes.
+ * @param numLiveDecom Number of decommissioned live nodes.
+ * @param numDeadDecom Number of decommissioned dead nodes.
+ */
+ public void setDatanodeInfo(int numLive, int numDead, int numDecom,
+ int numLiveDecom, int numDeadDecom) {
+ this.liveDatanodes = numLive;
+ this.deadDatanodes = numDead;
+ this.decomDatanodes = numDecom;
+ this.liveDecomDatanodes = numLiveDecom;
+ this.deadDecomDatanodes = numDeadDecom;
+ this.statsValid = true;
+ }
+
+ /**
+ * Get the number of live datanodes.
+ *
+ * @return The number of live nodes.
+ */
+ public int getNumLiveDatanodes() {
+ return this.liveDatanodes;
+ }
+
+ /**
+ * Get the number of dead datanodes.
+ *
+ * @return The number of dead nodes.
+ */
+ public int getNumDeadDatanodes() {
+ return this.deadDatanodes;
+ }
+
+ /**
+ * Get the number of decommissioning nodes.
+ *
+ * @return The number of decommissioning nodes.
+ */
+ public int getNumDecommissioningDatanodes() {
+ return this.decomDatanodes;
+ }
+
+ /**
+ * Get the number of live decommissioned nodes.
+ *
+ * @return The number of live decommissioned nodes.
+ */
+ public int getNumDecomLiveDatanodes() {
+ return this.liveDecomDatanodes;
+ }
+
+ /**
+ * Get the number of dead decommissioned nodes.
+ *
+ * @return The number of dead decommissioned nodes.
+ */
+ public int getNumDecomDeadDatanodes() {
+ return this.deadDecomDatanodes;
+ }
+
+ /**
+ * Set the filesystem information.
+ *
+ * @param available Available capacity.
+ * @param total Total capacity.
+ * @param numFiles Number of files.
+ * @param numBlocks Total number of blocks.
+ * @param numBlocksMissing Number of missing blocks.
+ * @param numBlocksPendingReplication Number of blocks pending replication.
+ * @param numBlocksUnderReplicated Number of under replicated blocks.
+ * @param numBlocksPendingDeletion Number of blocks pending deletion.
+ */
+ public void setNamesystemInfo(long available, long total,
+ long numFiles, long numBlocks, long numBlocksMissing,
+ long numBlocksPendingReplication, long numBlocksUnderReplicated,
+ long numBlocksPendingDeletion) {
+ this.totalSpace = total;
+ this.availableSpace = available;
+ this.numOfBlocks = numBlocks;
+ this.numOfBlocksMissing = numBlocksMissing;
+ this.numOfBlocksPendingReplication = numBlocksPendingReplication;
+ this.numOfBlocksUnderReplicated = numBlocksUnderReplicated;
+ this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
+ this.numOfFiles = numFiles;
+ this.statsValid = true;
+ }
+
+ /**
+ * Get the number of blocks.
+ *
+ * @return The number of blocks.
+ */
+ public long getNumBlocks() {
+ return this.numOfBlocks;
+ }
+
+ /**
+ * Get the number of files.
+ *
+ * @return The number of files.
+ */
+ public long getNumFiles() {
+ return this.numOfFiles;
+ }
+
+ /**
+ * Get the total space.
+ *
+ * @return The total space.
+ */
+ public long getTotalSpace() {
+ return this.totalSpace;
+ }
+
+ /**
+ * Get the available space.
+ *
+ * @return The available space.
+ */
+ public long getAvailableSpace() {
+ return this.availableSpace;
+ }
+
+ /**
+ * Get the number of missing blocks.
+ *
+ * @return Number of missing blocks.
+ */
+ public long getNumBlocksMissing() {
+ return this.numOfBlocksMissing;
+ }
+
+ /**
+ * Get the number of pending replication blocks.
+ *
+ * @return Number of pending replication blocks.
+ */
+ public long getNumOfBlocksPendingReplication() {
+ return this.numOfBlocksPendingReplication;
+ }
+
+ /**
+ * Get the number of under replicated blocks.
+ *
+ * @return Number of under replicated blocks.
+ */
+ public long getNumOfBlocksUnderReplicated() {
+ return this.numOfBlocksUnderReplicated;
+ }
+
+ /**
+ * Get the number of pending deletion blocks.
+ *
+ * @return Number of pending deletion blocks.
+ */
+ public long getNumOfBlocksPendingDeletion() {
+ return this.numOfBlocksPendingDeletion;
+ }
+
@Override
public String toString() {
return String.format("%s-%s:%s",
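
A short sketch of how the new stats fields are expected to be filled in and queried; the constructor arguments and numbers are illustrative only:

NamenodeStatusReport report = new NamenodeStatusReport(
    "ns1", "nn1", "nn1:8020", "nn1:8022", "nn1:8023", "nn1:9870");
report.setDatanodeInfo(10, 1, 0, 0, 0);       // marks the stats as valid
report.setNamesystemInfo(
    800L << 20, 1024L << 20,  // available / total space in bytes
    1000, 2000,               // files / blocks
    0, 0, 5, 0);              // missing / pending repl. / under repl. / pending del.
if (report.statsValid()) {
  long free = report.getAvailableSpace();
  int live = report.getNumLiveDatanodes();
}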
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index 0129a37..78c473a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -17,13 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +49,63 @@ public final class FederationUtil {
}
/**
+ * Get JMX data from a web endpoint.
+ *
+ * @param beanQuery JMX bean.
+ * @param webAddress Web address of the JMX endpoint.
+ * @return JSON with the JMX data
+ */
+ public static JSONArray getJmx(String beanQuery, String webAddress) {
+ JSONArray ret = null;
+ BufferedReader reader = null;
+ try {
+ String host = webAddress;
+ int port = -1;
+ if (webAddress.indexOf(":") > 0) {
+ String[] webAddressSplit = webAddress.split(":");
+ host = webAddressSplit[0];
+ port = Integer.parseInt(webAddressSplit[1]);
+ }
+ URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
+ URLConnection conn = jmxURL.openConnection();
+ conn.setConnectTimeout(5 * 1000);
+ conn.setReadTimeout(5 * 1000);
+ InputStream in = conn.getInputStream();
+ InputStreamReader isr = new InputStreamReader(in, "UTF-8");
+ reader = new BufferedReader(isr);
+
+ StringBuilder sb = new StringBuilder();
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line);
+ }
+ String jmxOutput = sb.toString();
+
+ // Parse JSON
+ JSONObject json = new JSONObject(jmxOutput);
+ ret = json.getJSONArray("beans");
+ } catch (IOException e) {
+ LOG.error("Cannot read JMX bean {} from server {}: {}",
+ beanQuery, webAddress, e.getMessage());
+ } catch (JSONException e) {
+ LOG.error("Cannot parse JMX output for {} from server {}: {}",
+ beanQuery, webAddress, e.getMessage());
+ } catch (Exception e) {
+ LOG.error("Cannot parse JMX output for {} from server {}: {}",
+ beanQuery, webAddress, e);
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.error("Problem closing {}", webAddress, e);
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
* Create an instance of an interface with a constructor using a context.
*
* @param conf Configuration for the class names.
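
A rough usage sketch for getJmx, assuming an example namenode web address; the bean and attribute names follow the FSNamesystem metrics read later in this patch:

JSONArray beans = FederationUtil.getJmx(
    "Hadoop:service=NameNode,name=FSNamesystem", "nn1.example.com:9870");
if (beans != null) {
  try {
    for (int i = 0; i < beans.length(); i++) {
      JSONObject bean = beans.getJSONObject(i);
      long capacityRemaining = bean.getLong("CapacityRemaining");
    }
  } catch (JSONException e) {
    // Attribute missing or not numeric in this bean
  }
}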
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
new file mode 100644
index 0000000..fe4f939
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link Router} periodically checks the state of a Namenode (usually on
+ * the same server) and reports its high availability (HA) state and
+ * load/space status to the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}.
+ * Note that this is an optional role as a Router can be independent of any
+ * subcluster.
+ * <p>
+ * For performance with Namenode HA, the Router uses the high availability state
+ * information in the State Store to forward the request to the Namenode that is
+ * most likely to be active.
+ * <p>
+ * Note that this service can be embedded into the Namenode itself to simplify
+ * the operation.
+ */
+public class NamenodeHeartbeatService extends PeriodicService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NamenodeHeartbeatService.class);
+
+
+ /** Configuration for the heartbeat. */
+ private Configuration conf;
+
+ /** Resolver to report the namenode state to. */
+ private final ActiveNamenodeResolver resolver;
+
+ /** Interface to the tracked NN. */
+ private final String nameserviceId;
+ private final String namenodeId;
+
+ /** Namenode HA target. */
+ private NNHAServiceTarget localTarget;
+ /** RPC address for the namenode. */
+ private String rpcAddress;
+ /** Service RPC address for the namenode. */
+ private String serviceAddress;
+ /** Lifeline RPC address for the namenode. */
+ private String lifelineAddress;
+ /** HTTP address for the namenode. */
+ private String webAddress;
+
+ /**
+ * Create a new Namenode status updater.
+ * @param resolver Namenode resolver service to handle NN registration.
+ * @param nsId Identifier of the nameservice.
+ * @param nnId Identifier of the namenode in HA.
+ */
+ public NamenodeHeartbeatService(
+ ActiveNamenodeResolver resolver, String nsId, String nnId) {
+ super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " +
+ nnId);
+
+ this.resolver = resolver;
+
+ this.nameserviceId = nsId;
+ this.namenodeId = nnId;
+
+ }
+
+ @Override
+ protected void serviceInit(Configuration configuration) throws Exception {
+
+ this.conf = configuration;
+
+ if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
+ this.localTarget = new NNHAServiceTarget(
+ conf, nameserviceId, namenodeId);
+ } else {
+ this.localTarget = null;
+ }
+
+ // Get the RPC address for the clients to connect
+ this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
+ LOG.info("{}-{} RPC address: {}",
+ nameserviceId, namenodeId, rpcAddress);
+
+ // Get the Service RPC address for monitoring
+ this.serviceAddress =
+ DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
+ if (this.serviceAddress == null) {
+ LOG.error("Cannot locate RPC service address for NN {}-{}, " +
+ "using RPC address {}", nameserviceId, namenodeId, this.rpcAddress);
+ this.serviceAddress = this.rpcAddress;
+ }
+ LOG.info("{}-{} Service RPC address: {}",
+ nameserviceId, namenodeId, serviceAddress);
+
+ // Get the Lifeline RPC address for faster monitoring
+ this.lifelineAddress =
+ DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId);
+ if (this.lifelineAddress == null) {
+ this.lifelineAddress = this.serviceAddress;
+ }
+ LOG.info("{}-{} Lifeline RPC address: {}",
+ nameserviceId, namenodeId, lifelineAddress);
+
+ // Get the Web address for UI
+ this.webAddress =
+ DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
+ LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress);
+
+ this.setIntervalMs(conf.getLong(
+ DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
+ DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
+
+
+ super.serviceInit(configuration);
+ }
+
+ @Override
+ public void periodicInvoke() {
+ updateState();
+ }
+
+ /**
+ * Get the RPC address for a Namenode.
+ * @param conf Configuration.
+ * @param nsId Name service identifier.
+ * @param nnId Name node identifier.
+ * @return RPC address in format hostname:1234.
+ */
+ private static String getRpcAddress(
+ Configuration conf, String nsId, String nnId) {
+
+ // Get it from the regular RPC setting
+ String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+ String ret = conf.get(confKey);
+
+ if (nsId != null && nnId != null) {
+ // Get it for the proper nameservice and namenode
+ confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
+ ret = conf.get(confKey);
+
+ // If not available, get it from the map
+ if (ret == null) {
+ Map<String, InetSocketAddress> rpcAddresses =
+ DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
+ if (rpcAddresses.containsKey(nnId)) {
+ InetSocketAddress sockAddr = rpcAddresses.get(nnId);
+ InetAddress addr = sockAddr.getAddress();
+ ret = addr.getHostAddress() + ":" + sockAddr.getPort();
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Update the state of the Namenode.
+ */
+ private void updateState() {
+ NamenodeStatusReport report = getNamenodeStatusReport();
+ if (!report.registrationValid()) {
+ // Not operational
+ LOG.error("Namenode is not operational: {}", getNamenodeDesc());
+ } else if (report.haStateValid()) {
+ // block and HA status available
+ LOG.debug("Received service state: {} from HA namenode: {}",
+ report.getState(), getNamenodeDesc());
+ } else if (localTarget == null) {
+ // block info available, HA status not expected
+ LOG.debug(
+ "Reporting non-HA namenode as operational: " + getNamenodeDesc());
+ } else {
+ // block info available, HA status should be available, but was not
+ // fetched; do nothing and let the current state stand
+ return;
+ }
+ try {
+ if (!resolver.registerNamenode(report)) {
+ LOG.warn("Cannot register namenode {}", report);
+ }
+ } catch (IOException e) {
+ LOG.info("Cannot register namenode in the State Store");
+ } catch (Exception ex) {
+ LOG.error("Unhandled exception updating NN registration for {}",
+ getNamenodeDesc(), ex);
+ }
+ }
+
+ /**
+ * Get the status report for the Namenode monitored by this heartbeater.
+ * @return Namenode status report.
+ */
+ protected NamenodeStatusReport getNamenodeStatusReport() {
+ NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId,
+ namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress);
+
+ try {
+ LOG.debug("Probing NN at service address: {}", serviceAddress);
+
+ URI serviceURI = new URI("hdfs://" + serviceAddress);
+ // Read the filesystem info from RPC (required)
+ NamenodeProtocol nn = NameNodeProxies
+ .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
+ .getProxy();
+
+ if (nn != null) {
+ NamespaceInfo info = nn.versionRequest();
+ if (info != null) {
+ report.setNamespaceInfo(info);
+ }
+ }
+ if (!report.registrationValid()) {
+ return report;
+ }
+
+ // Check for safemode from the client protocol. Currently optional, but
+ // should be required at some point for QoS
+ try {
+ ClientProtocol client = NameNodeProxies
+ .createProxy(this.conf, serviceURI, ClientProtocol.class)
+ .getProxy();
+ if (client != null) {
+ boolean isSafeMode = client.setSafeMode(
+ SafeModeAction.SAFEMODE_GET, false);
+ report.setSafeMode(isSafeMode);
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
+ }
+
+ // Read the stats from JMX (optional)
+ updateJMXParameters(webAddress, report);
+
+ if (localTarget != null) {
+ // Try to get the HA status
+ try {
+ // Determine if NN is active
+ // TODO: dynamic timeout
+ HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000);
+ HAServiceStatus status = haProtocol.getServiceStatus();
+ report.setHAServiceState(status.getState());
+ } catch (Throwable e) {
+ // Failed to fetch HA status, ignoring failure
+ LOG.error("Cannot fetch HA status for {}: {}",
+ getNamenodeDesc(), e.getMessage(), e);
+ }
+ }
+ } catch(IOException e) {
+ LOG.error("Cannot communicate with {}: {}",
+ getNamenodeDesc(), e.getMessage());
+ } catch(Throwable e) {
+ // Generic error that we don't know about
+ LOG.error("Unexpected exception while communicating with {}: {}",
+ getNamenodeDesc(), e.getMessage(), e);
+ }
+ return report;
+ }
+
+ /**
+ * Get the description of the Namenode to monitor.
+ * @return Description of the Namenode to monitor.
+ */
+ public String getNamenodeDesc() {
+ if (namenodeId != null && !namenodeId.isEmpty()) {
+ return nameserviceId + "-" + namenodeId + ":" + serviceAddress;
+ } else {
+ return nameserviceId + ":" + serviceAddress;
+ }
+ }
+
+ /**
+ * Get the parameters for a Namenode from JMX and add them to the report.
+ * @param address Web interface address of the Namenode to monitor.
+ * @param report Namenode status report to update with JMX data.
+ */
+ private void updateJMXParameters(
+ String address, NamenodeStatusReport report) {
+ try {
+ // TODO part of this should be moved to its own utility
+ String query = "Hadoop:service=NameNode,name=FSNamesystem*";
+ JSONArray aux = FederationUtil.getJmx(query, address);
+ if (aux != null) {
+ for (int i = 0; i < aux.length(); i++) {
+ JSONObject jsonObject = aux.getJSONObject(i);
+ String name = jsonObject.getString("name");
+ if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
+ report.setDatanodeInfo(
+ jsonObject.getInt("NumLiveDataNodes"),
+ jsonObject.getInt("NumDeadDataNodes"),
+ jsonObject.getInt("NumDecommissioningDataNodes"),
+ jsonObject.getInt("NumDecomLiveDataNodes"),
+ jsonObject.getInt("NumDecomDeadDataNodes"));
+ } else if (name.equals(
+ "Hadoop:service=NameNode,name=FSNamesystem")) {
+ report.setNamesystemInfo(
+ jsonObject.getLong("CapacityRemaining"),
+ jsonObject.getLong("CapacityTotal"),
+ jsonObject.getLong("FilesTotal"),
+ jsonObject.getLong("BlocksTotal"),
+ jsonObject.getLong("MissingBlocks"),
+ jsonObject.getLong("PendingReplicationBlocks"),
+ jsonObject.getLong("UnderReplicatedBlocks"),
+ jsonObject.getLong("PendingDeletionBlocks"));
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
+ }
+ }
+}
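
A minimal lifecycle sketch, mirroring the unit test added below; resolver and conf are assumed to be an ActiveNamenodeResolver and a Configuration already in scope, and the nameservice/namenode names are examples:

NamenodeHeartbeatService heartbeat =
    new NamenodeHeartbeatService(resolver, "ns1", "nn1");
heartbeat.init(conf);   // resolves RPC, service, lifeline and web addresses
heartbeat.start();      // probes the namenode every heartbeat interval
// ... later, on shutdown
heartbeat.stop();
heartbeat.close();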
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 019a5cd..cfddf20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -25,12 +25,16 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -85,6 +89,8 @@ public class Router extends CompositeService {
/** Interface to identify the active NN for a nameservice or blockpool ID. */
private ActiveNamenodeResolver namenodeResolver;
+ /** Updates the namenode status in the namenode resolver. */
+ private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
/** Usage string for help message. */
@@ -133,6 +139,22 @@ public class Router extends CompositeService {
this.setRpcServerAddress(rpcServer.getRpcAddress());
}
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
+ DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
+
+ // Create status updater for each monitored Namenode
+ this.namenodeHearbeatServices = createNamenodeHearbeatServices();
+ for (NamenodeHeartbeatService hearbeatService :
+ this.namenodeHearbeatServices) {
+ addService(hearbeatService);
+ }
+
+ if (this.namenodeHearbeatServices.isEmpty()) {
+ LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
+ }
+ }
+
super.serviceInit(conf);
}
@@ -243,6 +265,96 @@ public class Router extends CompositeService {
}
/////////////////////////////////////////////////////////
+ // Namenode heartbeat monitors
+ /////////////////////////////////////////////////////////
+
+ /**
+ * Create each of the services that will monitor a Namenode.
+ *
+ * @return List of heartbeat services.
+ */
+ protected Collection<NamenodeHeartbeatService>
+ createNamenodeHearbeatServices() {
+
+ Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
+
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
+ DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) {
+ // Create a local heartbeat service
+ NamenodeHeartbeatService localHeartbeatService =
+ createLocalNamenodeHearbeatService();
+ if (localHeartbeatService != null) {
+ String nnDesc = localHeartbeatService.getNamenodeDesc();
+ ret.put(nnDesc, localHeartbeatService);
+ }
+ }
+
+ // Create heartbeat services for a list specified by the admin
+ String namenodes = this.conf.get(
+ DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
+ if (namenodes != null) {
+ for (String namenode : namenodes.split(",")) {
+ String[] namenodeSplit = namenode.split("\\.");
+ String nsId = null;
+ String nnId = null;
+ if (namenodeSplit.length == 2) {
+ nsId = namenodeSplit[0];
+ nnId = namenodeSplit[1];
+ } else if (namenodeSplit.length == 1) {
+ nsId = namenode;
+ } else {
+ LOG.error("Wrong Namenode to monitor: {}", namenode);
+ }
+ if (nsId != null) {
+ NamenodeHeartbeatService heartbeatService =
+ createNamenodeHearbeatService(nsId, nnId);
+ if (heartbeatService != null) {
+ ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
+ }
+ }
+ }
+ }
+
+ return ret.values();
+ }
+
+ /**
+ * Create a new status updater for the local Namenode.
+ *
+ * @return Updater of the status for the local Namenode.
+ */
+ protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() {
+ // Detect NN running in this machine
+ String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+ String nnId = null;
+ if (HAUtil.isHAEnabled(conf, nsId)) {
+ nnId = HAUtil.getNameNodeId(conf, nsId);
+ if (nnId == null) {
+ LOG.error("Cannot find namenode id for local {}", nsId);
+ }
+ }
+
+ return createNamenodeHearbeatService(nsId, nnId);
+ }
+
+ /**
+ * Create a heartbeat monitor for a particular Namenode.
+ *
+ * @param nsId Identifier of the nameservice to monitor.
+ * @param nnId Identifier of the namenode (HA) to monitor.
+ * @return Updater of the status for the specified Namenode.
+ */
+ protected NamenodeHeartbeatService createNamenodeHearbeatService(
+ String nsId, String nnId) {
+
+ LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId);
+ NamenodeHeartbeatService ret = new NamenodeHeartbeatService(
+ namenodeResolver, nsId, nnId);
+ return ret;
+ }
+
+ /////////////////////////////////////////////////////////
// Submodule getters
/////////////////////////////////////////////////////////
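
A brief configuration sketch, assuming a Configuration named conf: with the example values below, the code above would create monitors for ns1-nn1, ns1-nn2 and the single namenode of ns2, and skip the local-namenode monitor:

conf.set(DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns1.nn1,ns1.nn2,ns2");
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, false);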
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 3189468..b7c0f5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4782,4 +4782,36 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.heartbeat.enable</name>
+ <value>true</value>
+ <description>
+ Enables the Router to heartbeat into the State Store.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.heartbeat.interval</name>
+ <value>5000</value>
+ <description>
+ How often the Router should heartbeat into the State Store in milliseconds.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.monitor.namenode</name>
+ <value></value>
+ <description>
+ Comma-separated list of namenode identifiers (nameservice.namenode) to monitor and heartbeat.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.monitor.localnamenode.enable</name>
+ <value>true</value>
+ <description>
+ Whether the Router should monitor the namenode on the local machine.
+ </description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 0345cf5..da91006 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -582,6 +582,14 @@ public class MiniDFSCluster implements AutoCloseable {
public void setStartOpt(StartupOption startOpt) {
this.startOpt = startOpt;
}
+
+ public String getNameserviceId() {
+ return this.nameserviceId;
+ }
+
+ public String getNamenodeId() {
+ return this.nnId;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index 2875750..87427fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -56,9 +56,16 @@ public class MockResolver
private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
private String defaultNamespace = null;
+ public MockResolver() {
+ this.cleanRegistrations();
+ }
+
+ public MockResolver(Configuration conf) {
+ this();
+ }
public MockResolver(Configuration conf, StateStoreService store) {
- this.cleanRegistrations();
+ this();
}
public void addLocation(String mount, String nsId, String location) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 39fcf7a..21555c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -28,6 +28,8 @@ public class RouterConfigBuilder {
private Configuration conf;
private boolean enableRpcServer = false;
+ private boolean enableHeartbeat = false;
+ private boolean enableLocalHeartbeat = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@@ -39,6 +41,13 @@ public class RouterConfigBuilder {
public RouterConfigBuilder all() {
this.enableRpcServer = true;
+ this.enableHeartbeat = true;
+ this.enableLocalHeartbeat = true;
+ return this;
+ }
+
+ public RouterConfigBuilder enableLocalHeartbeat(boolean enable) {
+ this.enableLocalHeartbeat = enable;
return this;
}
@@ -47,12 +56,25 @@ public class RouterConfigBuilder {
return this;
}
+ public RouterConfigBuilder heartbeat(boolean enable) {
+ this.enableHeartbeat = enable;
+ return this;
+ }
+
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
+ public RouterConfigBuilder heartbeat() {
+ return this.heartbeat(true);
+ }
+
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
+ this.enableHeartbeat);
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
+ this.enableLocalHeartbeat);
return conf;
}
}
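
A short sketch of the builder in use (the same pattern appears in TestRouter below), assuming an HdfsConfiguration as the starting point:

Configuration routerConf = new RouterConfigBuilder(new HdfsConfiguration())
    .rpc()        // enable the Router RPC server
    .heartbeat()  // enable namenode heartbeating
    .build();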
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
index 4031b7f..0830c19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
@@ -754,6 +755,48 @@ public class RouterDFSCluster {
}
/**
+ * Switch a namenode in a nameservice to be the active.
+ * @param nsId Nameservice identifier.
+ * @param nnId Namenode identifier.
+ */
+ public void switchToActive(String nsId, String nnId) {
+ try {
+ int total = cluster.getNumNameNodes();
+ NameNodeInfo[] nns = cluster.getNameNodeInfos();
+ for (int i = 0; i < total; i++) {
+ NameNodeInfo nn = nns[i];
+ if (nn.getNameserviceId().equals(nsId) &&
+ nn.getNamenodeId().equals(nnId)) {
+ cluster.transitionToActive(i);
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Cannot transition to active", e);
+ }
+ }
+
+ /**
+ * Switch a namenode in a nameservice to be in standby.
+ * @param nsId Nameservice identifier.
+ * @param nnId Namenode identifier.
+ */
+ public void switchToStandby(String nsId, String nnId) {
+ try {
+ int total = cluster.getNumNameNodes();
+ NameNodeInfo[] nns = cluster.getNameNodeInfos();
+ for (int i = 0; i < total; i++) {
+ NameNodeInfo nn = nns[i];
+ if (nn.getNameserviceId().equals(nsId) &&
+ nn.getNamenodeId().equals(nnId)) {
+ cluster.transitionToStandby(i);
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Cannot transition to standby", e);
+ }
+ }
+
+ /**
* Stop the federated HDFS cluster.
*/
public void shutdown() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
new file mode 100644
index 0000000..877fb02
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test the service that heartbeats the state of the namenodes to the State
+ * Store.
+ */
+public class TestNamenodeHeartbeat {
+
+ private static RouterDFSCluster cluster;
+ private static ActiveNamenodeResolver namenodeResolver;
+ private static List<NamenodeHeartbeatService> services;
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void globalSetUp() throws Exception {
+
+ cluster = new RouterDFSCluster(true, 2);
+
+ // Start NNs and DNs and wait until ready
+ cluster.startCluster();
+
+ // Mock locator that records the heartbeats
+ List<String> nss = cluster.getNameservices();
+ String ns = nss.get(0);
+ Configuration conf = cluster.generateNamenodeConfiguration(ns);
+ namenodeResolver = new MockResolver(conf);
+ namenodeResolver.setRouterId("testrouter");
+
+ // Create one heartbeat service per NN
+ services = new ArrayList<>();
+ for (NamenodeContext nn : cluster.getNamenodes()) {
+ String nsId = nn.getNameserviceId();
+ String nnId = nn.getNamenodeId();
+ NamenodeHeartbeatService service = new NamenodeHeartbeatService(
+ namenodeResolver, nsId, nnId);
+ service.init(conf);
+ service.start();
+ services.add(service);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ cluster.shutdown();
+ for (NamenodeHeartbeatService service: services) {
+ service.stop();
+ service.close();
+ }
+ }
+
+ @Test
+ public void testNamenodeHeartbeatService() throws IOException {
+
+ RouterDFSCluster testCluster = new RouterDFSCluster(true, 1);
+ Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration(
+ NAMESERVICES[0]);
+ NamenodeHeartbeatService server = new NamenodeHeartbeatService(
+ namenodeResolver, NAMESERVICES[0], NAMENODES[0]);
+ server.init(heartbeatConfig);
+ assertEquals(STATE.INITED, server.getServiceState());
+ server.start();
+ assertEquals(STATE.STARTED, server.getServiceState());
+ server.stop();
+ assertEquals(STATE.STOPPED, server.getServiceState());
+ server.close();
+ }
+
+ @Test
+ public void testHearbeat() throws InterruptedException, IOException {
+
+ // Set NAMENODE1 to active for all nameservices
+ if (cluster.isHighAvailability()) {
+ for (String ns : cluster.getNameservices()) {
+ cluster.switchToActive(ns, NAMENODES[0]);
+ cluster.switchToStandby(ns, NAMENODES[1]);
+ }
+ }
+
+ // Wait for heartbeats to record
+ Thread.sleep(5000);
+
+ // Verify the locator has matching NN entries for each NS
+ for (String ns : cluster.getNameservices()) {
+ List<? extends FederationNamenodeContext> nns =
+ namenodeResolver.getNamenodesForNameserviceId(ns);
+
+ // Active
+ FederationNamenodeContext active = nns.get(0);
+ assertEquals(NAMENODES[0], active.getNamenodeId());
+
+ // Standby
+ FederationNamenodeContext standby = nns.get(1);
+ assertEquals(NAMENODES[1], standby.getNamenodeId());
+ }
+
+ // Switch active NNs in 1/2 nameservices
+ List<String> nss = cluster.getNameservices();
+ String failoverNS = nss.get(0);
+ String normalNs = nss.get(1);
+
+ cluster.switchToStandby(failoverNS, NAMENODES[0]);
+ cluster.switchToActive(failoverNS, NAMENODES[1]);
+
+ // Wait for heartbeats to record
+ Thread.sleep(5000);
+
+ // Verify the locator has recorded the failover for the failover NS
+ List<? extends FederationNamenodeContext> failoverNSs =
+ namenodeResolver.getNamenodesForNameserviceId(failoverNS);
+ // Active
+ FederationNamenodeContext active = failoverNSs.get(0);
+ assertEquals(NAMENODES[1], active.getNamenodeId());
+
+ // Standby
+ FederationNamenodeContext standby = failoverNSs.get(1);
+ assertEquals(NAMENODES[0], standby.getNamenodeId());
+
+ // Verify the locator has the same records for the other ns
+ List<? extends FederationNamenodeContext> normalNss =
+ namenodeResolver.getNamenodesForNameserviceId(normalNs);
+ // Active
+ active = normalNss.get(0);
+ assertEquals(NAMENODES[0], active.getNamenodeId());
+ // Standby
+ standby = normalNss.get(1);
+ assertEquals(NAMENODES[1], standby.getNamenodeId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae5e1352/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
index d8afb39..2074d3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -98,6 +98,9 @@ public class TestRouter {
// Rpc only
testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
+ // Heartbeat only
+ testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
+
// Run with all services
testRouterStartup(new RouterConfigBuilder(conf).all().build());
}