You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2018/01/25 07:52:02 UTC

hadoop git commit: HDFS-13042. RBF: Heartbeat Router State. Contributed by Inigo Goiri.

Repository: hadoop
Updated Branches:
  refs/heads/trunk eb2dd0868 -> 7721fff74


HDFS-13042. RBF: Heartbeat Router State. Contributed by Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7721fff7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7721fff7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7721fff7

Branch: refs/heads/trunk
Commit: 7721fff74494eb7fbbbba7f8bb4b4692d880d301
Parents: eb2dd08
Author: Yiqun Lin <yq...@apache.org>
Authored: Thu Jan 25 15:51:26 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Thu Jan 25 15:51:26 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +
 .../hdfs/server/federation/router/Router.java   |  71 +++++++
 .../router/RouterHeartbeatService.java          | 155 +++++++++++++++
 .../federation/router/RouterServiceState.java   |   2 +-
 .../federation/store/StateStoreService.java     |   9 +
 .../src/main/resources/hdfs-default.xml         |  20 ++
 .../store/TestStateStoreRouterState.java        | 194 +++++++++++++++++++
 7 files changed, 458 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f53badc..84215f3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1222,6 +1222,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
       FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
   public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
+  public static final String DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS =
+      FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval";
+  public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT =
+      TimeUnit.SECONDS.toMillis(5);
 
   // HDFS Router NN client
   public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
@@ -1282,6 +1286,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       FEDERATION_STORE_PREFIX + "membership.expiration";
   public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
       TimeUnit.MINUTES.toMillis(5);
+  public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS =
+      FEDERATION_STORE_PREFIX + "router.expiration";
+  public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(5);
 
   // HDFS Router-based federation mount table entries
   /** Maximum number of cache entries to have. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index ea8a1c0..1e72c93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -37,11 +37,13 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,6 +114,18 @@ public class Router extends CompositeService {
   /** Quota cache manager. */
   private RouterQuotaManager quotaManager;
 
+  /** Manages the current state of the router. */
+  private RouterStore routerStateManager;
+  /** Heartbeat our run status to the router state manager. */
+  private RouterHeartbeatService routerHeartbeatService;
+
+  /** The start time of the namesystem. */
+  private final long startTime = Time.now();
+
+  /** State of the Router. */
+  private RouterServiceState state = RouterServiceState.UNINITIALIZED;
+
+
   /////////////////////////////////////////////////////////
   // Constructor
   /////////////////////////////////////////////////////////
@@ -127,6 +141,7 @@ public class Router extends CompositeService {
   @Override
   protected void serviceInit(Configuration configuration) throws Exception {
     this.conf = configuration;
+    updateRouterState(RouterServiceState.INITIALIZING);
 
     if (conf.getBoolean(
         DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
@@ -188,6 +203,10 @@ public class Router extends CompositeService {
       if (this.namenodeHearbeatServices.isEmpty()) {
         LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
       }
+
+      // Periodically update the router state
+      this.routerHeartbeatService = new RouterHeartbeatService(this);
+      addService(this.routerHeartbeatService);
     }
 
     // Router metrics system
@@ -219,6 +238,8 @@ public class Router extends CompositeService {
   @Override
   protected void serviceStart() throws Exception {
 
+    updateRouterState(RouterServiceState.RUNNING);
+
     if (this.pauseMonitor != null) {
       this.pauseMonitor.start();
       JvmMetrics jvmMetrics = this.metrics.getJvmMetrics();
@@ -233,6 +254,9 @@ public class Router extends CompositeService {
   @Override
   protected void serviceStop() throws Exception {
 
+    // Update state
+    updateRouterState(RouterServiceState.SHUTDOWN);
+
     // JVM pause monitor
     if (this.pauseMonitor != null) {
       this.pauseMonitor.stop();
@@ -454,6 +478,31 @@ public class Router extends CompositeService {
   }
 
   /////////////////////////////////////////////////////////
+  // Router State Management
+  /////////////////////////////////////////////////////////
+
+  /**
+   * Update the router state and heartbeat to the state store.
+   *
+   * @param state The new router state.
+   */
+  public void updateRouterState(RouterServiceState newState) {
+    this.state = newState;
+    if (this.routerHeartbeatService != null) {
+      this.routerHeartbeatService.updateStateAsync();
+    }
+  }
+
+  /**
+   * Get the status of the router.
+   *
+   * @return Status of the router.
+   */
+  public RouterServiceState getRouterState() {
+    return this.state;
+  }
+
+  /////////////////////////////////////////////////////////
   // Submodule getters
   /////////////////////////////////////////////////////////
 
@@ -508,11 +557,33 @@ public class Router extends CompositeService {
     return this.namenodeResolver;
   }
 
+  /**
+   * Get the state store interface for the router heartbeats.
+   *
+   * @return FederationRouterStateStore state store API handle.
+   */
+  public RouterStore getRouterStateManager() {
+    if (this.routerStateManager == null && this.stateStore != null) {
+      this.routerStateManager = this.stateStore.getRegisteredRecordStore(
+          RouterStore.class);
+    }
+    return this.routerStateManager;
+  }
+
   /////////////////////////////////////////////////////////
   // Router info
   /////////////////////////////////////////////////////////
 
   /**
+   * Get the start date of the Router.
+   *
+   * @return Start date of the router.
+   */
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /**
    * Unique ID for the router, typically the hostname:port string for the
    * router's RPC server. This ID may be null on router startup before the RPC
    * server has bound to a port.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
new file mode 100644
index 0000000..86a6210
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the Router current state in the State Store.
+ */
+public class RouterHeartbeatService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterHeartbeatService.class);
+
+  /** Router we are hearbeating. */
+  private final Router router;
+
+  /**
+   * Create a new Router heartbeat service.
+   *
+   * @param router Router to heartbeat.
+   */
+  public RouterHeartbeatService(Router router) {
+    super(RouterHeartbeatService.class.getSimpleName());
+    this.router = router;
+  }
+
+  /**
+   * Trigger the update of the Router state asynchronously.
+   */
+  protected void updateStateAsync() {
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        updateStateStore();
+      }
+    }, "Router Heartbeat Async");
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  /**
+   * Update the state of the Router in the State Store.
+   */
+  private synchronized void updateStateStore() {
+    String routerId = router.getRouterId();
+    if (routerId == null) {
+      LOG.error("Cannot heartbeat for router: unknown router id");
+      return;
+    }
+    RouterStore routerStore = router.getRouterStateManager();
+    if (routerStore != null) {
+      try {
+        RouterState record = RouterState.newInstance(
+            routerId, router.getStartTime(), router.getRouterState());
+        StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance(
+            getStateStoreVersion(MembershipStore.class),
+            getStateStoreVersion(MountTableStore.class));
+        record.setStateStoreVersion(stateStoreVersion);
+        RouterHeartbeatRequest request =
+            RouterHeartbeatRequest.newInstance(record);
+        RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
+        if (!response.getStatus()) {
+          LOG.warn("Cannot heartbeat router {}", routerId);
+        } else {
+          LOG.debug("Router heartbeat for router {}", routerId);
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot heartbeat router {}: {}", routerId, e.getMessage());
+      }
+    } else {
+      LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId);
+    }
+  }
+
+  /**
+   * Get the version of the data in the State Store.
+   *
+   * @param clazz Class in the State Store.
+   * @return Version of the data.
+   */
+  private <R extends BaseRecord, S extends RecordStore<R>>
+      long getStateStoreVersion(final Class<S> clazz) {
+    long version = -1;
+    try {
+      StateStoreService stateStore = router.getStateStore();
+      S recordStore = stateStore.getRegisteredRecordStore(clazz);
+      if (recordStore != null) {
+        if (recordStore instanceof CachedRecordStore) {
+          CachedRecordStore<R> cachedRecordStore =
+              (CachedRecordStore<R>) recordStore;
+          List<R> records = cachedRecordStore.getCachedRecords();
+          for (BaseRecord record : records) {
+            if (record.getDateModified() > version) {
+              version = record.getDateModified();
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Cannot get version for {}: {}", clazz, e.getMessage());
+    }
+    return version;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    long interval = conf.getTimeDuration(
+        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS,
+        DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.setIntervalMs(interval);
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void periodicInvoke() {
+    updateStateStore();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
index 25a6466..3accbe9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
  * States of the Router.
  */
 public enum RouterServiceState {
-  NONE,
+  UNINITIALIZED,
   INITIALIZING,
   SAFEMODE,
   RUNNING,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index 0289ba6..aa730ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -38,8 +39,10 @@ import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
+import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.service.CompositeService;
@@ -148,6 +151,7 @@ public class StateStoreService extends CompositeService {
     // Add supported record stores
     addRecordStore(MembershipStoreImpl.class);
     addRecordStore(MountTableStoreImpl.class);
+    addRecordStore(RouterStoreImpl.class);
 
     // Check the connection to the State Store periodically
     this.monitorService = new StateStoreConnectionMonitorService(this);
@@ -158,6 +162,11 @@ public class StateStoreService extends CompositeService {
         DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
         DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
 
+    RouterState.setExpirationMs(conf.getTimeDuration(
+        DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
+        DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
+        TimeUnit.MILLISECONDS));
+
     // Cache update service
     this.cacheUpdater = new StateStoreCacheUpdateService(this);
     addService(this.cacheUpdater);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7a23eb4..d24310e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5111,6 +5111,26 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.heartbeat-state.interval</name>
+    <value>5s</value>
+    <description>
+      How often the Router should heartbeat its state into the State Store in
+      milliseconds. This setting supports multiple time unit suffixes as
+      described in dfs.federation.router.quota-cache.update.interval.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.store.router.expiration</name>
+    <value>5m</value>
+    <description>
+      Expiration time in milliseconds for a router state record. This setting
+      supports multiple time unit suffixes as described in
+      dfs.federation.router.quota-cache.update.interval.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.monitor.namenode</name>
     <value></value>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
new file mode 100644
index 0000000..ae15ef6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.util.Time;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the basic {@link StateStoreService} {@link RouterStore} functionality.
+ */
+public class TestStateStoreRouterState extends TestStateStoreBase {
+
+  private static RouterStore routerStore;
+
+  @BeforeClass
+  public static void create() {
+    // Reduce expirations to 5 seconds
+    getConf().setTimeDuration(
+        DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
+        5, TimeUnit.SECONDS);
+  }
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+
+    if (routerStore == null) {
+      routerStore =
+          getStateStore().getRegisteredRecordStore(RouterStore.class);
+    }
+
+    // Clear router status registrations
+    assertTrue(clearRecords(getStateStore(), RouterState.class));
+  }
+
+  @Test
+  public void testStateStoreDisconnected() throws Exception {
+
+    // Close the data store driver
+    getStateStore().closeDriver();
+    assertEquals(false, getStateStore().isDriverReady());
+
+    // Test all APIs that access the data store to ensure they throw the correct
+    // exception.
+    GetRouterRegistrationRequest getSingleRequest =
+        GetRouterRegistrationRequest.newInstance();
+    verifyException(routerStore, "getRouterRegistration",
+        StateStoreUnavailableException.class,
+        new Class[] {GetRouterRegistrationRequest.class},
+        new Object[] {getSingleRequest});
+
+    GetRouterRegistrationsRequest getRequest =
+        GetRouterRegistrationsRequest.newInstance();
+    routerStore.loadCache(true);
+    verifyException(routerStore, "getRouterRegistrations",
+        StateStoreUnavailableException.class,
+        new Class[] {GetRouterRegistrationsRequest.class},
+        new Object[] {getRequest});
+
+    RouterHeartbeatRequest hbRequest = RouterHeartbeatRequest.newInstance(
+        RouterState.newInstance("test", 0, RouterServiceState.UNINITIALIZED));
+    verifyException(routerStore, "routerHeartbeat",
+        StateStoreUnavailableException.class,
+        new Class[] {RouterHeartbeatRequest.class},
+        new Object[] {hbRequest});
+  }
+
+  //
+  // Router
+  //
+  @Test
+  public void testUpdateRouterStatus()
+      throws IllegalStateException, IOException {
+
+    long dateStarted = Time.now();
+    String address = "testaddress";
+
+    // Set
+    RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(
+        RouterState.newInstance(
+            address, dateStarted, RouterServiceState.RUNNING));
+    assertTrue(routerStore.routerHeartbeat(request).getStatus());
+
+    // Verify
+    GetRouterRegistrationRequest getRequest =
+        GetRouterRegistrationRequest.newInstance(address);
+    RouterState record =
+        routerStore.getRouterRegistration(getRequest).getRouter();
+    assertNotNull(record);
+    assertEquals(RouterServiceState.RUNNING, record.getStatus());
+    assertEquals(address, record.getAddress());
+    assertEquals(FederationUtil.getCompileInfo(), record.getCompileInfo());
+    // Build version may vary a bit
+    assertTrue(record.getBuildVersion().length() > 0);
+  }
+
+  @Test
+  public void testRouterStateExpired()
+      throws IOException, InterruptedException {
+
+    long dateStarted = Time.now();
+    String address = "testaddress";
+
+    RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance(
+        RouterState.newInstance(
+            address, dateStarted, RouterServiceState.RUNNING));
+    // Set
+    assertTrue(routerStore.routerHeartbeat(request).getStatus());
+
+    // Verify
+    GetRouterRegistrationRequest getRequest =
+        GetRouterRegistrationRequest.newInstance(address);
+    RouterState record =
+        routerStore.getRouterRegistration(getRequest).getRouter();
+    assertNotNull(record);
+
+    // Wait past expiration (set to 5 sec in config)
+    Thread.sleep(6000);
+
+    // Verify expired
+    RouterState r = routerStore.getRouterRegistration(getRequest).getRouter();
+    assertEquals(RouterServiceState.EXPIRED, r.getStatus());
+
+    // Heartbeat again and this shouldn't be EXPIRED anymore
+    assertTrue(routerStore.routerHeartbeat(request).getStatus());
+    r = routerStore.getRouterRegistration(getRequest).getRouter();
+    assertEquals(RouterServiceState.RUNNING, r.getStatus());
+  }
+
+  @Test
+  public void testGetAllRouterStates()
+      throws StateStoreUnavailableException, IOException {
+
+    // Set 2 entries
+    RouterHeartbeatRequest heartbeatRequest1 =
+        RouterHeartbeatRequest.newInstance(
+            RouterState.newInstance(
+                "testaddress1", Time.now(), RouterServiceState.RUNNING));
+    assertTrue(routerStore.routerHeartbeat(heartbeatRequest1).getStatus());
+
+    RouterHeartbeatRequest heartbeatRequest2 =
+        RouterHeartbeatRequest.newInstance(
+            RouterState.newInstance(
+                "testaddress2", Time.now(), RouterServiceState.RUNNING));
+    assertTrue(routerStore.routerHeartbeat(heartbeatRequest2).getStatus());
+
+    // Verify
+    routerStore.loadCache(true);
+    GetRouterRegistrationsRequest request =
+        GetRouterRegistrationsRequest.newInstance();
+    List<RouterState> entries =
+        routerStore.getRouterRegistrations(request).getRouters();
+    assertEquals(2, entries.size());
+    Collections.sort(entries);
+    assertEquals("testaddress1", entries.get(0).getAddress());
+    assertEquals("testaddress2", entries.get(1).getAddress());
+    assertEquals(RouterServiceState.RUNNING, entries.get(0).getStatus());
+    assertEquals(RouterServiceState.RUNNING, entries.get(1).getStatus());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org