You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:29:11 UTC

[32/37] hadoop git commit: HDFS-13044. RBF: Add a safe mode for the Router. Contributed by Inigo Goiri.

HDFS-13044. RBF: Add a safe mode for the Router. Contributed by Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dbb9dded
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dbb9dded
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dbb9dded

Branch: refs/heads/HDFS-7240
Commit: dbb9dded33b3cff3b630e98300d30515a9d1eec4
Parents: fde95d4
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Jan 30 12:12:08 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Jan 30 12:12:08 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  13 ++
 .../hdfs/server/federation/router/Router.java   |  16 +-
 .../federation/router/RouterRpcServer.java      |  43 ++++-
 .../router/RouterSafeModeException.java         |  53 +++++
 .../router/RouterSafemodeService.java           | 150 +++++++++++++++
 .../store/StateStoreCacheUpdateService.java     |   7 +-
 .../src/main/resources/hdfs-default.xml         |  36 +++-
 .../src/site/markdown/HDFSRouterFederation.md   |   4 +
 .../server/federation/RouterConfigBuilder.java  |  13 ++
 .../federation/router/TestRouterSafemode.java   | 192 +++++++++++++++++++
 10 files changed, 515 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 84215f3f..4589aaa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1291,6 +1291,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
       TimeUnit.MINUTES.toMillis(5);
 
+  // HDFS Router safe mode
+  public static final String DFS_ROUTER_SAFEMODE_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "safemode.enable";
+  public static final boolean DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT = true;
+  public static final String DFS_ROUTER_SAFEMODE_EXTENSION =
+      FEDERATION_ROUTER_PREFIX + "safemode.extension";
+  public static final long DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT =
+      TimeUnit.SECONDS.toMillis(30);
+  public static final String DFS_ROUTER_SAFEMODE_EXPIRATION =
+      FEDERATION_ROUTER_PREFIX + "safemode.expiration";
+  public static final long DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT =
+      3 * DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT;
+
   // HDFS Router-based federation mount table entries
   /** Maximum number of cache entries to have. */
   public static final String FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 1e72c93..79f43bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -118,6 +118,8 @@ public class Router extends CompositeService {
   private RouterStore routerStateManager;
   /** Heartbeat our run status to the router state manager. */
   private RouterHeartbeatService routerHeartbeatService;
+  /** Enter/exit safemode. */
+  private RouterSafemodeService safemodeService;
 
   /** The start time of the namesystem. */
   private final long startTime = Time.now();
@@ -232,13 +234,25 @@ public class Router extends CompositeService {
       addService(this.quotaUpdateService);
     }
 
+    // Safemode service to refuse RPC calls when the router is out of sync
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
+        DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
+      // Create safemode monitoring service
+      this.safemodeService = new RouterSafemodeService(this);
+      addService(this.safemodeService);
+    }
+
     super.serviceInit(conf);
   }
 
   @Override
   protected void serviceStart() throws Exception {
 
-    updateRouterState(RouterServiceState.RUNNING);
+    if (this.safemodeService == null) {
+      // Router is running now
+      updateRouterState(RouterServiceState.RUNNING);
+    }
 
     if (this.pauseMonitor != null) {
       this.pauseMonitor.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 9afd441..57125ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -179,6 +179,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   /** Interface to map global name space to HDFS subcluster name spaces. */
   private final FileSubclusterResolver subclusterResolver;
 
+  /** If we are in safe mode, fail requests as if a standby NN. */
+  private volatile boolean safeMode;
 
   /** Category of the operation that a thread is executing. */
   private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
@@ -370,12 +372,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
    * @param op Category of the operation to check.
    * @param supported If the operation is supported or not. If not, it will
    *                  throw an UnsupportedOperationException.
-   * @throws StandbyException If the Router is in safe mode and cannot serve
-   *                          client requests.
+   * @throws RouterSafeModeException If the Router is in safe mode and cannot
+   *                                 serve client requests.
    * @throws UnsupportedOperationException If the operation is not supported.
    */
   protected void checkOperation(OperationCategory op, boolean supported)
-      throws StandbyException, UnsupportedOperationException {
+      throws RouterSafeModeException, UnsupportedOperationException {
     checkOperation(op);
 
     if (!supported) {
@@ -393,10 +395,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
    * UNCHECKED. This function should be called by all ClientProtocol functions.
    *
    * @param op Category of the operation to check.
-   * @throws StandbyException If the Router is in safe mode and cannot serve
-   *                          client requests.
+   * @throws RouterSafeModeException If the Router is in safe mode and cannot
+   *                                 serve client requests.
    */
-  protected void checkOperation(OperationCategory op) throws StandbyException {
+  protected void checkOperation(OperationCategory op)
+      throws RouterSafeModeException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
       rpcMonitor.startOp();
@@ -415,7 +418,33 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
       return;
     }
 
-    // TODO check Router safe mode and return Standby exception
+    if (safeMode) {
+      // Throw standby exception, router is not available
+      if (rpcMonitor != null) {
+        rpcMonitor.routerFailureSafemode();
+      }
+      throw new RouterSafeModeException(router.getRouterId(), op);
+    }
+  }
+
+  /**
+   * In safe mode all RPC requests will fail and return a standby exception.
+   * The client will try another Router, similar to the client retry logic for
+   * HA.
+   *
+   * @param mode True if enabled, False if disabled.
+   */
+  public void setSafeMode(boolean mode) {
+    this.safeMode = mode;
+  }
+
+  /**
+   * Check if the Router is in safe mode and cannot serve RPC calls.
+   *
+   * @return If the Router is in safe mode.
+   */
+  public boolean isInSafeMode() {
+    return this.safeMode;
   }
 
   @Override // ClientProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
new file mode 100644
index 0000000..7a78b5b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.ipc.StandbyException;
+
+/**
+ * Exception that the Router throws when it is in safe mode. This extends
+ * {@link StandbyException} for the client to try another Router when it gets
+ * this exception.
+ */
+public class RouterSafeModeException extends StandbyException {
+
+  private static final long serialVersionUID = 453568188334993493L;
+
+  /** Identifier of the Router that generated this exception. */
+  private final String routerId;
+
+  /**
+   * Build a new Router safe mode exception.
+   * @param router Identifier of the Router.
+   * @param op Category of the operation (READ/WRITE).
+   */
+  public RouterSafeModeException(String router, OperationCategory op) {
+    super("Router " + router + " is in safe mode and cannot handle " + op
+        + " requests.");
+    this.routerId = router;
+  }
+
+  /**
+   * Get the id of the Router that generated this exception.
+   * @return Id of the Router that generated this exception.
+   */
+  public String getRouterId() {
+    return this.routerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
new file mode 100644
index 0000000..56aab0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically check if the {@link org.apache.hadoop.hdfs.server.
+ * federation.store.StateStoreService StateStoreService} cached information in
+ * the {@link Router} is up to date. If the cache has become stale, the Router
+ * enters safe mode and refuses RPC calls; once the cache is fresh again, the
+ * Router leaves safe mode and resumes serving requests.
+ */
+public class RouterSafemodeService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterSafemodeService.class);
+
+  /** Router to manage safe mode. */
+  private final Router router;
+
+  /** Interval in ms to wait post startup before allowing RPC requests. */
+  private long startupInterval;
+  /** Interval in ms after which the State Store cache is too stale. */
+  private long staleInterval;
+  /** Start time in ms of this service. */
+  private long startupTime;
+
+  /** The time the Router enters safe mode in milliseconds. */
+  private long enterSafeModeTime = now();
+
+
+  /**
+   * Create a new safe mode service.
+   *
+   * @param router Router whose safe mode is managed by this service.
+   */
+  public RouterSafemodeService(Router router) {
+    super(RouterSafemodeService.class.getSimpleName());
+    this.router = router;
+  }
+
+  /**
+   * Enter safe mode.
+   */
+  private void enter() {
+    LOG.info("Entering safe mode");
+    enterSafeModeTime = now();
+    RouterRpcServer rpcServer = router.getRpcServer();
+    rpcServer.setSafeMode(true);
+    router.updateRouterState(RouterServiceState.SAFEMODE);
+  }
+
+  /**
+   * Leave safe mode.
+   */
+  private void leave() {
+    // Cache recently updated, leave safemode
+    long timeInSafemode = now() - enterSafeModeTime;
+    LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode);
+    RouterMetrics routerMetrics = router.getRouterMetrics();
+    if (routerMetrics == null) {
+      LOG.error("The Router metrics are not enabled");
+    } else {
+      routerMetrics.setSafeModeTime(timeInSafemode);
+    }
+    RouterRpcServer rpcServer = router.getRpcServer();
+    rpcServer.setSafeMode(false);
+    router.updateRouterState(RouterServiceState.RUNNING);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    // Use same interval as cache update service
+    this.setIntervalMs(conf.getTimeDuration(
+        DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+        DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
+        TimeUnit.MILLISECONDS));
+
+    this.startupInterval = conf.getTimeDuration(
+        DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION,
+        DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.info("Leave startup safe mode after {} ms", this.startupInterval);
+
+    this.staleInterval = conf.getTimeDuration(
+        DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION,
+        DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.info("Enter safe mode after {} ms without reaching the State Store",
+        this.staleInterval);
+
+    this.startupTime = Time.now();
+
+    // Initializing the RPC server in safe mode, it will disable it later
+    enter();
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    long now = Time.now();
+    long delta = now - startupTime;
+    if (delta < startupInterval) {
+      LOG.info("Delaying safemode exit for {} milliseconds...",
+          this.startupInterval - delta);
+      return;
+    }
+    RouterRpcServer rpcServer = router.getRpcServer();
+    StateStoreService stateStore = router.getStateStore();
+    long cacheUpdateTime = stateStore.getCacheUpdateTime();
+    boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval;
+
+    // Enter safe mode when the cache is stale; leave it once it is fresh
+    if (isCacheStale) {
+      if (!rpcServer.isInSafeMode()) {
+        enter();
+      }
+    } else if (rpcServer.isInSafeMode()) {
+      // Cache recently updated, leave safe mode
+      leave();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
index bb8cfb0..9bcbc1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.federation.store;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
@@ -52,9 +54,10 @@ public class StateStoreCacheUpdateService extends PeriodicService {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
 
-    this.setIntervalMs(conf.getLong(
+    this.setIntervalMs(conf.getTimeDuration(
         DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
-        DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT));
+        DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT,
+        TimeUnit.MILLISECONDS));
 
     super.serviceInit(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d24310e..7446766 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5080,9 +5080,12 @@
 
   <property>
     <name>dfs.federation.router.cache.ttl</name>
-    <value>60000</value>
+    <value>1m</value>
     <description>
-      How often to refresh the State Store caches in milliseconds.
+      How often to refresh the State Store caches. This setting
+      supports multiple time unit suffixes as described in
+      dfs.heartbeat.interval. If no suffix is specified then milliseconds is
+      assumed.
     </description>
   </property>
 
@@ -5131,6 +5134,35 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.safemode.enable</name>
+    <value>true</value>
+    <description>
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.safemode.extension</name>
+    <value>30s</value>
+    <description>
+      Time after startup that the Router is in safe mode. This setting
+      supports multiple time unit suffixes as described in
+      dfs.heartbeat.interval. If no suffix is specified then milliseconds is
+      assumed.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.safemode.expiration</name>
+    <value>3m</value>
+    <description>
+      Time without being able to reach the State Store to enter safe mode. This
+      setting supports multiple time unit suffixes as described in
+      dfs.heartbeat.interval. If no suffix is specified then milliseconds is
+      assumed.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.monitor.namenode</name>
     <value></value>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
index 75798a1..6b21123 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSRouterFederation.md
@@ -81,6 +81,10 @@ The Routers are stateless and metadata operations are atomic at the NameNodes.
 If a Router becomes unavailable, any Router can take over for it.
 The clients configure their DFS HA client (e.g., ConfiguredFailoverProvider or RequestHedgingProxyProvider) with all the Routers in the federation as endpoints.
 
+* **Unavailable State Store:**
+If a Router cannot contact the State Store, it will enter into a Safe Mode state which disallows it from serving requests.
+Clients will treat a Router in Safe Mode as if it were a Standby NameNode and try another Router.
+
 * **NameNode heartbeat HA:**
 For high availability and flexibility, multiple Routers can monitor the same NameNode and heartbeat the information to the State Store.
 This increases clients' resiliency to stale information, should a Router fail.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 3d8b35c..3659bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -35,6 +35,7 @@ public class RouterConfigBuilder {
   private boolean enableStateStore = false;
   private boolean enableMetrics = false;
   private boolean enableQuota = false;
+  private boolean enableSafemode = false;
 
   public RouterConfigBuilder(Configuration configuration) {
     this.conf = configuration;
@@ -52,6 +53,7 @@ public class RouterConfigBuilder {
     this.enableLocalHeartbeat = true;
     this.enableStateStore = true;
     this.enableMetrics = true;
+    this.enableSafemode = true;
     return this;
   }
 
@@ -95,6 +97,11 @@ public class RouterConfigBuilder {
     return this;
   }
 
+  public RouterConfigBuilder safemode(boolean enable) {
+    this.enableSafemode = enable;
+    return this;
+  }
+
   public RouterConfigBuilder rpc() {
     return this.rpc(true);
   }
@@ -123,6 +130,10 @@ public class RouterConfigBuilder {
     return this.quota(true);
   }
 
+  public RouterConfigBuilder safemode() {
+    return this.safemode(true);
+  }
+
   public Configuration build() {
     conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
         this.enableStateStore);
@@ -139,6 +150,8 @@ public class RouterConfigBuilder {
         this.enableMetrics);
     conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
         this.enableQuota);
+    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
+        this.enableSafemode);
     return conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dbb9dded/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
new file mode 100644
index 0000000..9299f77
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.deleteStateStore;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the safe mode for the {@link Router} controlled by
+ * {@link RouterSafemodeService}.
+ */
+public class TestRouterSafemode {
+
+  private Router router;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void create() throws IOException {
+    // Wipe state store
+    deleteStateStore();
+    // Configuration that supports the state store
+    conf = getStateStoreConfiguration();
+    // 2 sec startup standby
+    conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
+        TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
+    // 1 sec cache refresh
+    conf.setTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+        TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
+    // 2 sec post cache update before entering safemode (2 intervals)
+    conf.setTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
+        TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS);
+    // RPC + State Store + Safe Mode + Metrics only
+    conf = new RouterConfigBuilder(conf)
+        .rpc()
+        .safemode()
+        .stateStore()
+        .metrics()
+        .build();
+  }
+
+  @AfterClass
+  public static void destroy() {
+  }
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    router = new Router();
+    router.init(conf);
+    router.start();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    if (router != null) {
+      router.stop();
+      router = null;
+    }
+  }
+
+  @Test
+  public void testSafemodeService() throws IOException {
+    RouterSafemodeService server = new RouterSafemodeService(router);
+    server.init(conf);
+    assertEquals(STATE.INITED, server.getServiceState());
+    server.start();
+    assertEquals(STATE.STARTED, server.getServiceState());
+    server.stop();
+    assertEquals(STATE.STOPPED, server.getServiceState());
+    server.close();
+  }
+
+  @Test
+  public void testRouterExitSafemode()
+      throws InterruptedException, IllegalStateException, IOException {
+
+    assertTrue(router.getRpcServer().isInSafeMode());
+    verifyRouter(RouterServiceState.SAFEMODE);
+
+    // Wait for initial time in milliseconds
+    long interval =
+        conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
+            TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
+        conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+            TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
+    Thread.sleep(interval);
+
+    assertFalse(router.getRpcServer().isInSafeMode());
+    verifyRouter(RouterServiceState.RUNNING);
+  }
+
+  @Test
+  public void testRouterEnterSafemode()
+      throws IllegalStateException, IOException, InterruptedException {
+
+    // Verify starting state
+    assertTrue(router.getRpcServer().isInSafeMode());
+    verifyRouter(RouterServiceState.SAFEMODE);
+
+    // We should be in safe mode for DFS_ROUTER_SAFEMODE_EXTENSION time
+    long interval0 = conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXTENSION,
+        TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) - 1000;
+    long t0 = Time.now();
+    while (Time.now() - t0 < interval0) {
+      verifyRouter(RouterServiceState.SAFEMODE);
+      Thread.sleep(100);
+    }
+
+    // We wait some time for the state to propagate
+    long interval1 = 1000 + 2 * conf.getTimeDuration(
+        DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, TimeUnit.SECONDS.toMillis(1),
+        TimeUnit.MILLISECONDS);
+    Thread.sleep(interval1);
+
+    // Running
+    assertFalse(router.getRpcServer().isInSafeMode());
+    verifyRouter(RouterServiceState.RUNNING);
+
+    // Disable cache
+    router.getStateStore().stopCacheUpdateService();
+
+    // Wait until the State Store cache is stale in milliseconds
+    long interval2 =
+        conf.getTimeDuration(DFS_ROUTER_SAFEMODE_EXPIRATION,
+            TimeUnit.SECONDS.toMillis(2), TimeUnit.MILLISECONDS) +
+        conf.getTimeDuration(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+            TimeUnit.SECONDS.toMillis(1), TimeUnit.MILLISECONDS);
+    Thread.sleep(interval2);
+
+    // Safemode
+    assertTrue(router.getRpcServer().isInSafeMode());
+    verifyRouter(RouterServiceState.SAFEMODE);
+  }
+
+  @Test
+  public void testRouterRpcSafeMode()
+      throws IllegalStateException, IOException {
+
+    assertTrue(router.getRpcServer().isInSafeMode());
+    verifyRouter(RouterServiceState.SAFEMODE);
+
+    // If the Router is in Safe Mode, we should get a RouterSafeModeException
+    boolean exception = false;
+    try {
+      router.getRpcServer().delete("/testfile.txt", true);
+      fail("We should have thrown a safe mode exception");
+    } catch (RouterSafeModeException sme) {
+      exception = true;
+    }
+    assertTrue("We should have thrown a safe mode exception", exception);
+  }
+
+  private void verifyRouter(RouterServiceState status)
+      throws IllegalStateException, IOException {
+    assertEquals(status, router.getRouterState());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org