You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:29:11 UTC
[32/37] hadoop git commit: HDFS-13044. RBF: Add a safe mode for the
Router. Contributed by Inigo Goiri.
HDFS-13044. RBF: Add a safe mode for the Router. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dbb9dded
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dbb9dded
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dbb9dded
Branch: refs/heads/HDFS-7240
Commit: dbb9dded33b3cff3b630e98300d30515a9d1eec4
Parents: fde95d4
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Jan 30 12:12:08 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Jan 30 12:12:08 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 13 ++
.../hdfs/server/federation/router/Router.java | 16 +-
.../federation/router/RouterRpcServer.java | 43 ++++-
.../router/RouterSafeModeException.java | 53 +++++
.../router/RouterSafemodeService.java | 150 +++++++++++++++
.../store/StateStoreCacheUpdateService.java | 7 +-
.../src/main/resources/hdfs-default.xml | 36 +++-
.../src/site/markdown/HDFSRouterFederation.md | 4 +
.../server/federation/RouterConfigBuilder.java | 13 ++
.../federation/router/TestRouterSafemode.java | 192 +++++++++++++++++++
10 files changed, 515 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 84215f3f..4589aaa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1291,6 +1291,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
+ // HDFS Router safe mode
+ public static final String DFS_ROUTER_SAFEMODE_ENABLE =
+ FEDERATION_ROUTER_PREFIX + "safemode.enable";
+ public static final boolean DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT = true;
+ public static final String DFS_ROUTER_SAFEMODE_EXTENSION =
+ FEDERATION_ROUTER_PREFIX + "safemode.extension";
+ public static final long DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT =
+ TimeUnit.SECONDS.toMillis(30);
+ public static final String DFS_ROUTER_SAFEMODE_EXPIRATION =
+ FEDERATION_ROUTER_PREFIX + "safemode.expiration";
+ public static final long DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT =
+ 3 * DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT;
+
// HDFS Router-based federation mount table entries
/** Maximum number of cache entries to have. */
public static final String FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 1e72c93..79f43bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -118,6 +118,8 @@ public class Router extends CompositeService {
private RouterStore routerStateManager;
/** Heartbeat our run status to the router state manager. */
private RouterHeartbeatService routerHeartbeatService;
+ /** Enter/exit safemode. */
+ private RouterSafemodeService safemodeService;
/** The start time of the namesystem. */
private final long startTime = Time.now();
@@ -232,13 +234,25 @@ public class Router extends CompositeService {
addService(this.quotaUpdateService);
}
+ // Safemode service to refuse RPC calls when the router is out of sync
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
+ DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
+ // Create safemode monitoring service
+ this.safemodeService = new RouterSafemodeService(this);
+ addService(this.safemodeService);
+ }
+
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
- updateRouterState(RouterServiceState.RUNNING);
+ if (this.safemodeService == null) {
+ // Router is running now
+ updateRouterState(RouterServiceState.RUNNING);
+ }
if (this.pauseMonitor != null) {
this.pauseMonitor.start();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 9afd441..57125ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -179,6 +179,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
/** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver;
+ /** If we are in safe mode, fail requests as if a standby NN. */
+ private volatile boolean safeMode;
/** Category of the operation that a thread is executing. */
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
@@ -370,12 +372,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* @param op Category of the operation to check.
* @param supported If the operation is supported or not. If not, it will
* throw an UnsupportedOperationException.
- * @throws StandbyException If the Router is in safe mode and cannot serve
- * client requests.
+ * @throws RouterSafeModeException If the Router is in safe mode and cannot
+ * serve client requests.
* @throws UnsupportedOperationException If the operation is not supported.
*/
protected void checkOperation(OperationCategory op, boolean supported)
- throws StandbyException, UnsupportedOperationException {
+ throws RouterSafeModeException, UnsupportedOperationException {
checkOperation(op);
if (!supported) {
@@ -393,10 +395,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* UNCHECKED. This function should be called by all ClientProtocol functions.
*
* @param op Category of the operation to check.
- * @throws StandbyException If the Router is in safe mode and cannot serve
- * client requests.
+ * @throws RouterSafeModeException If the Router is in safe mode and cannot
+ * serve client requests.
*/
- protected void checkOperation(OperationCategory op) throws StandbyException {
+ protected void checkOperation(OperationCategory op)
+ throws RouterSafeModeException {
// Log the function we are currently calling.
if (rpcMonitor != null) {
rpcMonitor.startOp();
@@ -415,7 +418,33 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
return;
}
- // TODO check Router safe mode and return Standby exception
+ if (safeMode) {
+ // Throw standby exception, router is not available
+ if (rpcMonitor != null) {
+ rpcMonitor.routerFailureSafemode();
+ }
+ throw new RouterSafeModeException(router.getRouterId(), op);
+ }
+ }
+
+ /**
+ * In safe mode all RPC requests will fail and return a standby exception.
+ * The client will try another Router, similar to the client retry logic for
+ * HA.
+ *
+ * @param mode True if enabled, False if disabled.
+ */
+ public void setSafeMode(boolean mode) {
+ this.safeMode = mode;
+ }
+
+ /**
+ * Check if the Router is in safe mode and cannot serve RPC calls.
+ *
+ * @return If the Router is in safe mode.
+ */
+ public boolean isInSafeMode() {
+ return this.safeMode;
}
@Override // ClientProtocol
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
new file mode 100644
index 0000000..7a78b5b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.ipc.StandbyException;
+
+/**
+ * Exception that the Router throws when it is in safe mode. This extends
+ * {@link StandbyException} for the client to try another Router when it gets
+ * this exception.
+ */
+public class RouterSafeModeException extends StandbyException {
+
+ private static final long serialVersionUID = 453568188334993493L;
+
+ /** Identifier of the Router that generated this exception. */
+ private final String routerId;
+
+ /**
+ * Build a new Router safe mode exception.
+ * @param router Identifier of the Router.
+ * @param op Category of the operation (READ/WRITE).
+ */
+ public RouterSafeModeException(String router, OperationCategory op) {
+ super("Router " + router + " is in safe mode and cannot handle " + op
+ + " requests.");
+ this.routerId = router;
+ }
+
+ /**
+ * Get the id of the Router that generated this exception.
+ * @return Id of the Router that generated this exception.
+ */
+ public String getRouterId() {
+ return this.routerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
new file mode 100644
index 0000000..56aab0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically check if the {@link org.apache.hadoop.hdfs.server.
+ * federation.store.StateStoreService StateStoreService} cached information in
+ * the {@link Router} is up to date. If the cache is stale, the Router enters
+ * safe mode and its RPC server fails client requests; once the startup
+ * extension has elapsed and the cache is fresh, the Router leaves safe mode.
+ */
+public class RouterSafemodeService extends PeriodicService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterSafemodeService.class);
+
+ /** Router to manage safe mode. */
+ private final Router router;
+
+ /** Interval in ms to wait post startup before allowing RPC requests. */
+ private long startupInterval;
+ /** Interval in ms after which the State Store cache is too stale. */
+ private long staleInterval;
+ /** Start time in ms of this service. */
+ private long startupTime;
+
+ /** The time the Router enters safe mode in milliseconds. */
+ private long enterSafeModeTime = now();
+
+
+ /**
+ * Create a new safe mode service.
+ *
+ * @param router Router whose safe mode this service manages.
+ */
+ public RouterSafemodeService(Router router) {
+ super(RouterSafemodeService.class.getSimpleName());
+ this.router = router;
+ }
+
+ /**
+ * Enter safe mode.
+ */
+ private void enter() {
+ LOG.info("Entering safe mode");
+ enterSafeModeTime = now();
+ RouterRpcServer rpcServer = router.getRpcServer();
+ rpcServer.setSafeMode(true);
+ router.updateRouterState(RouterServiceState.SAFEMODE);
+ }
+
+ /**
+ * Leave safe mode.
+ */
+ private void leave() {
+ // Cache recently updated, leave safemode
+ long timeInSafemode = now() - enterSafeModeTime;
+ LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode);
+ RouterMetrics routerMetrics = router.getRouterMetrics();
+ if (routerMetrics == null) {
+ LOG.error("The Router metrics are not enabled");
+ } else {
+ routerMetrics.setSafeModeTime(timeInSafemode);
+ }
+ RouterRpcServer rpcServer = router.getRpcServer();
+ rpcServer.setSafeMode(false);
+ router.updateRouterState(RouterServiceState.RUNNING);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+
+ // Use same interval as cache update service
+ this.setIntervalMs(conf.getTimeDuration(
+ DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+ DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
+ TimeUnit.MILLISECONDS));
+
+ this.startupInterval = conf.getTimeDuration(
+ DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION,
+ DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ LOG.info("Leave startup safe mode after {} ms", this.startupInterval);
+
+ this.staleInterval = conf.getTimeDuration(
+ DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION,
+ DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ LOG.info("Enter safe mode after {} ms without reaching the State Store",
+ this.staleInterval);
+
+ this.startupTime = Time.now();
+
+ // Start with the RPC server in safe mode; periodicInvoke() leaves it later
+ enter();
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void periodicInvoke() {
+ long now = Time.now();
+ long delta = now - startupTime;
+ if (delta < startupInterval) {
+ LOG.info("Delaying safemode exit for {} milliseconds...",
+ this.startupInterval - delta);
+ return;
+ }
+ RouterRpcServer rpcServer = router.getRpcServer();
+ StateStoreService stateStore = router.getStateStore();
+ long cacheUpdateTime = stateStore.getCacheUpdateTime();
+ boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval;
+
+ // Enter safe mode when the cache is stale; leave once it is fresh again
+ if (isCacheStale) {
+ if (!rpcServer.isInSafeMode()) {
+ enter();
+ }
+ } else if (rpcServer.isInSafeMode()) {
+ // Cache recently updated, leave safe mode
+ leave();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
index bb8cfb0..9bcbc1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.store;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
@@ -52,9 +54,10 @@ public class StateStoreCacheUpdateService extends PeriodicService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
- this.setIntervalMs(conf.getLong(
+ this.setIntervalMs(conf.getTimeDuration(
DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
- DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT));
+ DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
+ TimeUnit.MILLISECONDS));
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d24310e..7446766 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5080,9 +5080,12 @@
<property>
<name>dfs.federation.router.cache.ttl</name>
- <value>60000</value>
+ <value>1m</value>
<description>
- How often to refresh the State Store caches in milliseconds.
+ How often to refresh the State Store caches. This setting
+ supports multiple time unit suffixes as described in
+ dfs.heartbeat.interval. If no suffix is specified then milliseconds is
+ assumed.
</description>
</property>
@@ -5131,6 +5134,35 @@
</property>
<property>
+ <name>dfs.federation.router.safemode.enable</name>
+ <value>true</value>
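+ <description>If true, the Router runs a safe mode service that refuses RPC
+ calls while the Router is out of sync with the State Store.</description>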
+ </property>
+
+ <property>
+ <name>dfs.federation.router.safemode.extension</name>
+ <value>30s</value>
+ <description>
+ Time after startup that the Router is in safe mode. This setting
+ supports multiple time unit suffixes as described in
+ dfs.heartbeat.interval. If no suffix is specified then milliseconds is
+ assumed.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.safemode.expiration</name>
+ <value>3m</value>
+ <description>
+ Time without being able to reach the State Store to enter safe mode. This
+ setting supports multiple time unit suffixes as described in
+ dfs.heartbeat.interval. If no suffix is specified then milliseconds is
+ assumed.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.monitor.namenode</name>
<value></value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
index 75798a1..6b21123 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
@@ -81,6 +81,10 @@ The Routers are stateless and metadata operations are atomic at the NameNodes.
If a Router becomes unavailable, any Router can take over for it.
The clients configure their DFS HA client (e.g., ConfiguredFailoverProvider or RequestHedgingProxyProvider) with all the Routers in the federation as endpoints.
+* **Unavailable State Store:**
+If a Router cannot contact the State Store, it will enter into a Safe Mode state which disallows it from serving requests.
+Clients will treat Routers in Safe Mode as if they were Standby NameNodes and try another Router.
+
* **NameNode heartbeat HA:**
For high availability and flexibility, multiple Routers can monitor the same NameNode and heartbeat the information to the State Store.
This increases clients' resiliency to stale information, should a Router fail.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 3d8b35c..3659bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -35,6 +35,7 @@ public class RouterConfigBuilder {
private boolean enableStateStore = false;
private boolean enableMetrics = false;
private boolean enableQuota = false;
+ private boolean enableSafemode = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@@ -52,6 +53,7 @@ public class RouterConfigBuilder {
this.enableLocalHeartbeat = true;
this.enableStateStore = true;
this.enableMetrics = true;
+ this.enableSafemode = true;
return this;
}
@@ -95,6 +97,11 @@ public class RouterConfigBuilder {
return this;
}
+ public RouterConfigBuilder safemode(boolean enable) {
+ this.enableSafemode = enable;
+ return this;
+ }
+
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
@@ -123,6 +130,10 @@ public class RouterConfigBuilder {
return this.quota(true);
}
+ public RouterConfigBuilder safemode() {
+ return this.safemode(true);
+ }
+
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
@@ -139,6 +150,8 @@ public class RouterConfigBuilder {
this.enableMetrics);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
this.enableQuota);
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
+ this.enableSafemode);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
new file mode 100644
index 0000000..9299f77
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.deleteStateStore;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the safe mode for the {@link Router} controlled by
+ * {@link RouterSafemodeService}.
+ */
+public class TestRouterSafemode {
+
+ private Router router;
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void create() throws IOException {
+ // Wipe state store
+ deleteStateStore();
+ // Configuration that supports the state store
+ conf = getStateStoreConfiguration();
+ // 2 sec startup safe mode extension
+ conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
+ TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
+ // 1 sec cache refresh
+ conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+ TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
+ // 2 sec post cache update before entering safemode (2 intervals)
+ conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
+ TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
+ // RPC + Safe Mode + State Store + Metrics only
+ conf = new RouterConfigBuilder(conf)
+ .rpc()
+ .safemode()
+ .stateStore()
+ .metrics()
+ .build();
+ }
+
+ @AfterClass
+ public static void destroy() {
+ }
+
+ @Before
+ public void setup() throws IOException, URISyntaxException {
+ router = new Router();
+ router.init(conf);
+ router.start();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ if (router != null) {
+ router.stop();
+ router = null;
+ }
+ }
+
+ @Test
+ public void testSafemodeService() throws IOException {
+ RouterSafemodeService server = new RouterSafemodeService(router);
+ server.init(conf);
+ assertEquals(STATE.INITED, server.getServiceState());
+ server.start();
+ assertEquals(STATE.STARTED, server.getServiceState());
+ server.stop();
+ assertEquals(STATE.STOPPED, server.getServiceState());
+ server.close();
+ }
+
+ @Test
+ public void testRouterExitSafemode()
+ throws InterruptedException, IllegalStateException, IOException {
+
+ assertTrue(router.getRpcServer().isInSafeMode());
+ verifyRouter(RouterServiceState.SAFEMODE);
+
+ // Wait for initial time in milliseconds
+ long interval =
+ conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
+ TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
+ conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+ TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
+ Thread.sleep(interval);
+
+ assertFalse(router.getRpcServer().isInSafeMode());
+ verifyRouter(RouterServiceState.RUNNING);
+ }
+
+ @Test
+ public void testRouterEnterSafemode()
+ throws IllegalStateException, IOException, InterruptedException {
+
+ // Verify starting state
+ assertTrue(router.getRpcServer().isInSafeMode());
+ verifyRouter(RouterServiceState.SAFEMODE);
+
+ // We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time
+ long interval0 = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
+ TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) - 1000;
+ long t0 = Time.now();
+ while (Time.now() - t0 < interval0) {
+ verifyRouter(RouterServiceState.SAFEMODE);
+ Thread.sleep(100);
+ }
+
+ // We wait some time for the state to propagate
+ long interval1 = 1000 + 2 * conf.getTimeDuration(
+ DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1),
+ TimeUnit.MILLISECONDS);
+ Thread.sleep(interval1);
+
+ // Running
+ assertFalse(router.getRpcServer().isInSafeMode());
+ verifyRouter(RouterServiceState.RUNNING);
+
+ // Disable cache
+ router.getStateStore().stopCacheUpdateService();
+
+ // Wait until the State Store cache is stale in milliseconds
+ long interval2 =
+ conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
+ TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
+ conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+ TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
+ Thread.sleep(interval2);
+
+ // Safemode
+ assertTrue(router.getRpcServer().isInSafeMode());
+ verifyRouter(RouterServiceState.SAFEMODE);
+ }
+
+ @Test
+ public void testRouterRpcSafeMode()
+ throws IllegalStateException, IOException {
+
+ assertTrue(router.getRpcServer().isInSafeMode());
+ verifyRouter(RouterServiceState.SAFEMODE);
+
+ // If the Router is in Safe Mode, we should get a RouterSafeModeException
+ boolean exception = false;
+ try {
+ router.getRpcServer().delete("/testfile.txt", true);
+ fail("We should have thrown a safe mode exception");
+ } catch (RouterSafeModeException sme) {
+ exception = true;
+ }
+ assertTrue("We should have thrown a safe mode exception", exception);
+ }
+
+ private void verifyRouter(RouterServiceState status)
+ throws IllegalStateException, IOException {
+ assertEquals(status, router.getRouterState());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org