You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2018/01/25 07:52:02 UTC
hadoop git commit: HDFS-13042. RBF: Heartbeat Router State.
Contributed by Inigo Goiri.
Repository: hadoop
Updated Branches:
refs/heads/trunk eb2dd0868 -> 7721fff74
HDFS-13042. RBF: Heartbeat Router State. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7721fff7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7721fff7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7721fff7
Branch: refs/heads/trunk
Commit: 7721fff74494eb7fbbbba7f8bb4b4692d880d301
Parents: eb2dd08
Author: Yiqun Lin <yq...@apache.org>
Authored: Thu Jan 25 15:51:26 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Thu Jan 25 15:51:26 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +
.../hdfs/server/federation/router/Router.java | 71 +++++++
.../router/RouterHeartbeatService.java | 155 +++++++++++++++
.../federation/router/RouterServiceState.java | 2 +-
.../federation/store/StateStoreService.java | 9 +
.../src/main/resources/hdfs-default.xml | 20 ++
.../store/TestStateStoreRouterState.java | 194 +++++++++++++++++++
7 files changed, 458 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f53badc..84215f3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1222,6 +1222,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
+ public static final String DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS =
+ FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval";
+ public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT =
+ TimeUnit.SECONDS.toMillis(5);
// HDFS Router NN client
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
@@ -1282,6 +1286,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
FEDERATION_STORE_PREFIX + "membership.expiration";
public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
+ public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS =
+ FEDERATION_STORE_PREFIX + "router.expiration";
+ public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
+ TimeUnit.MINUTES.toMillis(5);
// HDFS Router-based federation mount table entries
/** Maximum number of cache entries to have. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index ea8a1c0..1e72c93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -37,11 +37,13 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +114,18 @@ public class Router extends CompositeService {
/** Quota cache manager. */
private RouterQuotaManager quotaManager;
+ /** Manages the current state of the router. */
+ private RouterStore routerStateManager;
+ /** Heartbeat our run status to the router state manager. */
+ private RouterHeartbeatService routerHeartbeatService;
+
+ /** The start time of the router. */
+ private final long startTime = Time.now();
+
+ /** State of the Router. */
+ private RouterServiceState state = RouterServiceState.UNINITIALIZED;
+
+
/////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////
@@ -127,6 +141,7 @@ public class Router extends CompositeService {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
+ updateRouterState(RouterServiceState.INITIALIZING);
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
@@ -188,6 +203,10 @@ public class Router extends CompositeService {
if (this.namenodeHearbeatServices.isEmpty()) {
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
}
+
+ // Periodically update the router state
+ this.routerHeartbeatService = new RouterHeartbeatService(this);
+ addService(this.routerHeartbeatService);
}
// Router metrics system
@@ -219,6 +238,8 @@ public class Router extends CompositeService {
@Override
protected void serviceStart() throws Exception {
+ updateRouterState(RouterServiceState.RUNNING);
+
if (this.pauseMonitor != null) {
this.pauseMonitor.start();
JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
@@ -233,6 +254,9 @@ public class Router extends CompositeService {
@Override
protected void serviceStop() throws Exception {
+ // Update state
+ updateRouterState(RouterServiceState.SHUTDOWN);
+
// JVM pause monitor
if (this.pauseMonitor != null) {
this.pauseMonitor.stop();
@@ -454,6 +478,31 @@ public class Router extends CompositeService {
}
/////////////////////////////////////////////////////////
+ // Router State Management
+ /////////////////////////////////////////////////////////
+
+ /**
+ * Set the router state and trigger an asynchronous State Store heartbeat.
+ *
+ * @param newState The new router state.
+ */
+ public void updateRouterState(RouterServiceState newState) {
+ this.state = newState;
+ if (this.routerHeartbeatService != null) {
+ this.routerHeartbeatService.updateStateAsync();
+ }
+ }
+
+ /**
+ * Get the current state of the router.
+ *
+ * @return Current state of the router.
+ */
+ public RouterServiceState getRouterState() {
+ return this.state;
+ }
+
+ /////////////////////////////////////////////////////////
// Submodule getters
/////////////////////////////////////////////////////////
@@ -508,11 +557,33 @@ public class Router extends CompositeService {
return this.namenodeResolver;
}
+ /**
+ * Get the state store interface for the router heartbeats.
+ *
+ * @return RouterStore API handle; may be null if the State Store is not set.
+ */
+ public RouterStore getRouterStateManager() {
+ if (this.routerStateManager == null && this.stateStore != null) {
+ this.routerStateManager = this.stateStore.getRegisteredRecordStore(
+ RouterStore.class);
+ }
+ return this.routerStateManager;
+ }
+
/////////////////////////////////////////////////////////
// Router info
/////////////////////////////////////////////////////////
/**
+ * Get the start time of the Router.
+ *
+ * @return Start time of the router, taken from Time.now() at construction.
+ */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ /**
* Unique ID for the router, typically the hostname:port string for the
* router's RPC server. This ID may be null on router startup before the RPC
* server has bound to a port.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
new file mode 100644
index 0000000..86a6210
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the Router current state in the State Store.
+ */
+public class RouterHeartbeatService extends PeriodicService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterHeartbeatService.class);
+
+ /** Router we are heartbeating. */
+ private final Router router;
+
+ /**
+ * Create a new Router heartbeat service.
+ *
+ * @param router Router to heartbeat.
+ */
+ public RouterHeartbeatService(Router router) {
+ super(RouterHeartbeatService.class.getSimpleName());
+ this.router = router;
+ }
+
+ /**
+ * Trigger the update of the Router state in a new daemon thread.
+ */
+ protected void updateStateAsync() {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ updateStateStore();
+ }
+ }, "Router Heartbeat Async");
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Update the state of the Router in the State Store, if the Router id is known.
+ */
+ private synchronized void updateStateStore() {
+ String routerId = router.getRouterId();
+ if (routerId == null) {
+ LOG.error("Cannot heartbeat for router: unknown router id");
+ return;
+ }
+ RouterStore routerStore = router.getRouterStateManager();
+ if (routerStore != null) {
+ try {
+ RouterState record = RouterState.newInstance(
+ routerId, router.getStartTime(), router.getRouterState());
+ StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance(
+ getStateStoreVersion(MembershipStore.class),
+ getStateStoreVersion(MountTableStore.class));
+ record.setStateStoreVersion(stateStoreVersion);
+ RouterHeartbeatRequest request =
+ RouterHeartbeatRequest.newInstance(record);
+ RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
+ if (!response.getStatus()) {
+ LOG.warn("Cannot heartbeat router {}", routerId);
+ } else {
+ LOG.debug("Router heartbeat for router {}", routerId);
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot heartbeat router {}: {}", routerId, e.getMessage());
+ }
+ } else {
+ LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId);
+ }
+ }
+
+ /**
+ * Get the latest modification date among the cached records of a store.
+ *
+ * @param clazz Record store class registered in the State Store.
+ * @return Latest modification date, or -1 if none could be obtained.
+ */
+ private <R extends BaseRecord, S extends RecordStore<R>>
+ long getStateStoreVersion(final Class<S> clazz) {
+ long version = -1;
+ try {
+ StateStoreService stateStore = router.getStateStore();
+ S recordStore = stateStore.getRegisteredRecordStore(clazz);
+ if (recordStore != null) {
+ if (recordStore instanceof CachedRecordStore) {
+ CachedRecordStore<R> cachedRecordStore =
+ (CachedRecordStore<R>) recordStore;
+ List<R> records = cachedRecordStore.getCachedRecords();
+ for (BaseRecord record : records) {
+ if (record.getDateModified() > version) {
+ version = record.getDateModified();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot get version for {}: {}", clazz, e.getMessage());
+ }
+ return version;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+
+ long interval = conf.getTimeDuration(
+ DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS,
+ DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.setIntervalMs(interval);
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void periodicInvoke() {
+ updateStateStore();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
index 25a6466..3accbe9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
* States of the Router.
*/
public enum RouterServiceState {
- NONE,
+ UNINITIALIZED,
INITIALIZING,
SAFEMODE,
RUNNING,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index 0289ba6..aa730ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -38,8 +39,10 @@ import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
@@ -148,6 +151,7 @@ public class StateStoreService extends CompositeService {
// Add supported record stores
addRecordStore(MembershipStoreImpl.class);
addRecordStore(MountTableStoreImpl.class);
+ addRecordStore(RouterStoreImpl.class);
// Check the connection to the State Store periodically
this.monitorService = new StateStoreConnectionMonitorService(this);
@@ -158,6 +162,11 @@ public class StateStoreService extends CompositeService {
DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
+ RouterState.setExpirationMs(conf.getTimeDuration(
+ DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
+ DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
+ TimeUnit.MILLISECONDS));
+
// Cache update service
this.cacheUpdater = new StateStoreCacheUpdateService(this);
addService(this.cacheUpdater);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7a23eb4..d24310e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5111,6 +5111,26 @@
</property>
<property>
+ <name>dfs.federation.router.heartbeat-state.interval</name>
+ <value>5s</value>
+ <description>
+ How often the Router should heartbeat its state into the State Store.
+ The value is in milliseconds unless a time unit suffix is given, as
+ described in dfs.federation.router.quota-cache.update.interval.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.store.router.expiration</name>
+ <value>5m</value>
+ <description>
+ Expiration time for a router state record. The value is in milliseconds
+ unless a time unit suffix is given, as described in
+ dfs.federation.router.quota-cache.update.interval.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.monitor.namenode</name>
<value></value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
new file mode 100644
index 0000000..ae15ef6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.util.Time;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the basic {@link StateStoreService} {@link RouterStore} functionality.
+ */
+public class TestStateStoreRouterState extends TestStateStoreBase {
+
+ private static RouterStore routerStore;
+
+ @BeforeClass
+ public static void create() {
+ // Reduce the router state expiration to 5 seconds
+ getConf().setTimeDuration(
+ DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
+ 5, TimeUnit.SECONDS);
+ }
+
+ @Before
+ public void setup() throws IOException, InterruptedException {
+
+ if (routerStore == null) {
+ routerStore =
+ getStateStore().getRegisteredRecordStore(RouterStore.class);
+ }
+
+ // Clear router status registrations
+ assertTrue(clearRecords(getStateStore(), RouterState.class));
+ }
+
+ @Test
+ public void testStateStoreDisconnected() throws Exception {
+
+ // Close the data store driver
+ getStateStore().closeDriver();
+ assertEquals(false, getStateStore().isDriverReady());
+
+ // Test all APIs that access the data store to ensure they throw the correct
+ // exception.
+ GetRouterRegistrationRequest getSingleRequest =
+ GetRouterRegistrationRequest.newInstance();
+ verifyException(routerStore, "getRouterRegistration",
+ StateStoreUnavailableException.class,
+ new Class[] {GetRouterRegistrationRequest.class},
+ new Object[] {getSingleRequest});
+
+ GetRouterRegistrationsRequest getRequest =
+ GetRouterRegistrationsRequest.newInstance();
+ routerStore.loadCache(true);
+ verifyException(routerStore, "getRouterRegistrations",
+ StateStoreUnavailableException.class,
+ new Class[] {GetRouterRegistrationsRequest.class},
+ new Object[] {getRequest});
+
+ RouterHeartbeatRequest hbRequest = RouterHeartbeatRequest.newInstance(
+ RouterState.newInstance("test", 0, RouterServiceState.UNINITIALIZED));
+ verifyException(routerStore, "routerHeartbeat",
+ StateStoreUnavailableException.class,
+ new Class[] {RouterHeartbeatRequest.class},
+ new Object[] {hbRequest});
+ }
+
+ //
+ // Router
+ //
+ @Test
+ public void testUpdateRouterStatus()
+ throws IllegalStateException, IOException {
+
+ long dateStarted = Time.now();
+ String address = "testaddress";
+
+ // Register the router with a heartbeat
+ RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(
+ RouterState.newInstance(
+ address, dateStarted, RouterServiceState.RUNNING));
+ assertTrue(routerStore.routerHeartbeat(request).getStatus());
+
+ // Verify the stored registration
+ GetRouterRegistrationRequest getRequest =
+ GetRouterRegistrationRequest.newInstance(address);
+ RouterState record =
+ routerStore.getRouterRegistration(getRequest).getRouter();
+ assertNotNull(record);
+ assertEquals(RouterServiceState.RUNNING, record.getStatus());
+ assertEquals(address, record.getAddress());
+ assertEquals(FederationUtil.getCompileInfo(), record.getCompileInfo());
+ // Build version may vary a bit
+ assertTrue(record.getBuildVersion().length() > 0);
+ }
+
+ @Test
+ public void testRouterStateExpired()
+ throws IOException, InterruptedException {
+
+ long dateStarted = Time.now();
+ String address = "testaddress";
+
+ RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(
+ RouterState.newInstance(
+ address, dateStarted, RouterServiceState.RUNNING));
+ // Register the router with a heartbeat
+ assertTrue(routerStore.routerHeartbeat(request).getStatus());
+
+ // Verify the registration exists
+ GetRouterRegistrationRequest getRequest =
+ GetRouterRegistrationRequest.newInstance(address);
+ RouterState record =
+ routerStore.getRouterRegistration(getRequest).getRouter();
+ assertNotNull(record);
+
+ // Wait past expiration (set to 5 sec in config)
+ Thread.sleep(6000);
+
+ // Verify expired
+ RouterState r = routerStore.getRouterRegistration(getRequest).getRouter();
+ assertEquals(RouterServiceState.EXPIRED, r.getStatus());
+
+ // Heartbeat again and this shouldn't be EXPIRED anymore
+ assertTrue(routerStore.routerHeartbeat(request).getStatus());
+ r = routerStore.getRouterRegistration(getRequest).getRouter();
+ assertEquals(RouterServiceState.RUNNING, r.getStatus());
+ }
+
+ @Test
+ public void testGetAllRouterStates()
+ throws StateStoreUnavailableException, IOException {
+
+ // Register 2 routers
+ RouterHeartbeatRequest heartbeatRequest1 =
+ RouterHeartbeatRequest.newInstance(
+ RouterState.newInstance(
+ "testaddress1", Time.now(), RouterServiceState.RUNNING));
+ assertTrue(routerStore.routerHeartbeat(heartbeatRequest1).getStatus());
+
+ RouterHeartbeatRequest heartbeatRequest2 =
+ RouterHeartbeatRequest.newInstance(
+ RouterState.newInstance(
+ "testaddress2", Time.now(), RouterServiceState.RUNNING));
+ assertTrue(routerStore.routerHeartbeat(heartbeatRequest2).getStatus());
+
+ // Verify both registrations after reloading the cache
+ routerStore.loadCache(true);
+ GetRouterRegistrationsRequest request =
+ GetRouterRegistrationsRequest.newInstance();
+ List<RouterState> entries =
+ routerStore.getRouterRegistrations(request).getRouters();
+ assertEquals(2, entries.size());
+ Collections.sort(entries);
+ assertEquals("testaddress1", entries.get(0).getAddress());
+ assertEquals("testaddress2", entries.get(1).getAddress());
+ assertEquals(RouterServiceState.RUNNING, entries.get(0).getStatus());
+ assertEquals(RouterServiceState.RUNNING, entries.get(1).getStatus());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org