You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/06 16:50:13 UTC

[13/29] hadoop git commit: HDFS-10687. Federation Membership State Store internal API. Contributed by Jason Kace and Inigo Goiri.

HDFS-10687. Federation Membership State Store internal API. Contributed by Jason Kace and Inigo Goiri.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79fce5e0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79fce5e0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79fce5e0

Branch: refs/heads/HDFS-10467
Commit: 79fce5e0cfb11fe5cf4bc9bae72bf7f601afbf60
Parents: 4d1180e
Author: Inigo Goiri <in...@apache.org>
Authored: Mon Jul 31 10:55:21 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Wed Sep 6 09:49:44 2017 -0700

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |   3 +
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   1 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  17 +-
 .../resolver/MembershipNamenodeResolver.java    | 290 ++++++++++++
 .../federation/router/FederationUtil.java       |  42 +-
 .../federation/store/CachedRecordStore.java     | 237 ++++++++++
 .../federation/store/MembershipStore.java       | 126 +++++
 .../federation/store/StateStoreCache.java       |  36 ++
 .../store/StateStoreCacheUpdateService.java     |  67 +++
 .../federation/store/StateStoreService.java     | 202 +++++++-
 .../store/impl/MembershipStoreImpl.java         | 311 +++++++++++++
 .../federation/store/impl/package-info.java     |  31 ++
 .../GetNamenodeRegistrationsRequest.java        |  52 +++
 .../GetNamenodeRegistrationsResponse.java       |  55 +++
 .../store/protocol/GetNamespaceInfoRequest.java |  30 ++
 .../protocol/GetNamespaceInfoResponse.java      |  52 +++
 .../protocol/NamenodeHeartbeatRequest.java      |  52 +++
 .../protocol/NamenodeHeartbeatResponse.java     |  49 ++
 .../UpdateNamenodeRegistrationRequest.java      |  72 +++
 .../UpdateNamenodeRegistrationResponse.java     |  51 ++
 .../impl/pb/FederationProtocolPBTranslator.java | 145 ++++++
 .../GetNamenodeRegistrationsRequestPBImpl.java  |  87 ++++
 .../GetNamenodeRegistrationsResponsePBImpl.java |  99 ++++
 .../impl/pb/GetNamespaceInfoRequestPBImpl.java  |  60 +++
 .../impl/pb/GetNamespaceInfoResponsePBImpl.java |  95 ++++
 .../impl/pb/NamenodeHeartbeatRequestPBImpl.java |  93 ++++
 .../pb/NamenodeHeartbeatResponsePBImpl.java     |  71 +++
 ...UpdateNamenodeRegistrationRequestPBImpl.java |  95 ++++
 ...pdateNamenodeRegistrationResponsePBImpl.java |  73 +++
 .../store/protocol/impl/pb/package-info.java    |  29 ++
 .../store/records/MembershipState.java          | 329 +++++++++++++
 .../store/records/MembershipStats.java          | 126 +++++
 .../records/impl/pb/MembershipStatePBImpl.java  | 334 +++++++++++++
 .../records/impl/pb/MembershipStatsPBImpl.java  | 191 ++++++++
 .../src/main/proto/FederationProtocol.proto     | 107 +++++
 .../src/main/resources/hdfs-default.xml         |  18 +-
 .../resolver/TestNamenodeResolver.java          | 284 ++++++++++++
 .../store/FederationStateStoreTestUtils.java    |  23 +-
 .../federation/store/TestStateStoreBase.java    |  81 ++++
 .../store/TestStateStoreMembershipState.java    | 463 +++++++++++++++++++
 .../store/driver/TestStateStoreDriverBase.java  |  69 ++-
 .../store/records/TestMembershipState.java      | 129 ++++++
 42 files changed, 4745 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 9582fcb..4b958b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -15,6 +15,9 @@
        <Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
      </Match>
      <Match>
+       <Package name="org.apache.hadoop.hdfs.federation.protocol.proto" />
+     </Match>
+     <Match>
        <Bug pattern="EI_EXPOSE_REP" />
      </Match>
      <Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 425572f..cc7a975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -331,6 +331,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                   <include>QJournalProtocol.proto</include>
                   <include>editlog.proto</include>
                   <include>fsimage.proto</include>
+                  <include>FederationProtocol.proto</include>
                 </includes>
               </source>
             </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8b39e88..afb5bbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
@@ -1163,8 +1165,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "org.apache.hadoop.hdfs.server.federation.MockResolver";
   public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS =
       FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class";
-  public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT =
-      "org.apache.hadoop.hdfs.server.federation.MockResolver";
+  public static final Class<? extends ActiveNamenodeResolver>
+      FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT =
+          MembershipNamenodeResolver.class;
 
   // HDFS Router-based federation State Store
   public static final String FEDERATION_STORE_PREFIX =
@@ -1186,6 +1189,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT =
       TimeUnit.MINUTES.toMillis(1);
 
+  public static final String DFS_ROUTER_CACHE_TIME_TO_LIVE_MS =
+      FEDERATION_ROUTER_PREFIX + "cache.ttl";
+  public static final long DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(1);
+
+  public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS =
+      FEDERATION_STORE_PREFIX + "membership.expiration";
+  public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
+      TimeUnit.MINUTES.toMillis(5);
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
new file mode 100644
index 0000000..b0ced24
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE;
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED;
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a cached lookup of the most recently active namenode for a
+ * particular nameservice. Relies on the {@link StateStoreService} to
+ * discover available nameservices and namenodes.
+ */
+public class MembershipNamenodeResolver
+    implements ActiveNamenodeResolver, StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MembershipNamenodeResolver.class);
+
+  /** Reference to the State Store. */
+  private final StateStoreService stateStore;
+  /** Membership State Store interface. */
+  private final MembershipStore membershipInterface;
+
+  /** Parent router ID. */
+  private String routerId;
+
+  /** Cached lookup of NN for nameservice. Invalidated on cache refresh. */
+  private Map<String, List<? extends FederationNamenodeContext>> cacheNS;
+  /** Cached lookup of NN for block pool. Invalidated on cache refresh. */
+  private Map<String, List<? extends FederationNamenodeContext>> cacheBP;
+
+
+  public MembershipNamenodeResolver(
+      Configuration conf, StateStoreService store) throws IOException {
+    this.stateStore = store;
+
+    this.cacheNS = new ConcurrentHashMap<>();
+    this.cacheBP = new ConcurrentHashMap<>();
+
+    if (this.stateStore != null) {
+      // Request cache updates from the state store
+      this.stateStore.registerCacheExternal(this);
+
+      // Initialize the interface to get the membership
+      this.membershipInterface = this.stateStore.getRegisteredRecordStore(
+          MembershipStore.class);
+    } else {
+      this.membershipInterface = null;
+    }
+
+    if (this.membershipInterface == null) {
+      throw new IOException("State Store does not have an interface for " +
+          MembershipStore.class.getSimpleName());
+    }
+  }
+
+  @Override
+  public boolean loadCache(boolean force) {
+    // Our cache depends on the store, update it first
+    try {
+      this.membershipInterface.loadCache(force);
+    } catch (IOException e) {
+      LOG.error("Cannot update membership from the State Store", e);
+    }
+
+    // Force refresh of active NN cache
+    cacheBP.clear();
+    cacheNS.clear();
+    return true;
+  }
+
+  @Override
+  public void updateActiveNamenode(
+      final String nsId, final InetSocketAddress address) throws IOException {
+
+    // Called when we have an RPC miss and successful hit on an alternate NN.
+    // Temporarily update our cache, it will be overwritten on the next update.
+    try {
+      MembershipState partial = MembershipState.newInstance();
+      String rpcAddress = address.getHostName() + ":" + address.getPort();
+      partial.setRpcAddress(rpcAddress);
+      partial.setNameserviceId(nsId);
+
+      GetNamenodeRegistrationsRequest request =
+          GetNamenodeRegistrationsRequest.newInstance(partial);
+
+      GetNamenodeRegistrationsResponse response =
+          this.membershipInterface.getNamenodeRegistrations(request);
+      List<MembershipState> records = response.getNamenodeMemberships();
+
+      if (records != null && records.size() == 1) {
+        MembershipState record = records.get(0);
+        UpdateNamenodeRegistrationRequest updateRequest =
+            UpdateNamenodeRegistrationRequest.newInstance(
+                record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
+        this.membershipInterface.updateNamenodeRegistration(updateRequest);
+      }
+    } catch (StateStoreUnavailableException e) {
+      LOG.error("Cannot update {} as active, State Store unavailable", address);
+    }
+  }
+
+  @Override
+  public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
+      final String nsId) throws IOException {
+
+    List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
+    if (ret == null) {
+      try {
+        MembershipState partial = MembershipState.newInstance();
+        partial.setNameserviceId(nsId);
+        GetNamenodeRegistrationsRequest request =
+            GetNamenodeRegistrationsRequest.newInstance(partial);
+
+        final List<MembershipState> result =
+            getRecentRegistrationForQuery(request, true, false);
+        if (result == null || result.isEmpty()) {
+          LOG.error("Cannot locate eligible NNs for {}", nsId);
+          return null;
+        } else {
+          cacheNS.put(nsId, result);
+          ret = result;
+        }
+      } catch (StateStoreUnavailableException e) {
+        LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
+      }
+    }
+    if (ret == null) {
+      return null;
+    }
+    return Collections.unmodifiableList(ret);
+  }
+
+  @Override
+  public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
+      final String bpId) throws IOException {
+
+    List<? extends FederationNamenodeContext> ret = cacheBP.get(bpId);
+    if (ret == null) {
+      try {
+        MembershipState partial = MembershipState.newInstance();
+        partial.setBlockPoolId(bpId);
+        GetNamenodeRegistrationsRequest request =
+            GetNamenodeRegistrationsRequest.newInstance(partial);
+
+        final List<MembershipState> result =
+            getRecentRegistrationForQuery(request, true, false);
+        if (result == null || result.isEmpty()) {
+          LOG.error("Cannot locate eligible NNs for {}", bpId);
+        } else {
+          cacheBP.put(bpId, result);
+          ret = result;
+        }
+      } catch (StateStoreUnavailableException e) {
+        LOG.error("Cannot get active NN for {}, State Store unavailable", bpId);
+        return null;
+      }
+    }
+    if (ret == null) {
+      return null;
+    }
+    return Collections.unmodifiableList(ret);
+  }
+
+  @Override
+  public boolean registerNamenode(NamenodeStatusReport report)
+      throws IOException {
+
+    if (this.routerId == null) {
+      LOG.warn("Cannot register namenode, router ID is not known {}", report);
+      return false;
+    }
+
+    MembershipState record = MembershipState.newInstance(
+        routerId, report.getNameserviceId(), report.getNamenodeId(),
+        report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
+        report.getServiceAddress(), report.getLifelineAddress(),
+        report.getWebAddress(), report.getState(), report.getSafemode());
+
+    if (report.getState() != UNAVAILABLE) {
+      // Set/update our last contact time
+      record.setLastContact(Time.now());
+    }
+
+    NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
+    request.setNamenodeMembership(record);
+    return this.membershipInterface.namenodeHeartbeat(request).getResult();
+  }
+
+  @Override
+  public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
+    GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+    GetNamespaceInfoResponse response =
+        this.membershipInterface.getNamespaceInfo(request);
+    return response.getNamespaceInfo();
+  }
+
+  /**
+   * Returns the registrations that match the query, sorted with
+   * {@link NamenodePriorityComparator}; the intended preference is: 1) most
+   * recently updated ACTIVE, 2) most recently updated STANDBY, 3) most
+   * recently updated UNAVAILABLE (if addUnavailable). EXPIRED registrations
+   * are dropped unless addExpired is set.
+   *
+   * @param request The select query for NN registrations.
+   * @param addUnavailable include UNAVAILABLE registrations.
+   * @param addExpired include EXPIRED registrations.
+   * @return Sorted list of the memberships that match both the query and the
+   *         selected states; empty if there are none.
+   * @throws IOException If the registrations cannot be read from the
+   *         membership store.
+   */
+  private List<MembershipState> getRecentRegistrationForQuery(
+      GetNamenodeRegistrationsRequest request, boolean addUnavailable,
+      boolean addExpired) throws IOException {
+
+    // Retrieve a list of all registrations that match this query.
+    // This may include all NN records for a namespace/blockpool, including
+    // duplicate records for the same NN from different routers.
+    GetNamenodeRegistrationsResponse response =
+        this.membershipInterface.getNamenodeRegistrations(request);
+
+    List<MembershipState> memberships = response.getNamenodeMemberships();
+    if (!addExpired || !addUnavailable) {
+      Iterator<MembershipState> iterator = memberships.iterator();
+      while (iterator.hasNext()) {
+        MembershipState membership = iterator.next();
+        if (membership.getState() == EXPIRED && !addExpired) {
+          iterator.remove();
+        } else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
+          iterator.remove();
+        }
+      }
+    }
+
+    List<MembershipState> priorityList = new ArrayList<>();
+    priorityList.addAll(memberships);
+    Collections.sort(priorityList, new NamenodePriorityComparator());
+
+    LOG.debug("Selected most recent NN {} for query", priorityList);
+    return priorityList;
+  }
+
+  @Override
+  public void setRouterId(String router) {
+    this.routerId = router;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index 6e7e865..0129a37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -19,26 +19,57 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.lang.reflect.Constructor;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utilities for managing HDFS federation.
  */
 public final class FederationUtil {
 
-  private static final Log LOG = LogFactory.getLog(FederationUtil.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationUtil.class);
 
   private FederationUtil() {
     // Utility Class
   }
 
   /**
+   * Create an instance of an interface with a constructor using a context.
+   *
+   * @param conf Configuration for the class names.
+   * @param context Context object to pass to the instance.
+   * @param contextClass Type of the context passed to the constructor.
+   * @param clazz Class of the object to return.
+   * @return New instance of the specified class, created through its
+   *         (Configuration, context) constructor or through its default
+   *         constructor if no context class is given; null if it fails.
+   */
+  private static <T, R> T newInstance(final Configuration conf,
+      final R context, final Class<R> contextClass, final Class<T> clazz) {
+    try {
+      if (contextClass == null) {
+        // Default constructor if no context
+        Constructor<T> constructor = clazz.getConstructor();
+        return constructor.newInstance();
+      } else {
+        // Constructor with context
+        Constructor<T> constructor = clazz.getConstructor(
+            Configuration.class, contextClass);
+        return constructor.newInstance(conf, context);
+      }
+    } catch (ReflectiveOperationException e) {
+      LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e);
+      return null;
+    }
+  }
+
+  /**
    * Create an instance of an interface with a constructor using a state store
    * constructor.
    *
@@ -105,13 +136,14 @@ public final class FederationUtil {
    *
    * @param conf Configuration that defines the namenode resolver class.
    * @param obj Context object passed to class constructor.
-   * @return ActiveNamenodeResolver
+   * @return New active namenode resolver.
    */
   public static ActiveNamenodeResolver newActiveNamenodeResolver(
       Configuration conf, StateStoreService stateStore) {
-    return newInstance(conf, stateStore, StateStoreService.class,
+    Class<? extends ActiveNamenodeResolver> clazz = conf.getClass(
         DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
         DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT,
         ActiveNamenodeResolver.class);
+    return newInstance(conf, stateStore, StateStoreService.class, clazz);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
new file mode 100644
index 0000000..90a6699
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Record store that takes care of caching the records in memory.
+ *
+ * @param <R> Record to store by this interface.
+ */
+public abstract class CachedRecordStore<R extends BaseRecord>
+    extends RecordStore<R> implements StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CachedRecordStore.class);
+
+
+  /** Prevent loading the cache more than once every 500 ms. */
+  private static final long MIN_UPDATE_MS = 500;
+
+
+  /** Cached entries. */
+  private List<R> records = new ArrayList<>();
+
+  /** Time stamp of the cached entries. */
+  private long timestamp = -1;
+
+  /** If the cache is initialized. */
+  private boolean initialized = false;
+
+  /** Last time the cache was updated. */
+  private long lastUpdate = -1;
+
+  /** Lock to access the memory cache. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  /** If it should override the expired values when loading the cache. */
+  private boolean override = false;
+
+
+  /**
+   * Create a new cached record store.
+   *
+   * @param clazz Class of the record to store.
+   * @param driver State Store driver.
+   */
+  protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) {
+    this(clazz, driver, false);
+  }
+
+  /**
+   * Create a new cached record store.
+   *
+   * @param clazz Class of the record to store.
+   * @param driver State Store driver.
+   * @param over If the entries should be overridden if they expire.
+   */
+  protected CachedRecordStore(
+      Class<R> clazz, StateStoreDriver driver, boolean over) {
+    super(clazz, driver);
+
+    this.override = over;
+  }
+
+  /**
+   * Check that the cache of the State Store information is available.
+   *
+   * @throws StateStoreUnavailableException If the cache is not initialized.
+   */
+  private void checkCacheAvailable() throws StateStoreUnavailableException {
+    if (!this.initialized) {
+      throw new StateStoreUnavailableException(
+          "Cached State Store not initialized, " +
+          getRecordClass().getSimpleName() + " records not valid");
+    }
+  }
+
+  @Override
+  public boolean loadCache(boolean force) throws IOException {
+    // Skip the reload unless forced or the minimum update interval passed
+    if (force || isUpdateTime()) {
+      List<R> newRecords = null;
+      long t = -1;
+      try {
+        QueryResult<R> result = getDriver().get(getRecordClass());
+        newRecords = result.getRecords();
+        t = result.getTimestamp();
+
+        // If we have any expired record, update the State Store
+        if (this.override) {
+          overrideExpiredRecords(result);
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot get \"{}\" records from the State Store",
+            getRecordClass().getSimpleName(), e);
+        this.initialized = false;
+        return false;
+      }
+
+      // Swap in the new records and time stamp under the write lock
+      writeLock.lock();
+      try {
+        this.records.clear();
+        this.records.addAll(newRecords);
+        this.timestamp = t;
+        this.initialized = true;
+      } finally {
+        writeLock.unlock();
+      }
+
+      lastUpdate = Time.monotonicNow();
+    }
+    return true;
+  }
+
+  /**
+   * Check if it's time to update the cache. Update it if it was never updated.
+   *
+   * @return If it's time to update this cache.
+   */
+  private boolean isUpdateTime() {
+    return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS;
+  }
+
+  /**
+   * Updates the state store with any record overrides we detected, such as an
+   * expired state.
+   *
+   * @param query Query result containing the records to be inspected and
+   *          the driver timestamp used to check their expiration.
+   * @throws IOException If the expired records cannot be written back.
+   */
+  public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
+    List<R> commitRecords = new ArrayList<>();
+    List<R> newRecords = query.getRecords();
+    long currentDriverTime = query.getTimestamp();
+    if (newRecords == null || currentDriverTime <= 0) {
+      LOG.error("Cannot check overrides for record");
+      return;
+    }
+    for (R record : newRecords) {
+      if (record.checkExpired(currentDriverTime)) {
+        String recordName = StateStoreUtils.getRecordName(record.getClass());
+        LOG.info("Override State Store record {}: {}", recordName, record);
+        commitRecords.add(record);
+      }
+    }
+    if (commitRecords.size() > 0) {
+      getDriver().putAll(commitRecords, true, false);
+    }
+  }
+
+  /**
+   * Updates the state store with any record overrides we detected, such as an
+   * expired state.
+   *
+   * @param record Record to be checked against the current driver time
+   *          and written back to the State Store if it has expired.
+   * @throws IOException If the expired record cannot be written back to
+   *           the State Store.
+   */
+  public void overrideExpiredRecord(R record) throws IOException {
+    List<R> newRecords = Collections.singletonList(record);
+    long time = getDriver().getTime();
+    QueryResult<R> query = new QueryResult<>(newRecords, time);
+    overrideExpiredRecords(query);
+  }
+
+  /**
+   * Get all the cached records.
+   *
+   * @return Copy of the cached records.
+   * @throws StateStoreUnavailableException If the State store is not available.
+   */
+  public List<R> getCachedRecords() throws StateStoreUnavailableException {
+    checkCacheAvailable();
+
+    List<R> ret = new LinkedList<R>();
+    this.readLock.lock();
+    try {
+      ret.addAll(this.records);
+    } finally {
+      this.readLock.unlock();
+    }
+    return ret;
+  }
+
+  /**
+   * Get all the cached records and the time stamp of the cache.
+   *
+   * @return Copy of the cached records and the time stamp.
+   * @throws StateStoreUnavailableException If the State store is not available.
+   */
+  protected QueryResult<R> getCachedRecordsAndTimeStamp()
+      throws StateStoreUnavailableException {
+    checkCacheAvailable();
+    // Snapshot under the read lock; loadCache() clears this list in place
+    this.readLock.lock();
+    try {
+      return new QueryResult<R>(new ArrayList<>(this.records), this.timestamp);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
new file mode 100644
index 0000000..3e8ba6b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * Management API for NameNode registrations stored in
+ * {@link org.apache.hadoop.hdfs.server.federation.store.records.MembershipState
+ * MembershipState} records. The
+ * {@link org.apache.hadoop.hdfs.server.federation.router.RouterHeartbeatService
+ * RouterHeartbeatService} periodically polls each NN to update the NameNode
+ * metadata (addresses, operational) and HA state (active, standby). Each
+ * NameNode may be polled by multiple
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}
+ * instances.
+ * <p>
+ * Once fetched from the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver}, NameNode registrations are cached until the next query.
+ * The fetched registration data is aggregated using a quorum to determine the
+ * best/most accurate state for each NameNode. The cache is periodically updated
+ * by the {@link StateStoreCacheUpdateService}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class MembershipStore
+    extends CachedRecordStore<MembershipState> {
+
+  /**
+   * Create a membership store on top of the given State Store driver.
+   *
+   * @param driver State Store driver used to access the records.
+   */
+  protected MembershipStore(StateStoreDriver driver) {
+    // NOTE(review): the trailing "true" presumably enables overriding expired
+    // records in CachedRecordStore - confirm against its constructor.
+    super(MembershipState.class, driver, true);
+  }
+
+  /**
+   * Inserts or updates a namenode membership entry into the table.
+   *
+   * @param request Fully populated NamenodeHeartbeatRequest request.
+   * @return NamenodeHeartbeatResponse with the result of the operation.
+   * @throws StateStoreUnavailableException Throws exception if the data store
+   *           is not initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract NamenodeHeartbeatResponse namenodeHeartbeat(
+      NamenodeHeartbeatRequest request) throws IOException;
+
+  /**
+   * Queries for the cached registration entries matching the given
+   * parameters. Possible keys are the names of data structure elements.
+   * Possible values are matching SQL "LIKE" targets.
+   *
+   * @param request Fully populated GetNamenodeRegistrationsRequest request.
+   * @return GetNamenodeRegistrationsResponse with the matching registrations.
+   * @throws StateStoreUnavailableException Throws exception if the data store
+   *           is not initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract GetNamenodeRegistrationsResponse getNamenodeRegistrations(
+      GetNamenodeRegistrationsRequest request) throws IOException;
+
+  /**
+   * Get the expired registrations from the registration cache.
+   *
+   * @param request GetNamenodeRegistrationsRequest request.
+   * @return Expired registrations or zero-length list if none are found.
+   * @throws StateStoreUnavailableException Throws exception if the data store
+   *           is not initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract GetNamenodeRegistrationsResponse
+      getExpiredNamenodeRegistrations(GetNamenodeRegistrationsRequest request)
+          throws IOException;
+
+  /**
+   * Retrieves a list of registered nameservices and their associated info.
+   *
+   * @param request GetNamespaceInfoRequest request.
+   * @return Collection of information for each registered nameservice.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract GetNamespaceInfoResponse getNamespaceInfo(
+      GetNamespaceInfoRequest request) throws IOException;
+
+  /**
+   * Overrides a cached namenode state with an updated state.
+   *
+   * @param request Fully populated UpdateNamenodeRegistrationRequest request.
+   * @return UpdateNamenodeRegistrationResponse with the result of the update.
+   * @throws StateStoreUnavailableException if the data store is not
+   *           initialized.
+   * @throws IOException if the data store could not be queried or the query is
+   *           invalid.
+   */
+  public abstract UpdateNamenodeRegistrationResponse updateNamenodeRegistration(
+      UpdateNamenodeRegistrationRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
new file mode 100644
index 0000000..83fc501
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+/**
+ * Interface for a cached copy of the State Store. Implementations expose a
+ * single operation to (re)load their cached data.
+ */
+public interface StateStoreCache {
+
+  /**
+   * Load the cache from the State Store. Called by the cache update service
+   * when the data has been reloaded.
+   *
+   * @param force If the load should be forced.
+   * @return If the cache was loaded successfully.
+   * @throws IOException If there was an error loading the cache.
+   */
+  boolean loadCache(boolean force) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
new file mode 100644
index 0000000..bb8cfb0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodic service that keeps the cached State Store information of the
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} up to
+ * date by asking the {@link StateStoreService} to refresh its caches. This is
+ * for performance and removes the State Store from the critical path in common
+ * operations.
+ */
+public class StateStoreCacheUpdateService extends PeriodicService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StateStoreCacheUpdateService.class);
+
+  /** State Store service whose caches are refreshed. */
+  private final StateStoreService stateStore;
+
+
+  /**
+   * Create a new cache update service.
+   *
+   * @param stateStore State Store service to refresh periodically.
+   */
+  public StateStoreCacheUpdateService(StateStoreService stateStore) {
+    super(StateStoreCacheUpdateService.class.getSimpleName());
+    this.stateStore = stateStore;
+  }
+
+  /**
+   * Set the invocation interval from the Router cache time-to-live setting
+   * and then initialize the parent service.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    final long intervalMs = conf.getLong(
+        DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+        DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT);
+    this.setIntervalMs(intervalMs);
+
+    super.serviceInit(conf);
+  }
+
+  /** Ask the State Store service to refresh its caches. */
+  @Override
+  public void periodicInvoke() {
+    LOG.debug("Updating State Store cache");
+    this.stateStore.refreshCaches();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index df207e0..73f607f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -18,17 +18,24 @@
 package org.apache.hadoop.hdfs.server.federation.store;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +61,9 @@ import com.google.common.annotations.VisibleForTesting;
  * federation.
  * <li>{@link MountTableStore}: Mount table between to subclusters.
  * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
+ * <li>{@link RebalancerStore}: Log of the rebalancing operations.
+ * <li>{@link RouterStore}: Router state in the federation.
+ * <li>{@link TokenStore}: Tokens in the federation.
  * </ul>
  */
 @InterfaceAudience.Private
@@ -77,8 +87,30 @@ public class StateStoreService extends CompositeService {
   private StateStoreConnectionMonitorService monitorService;
 
 
+  /** Supported record stores. */
+  private final Map<
+      Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>>
+          recordStores;
+
+  /** Service to maintain State Store caches. */
+  private StateStoreCacheUpdateService cacheUpdater;
+  /** Time the cache was last successfully updated. */
+  private long cacheLastUpdateTime;
+  /** List of internal caches to update. */
+  private final List<StateStoreCache> cachesToUpdateInternal;
+  /** List of external caches to update. */
+  private final List<StateStoreCache> cachesToUpdateExternal;
+
+
   public StateStoreService() {
     super(StateStoreService.class.getName());
+
+    // Records and stores supported by this implementation
+    this.recordStores = new HashMap<>();
+
+    // Caches to maintain
+    this.cachesToUpdateInternal = new ArrayList<>();
+    this.cachesToUpdateExternal = new ArrayList<>();
   }
 
   /**
@@ -102,10 +134,22 @@ public class StateStoreService extends CompositeService {
       throw new IOException("Cannot create driver for the State Store");
     }
 
+    // Add supported record stores
+    addRecordStore(MembershipStoreImpl.class);
+
     // Check the connection to the State Store periodically
     this.monitorService = new StateStoreConnectionMonitorService(this);
     this.addService(monitorService);
 
+    // Set expirations intervals for each record
+    MembershipState.setExpirationMs(conf.getLong(
+        DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+        DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
+
+    // Cache update service
+    this.cacheUpdater = new StateStoreCacheUpdateService(this);
+    addService(this.cacheUpdater);
+
     super.serviceInit(this.conf);
   }
 
@@ -123,13 +167,56 @@ public class StateStoreService extends CompositeService {
   }
 
   /**
+   * Add a record store to the State Store. It includes adding the store, the
+   * supported record and the cache management.
+   *
+   * @param clazz Class of the record store to track.
+   * @return New record store.
+   * @throws ReflectiveOperationException
+   */
+  private <T extends RecordStore<?>> void addRecordStore(
+      final Class<T> clazz) throws ReflectiveOperationException {
+
+    assert this.getServiceState() == STATE.INITED :
+        "Cannot add record to the State Store once started";
+
+    T recordStore = RecordStore.newInstance(clazz, this.getDriver());
+    Class<? extends BaseRecord> recordClass = recordStore.getRecordClass();
+    this.recordStores.put(recordClass, recordStore);
+
+    // Subscribe for cache updates
+    if (recordStore instanceof StateStoreCache) {
+      StateStoreCache cachedRecordStore = (StateStoreCache) recordStore;
+      this.cachesToUpdateInternal.add(cachedRecordStore);
+    }
+  }
+
+  /**
+   * Look up the registered record store that is an instance of the given
+   * class or interface.
+   *
+   * @param recordStoreClass Class of the record store.
+   * @return First registered record store that is an instance of the class,
+   *         or null if there is none.
+   */
+  public <T extends RecordStore<?>> T getRegisteredRecordStore(
+      final Class<T> recordStoreClass) {
+    for (RecordStore<? extends BaseRecord> candidate :
+        this.recordStores.values()) {
+      if (recordStoreClass.isInstance(candidate)) {
+        // Checked by isInstance above, so the cast cannot fail
+        return recordStoreClass.cast(candidate);
+      }
+    }
+    return null;
+  }
+
+  /**
    * List of records supported by this State Store.
    *
    * @return List of supported record classes.
    */
   public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
-    // TODO add list of records
-    return new LinkedList<>();
+    return this.recordStores.keySet();
   }
 
   /**
@@ -142,6 +229,7 @@ public class StateStoreService extends CompositeService {
         if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
           LOG.info("Connection to the State Store driver {} is open and ready",
               driverName);
+          this.refreshCaches();
         } else {
           LOG.error("Cannot initialize State Store driver {}", driverName);
         }
@@ -198,4 +286,114 @@ public class StateStoreService extends CompositeService {
     this.identifier = id;
   }
 
+  //
+  // Cached state store data
+  //
+  /**
+   * The last time the state store cache was fully updated, i.e., the last
+   * refresh in which every registered cache was loaded successfully.
+   *
+   * @return Timestamp taken from the local clock, not from the driver.
+   */
+  public long getCacheUpdateTime() {
+    return this.cacheLastUpdateTime;
+  }
+
+  /**
+   * Stop the cache update service and remove it from the services of this
+   * State Store. It does nothing if there is no cache update service.
+   */
+  @VisibleForTesting
+  public void stopCacheUpdateService() {
+    if (this.cacheUpdater == null) {
+      return;
+    }
+    this.cacheUpdater.stop();
+    removeService(this.cacheUpdater);
+    this.cacheUpdater = null;
+  }
+
+  /**
+   * Register an external cache for automatic periodic cache updates. The
+   * cache is added to the list of external caches to update.
+   *
+   * @param client Cache to keep updated with the State Store data.
+   */
+  public void registerCacheExternal(StateStoreCache client) {
+    this.cachesToUpdateExternal.add(client);
+  }
+
+  /**
+   * Refresh the cache with information from the State Store without forcing
+   * the refresh. Called periodically by the CacheUpdateService to maintain
+   * data caches and versions.
+   */
+  public void refreshCaches() {
+    refreshCaches(false);
+  }
+
+  /**
+   * Reload every registered cache (internal and external) from the State
+   * Store. The refresh is skipped when the driver is not ready. The last
+   * update time only advances when all the caches report a successful load.
+   *
+   * @param force If we force the refresh.
+   */
+  public void refreshCaches(boolean force) {
+    if (!isDriverReady()) {
+      LOG.info("Skipping State Store cache update, driver is not ready.");
+      return;
+    }
+
+    // Work on a copy of the registered caches: internal first, then external
+    List<StateStoreCache> targets =
+        new LinkedList<StateStoreCache>(cachesToUpdateInternal);
+    targets.addAll(cachesToUpdateExternal);
+
+    boolean allLoaded = true;
+    for (StateStoreCache cache : targets) {
+      String cacheName = cache.getClass().getSimpleName();
+      boolean loaded;
+      try {
+        loaded = cache.loadCache(force);
+      } catch (IOException e) {
+        LOG.error("Error updating cache for {}", cacheName, e);
+        loaded = false;
+      }
+      if (!loaded) {
+        allLoaded = false;
+        LOG.error("Cache update failed for cache {}", cacheName);
+      }
+    }
+
+    if (allLoaded) {
+      // Uses local time, not driver time.
+      this.cacheLastUpdateTime = Time.now();
+    }
+  }
+
+  /**
+   * Update the cache for a specific record store without forcing the update.
+   *
+   * @param clazz Class of the record store.
+   * @return If the cache was loaded.
+   * @throws IOException if the cache update failed or no registered cache
+   *           matches the class.
+   */
+  public boolean loadCache(final Class<?> clazz) throws IOException {
+    return loadCache(clazz, false);
+  }
+
+  /**
+   * Update the cache for a specific record store.
+   *
+   * @param clazz Class of the record store.
+   * @param force Force the update ignoring cached periods.
+   * @return If the cached was loaded.
+   * @throws IOException if the cache update failed.
+   */
+  public boolean loadCache(Class<?> clazz, boolean force) throws IOException {
+    List<StateStoreCache> cachesToUpdate =
+        new LinkedList<StateStoreCache>();
+    cachesToUpdate.addAll(this.cachesToUpdateInternal);
+    cachesToUpdate.addAll(this.cachesToUpdateExternal);
+    for (StateStoreCache cachedStore : cachesToUpdate) {
+      if (clazz.isInstance(cachedStore)) {
+        return cachedStore.loadCache(force);
+      }
+    }
+    throw new IOException("Registered cache was not found for " + clazz);
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
new file mode 100644
index 0000000..c28131f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the {@link MembershipStore} State Store API.
+ * <p>
+ * On every {@link #loadCache(boolean)} the cached {@link MembershipState}
+ * records are split into expired registrations, active registrations (one
+ * representative record per namenode key) and the namespaces reported by
+ * non-expired records. These derived views are guarded by a read/write lock.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MembershipStoreImpl
+    extends MembershipStore implements StateStoreCache {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MembershipStoreImpl.class);
+
+
+  /** Reported namespaces that are not decommissioned. */
+  private final Set<FederationNamespaceInfo> activeNamespaces;
+
+  /** Namenodes (after evaluating the quorum) that are active in the cluster. */
+  private final Map<String, MembershipState> activeRegistrations;
+  /** Namenode status reports (raw) that were discarded for being too old. */
+  private final Map<String, MembershipState> expiredRegistrations;
+
+  /** Lock to access the local memory cache. */
+  private final ReadWriteLock cacheReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock cacheReadLock = cacheReadWriteLock.readLock();
+  private final Lock cacheWriteLock = cacheReadWriteLock.writeLock();
+
+
+  /**
+   * Create the membership store with empty local caches.
+   *
+   * @param driver State Store driver used to access the records.
+   */
+  public MembershipStoreImpl(StateStoreDriver driver) {
+    super(driver);
+
+    this.activeRegistrations = new HashMap<>();
+    this.expiredRegistrations = new HashMap<>();
+    this.activeNamespaces = new TreeSet<>();
+  }
+
+  /**
+   * Return a copy of the expired registrations held in the local cache. The
+   * content of the request is not inspected.
+   */
+  @Override
+  public GetNamenodeRegistrationsResponse getExpiredNamenodeRegistrations(
+      GetNamenodeRegistrationsRequest request) throws IOException {
+
+    GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance();
+    cacheReadLock.lock();
+    try {
+      Collection<MembershipState> vals = this.expiredRegistrations.values();
+      List<MembershipState> copyVals = new ArrayList<>(vals);
+      response.setNamenodeMemberships(copyVals);
+    } finally {
+      cacheReadLock.unlock();
+    }
+    return response;
+  }
+
+  /**
+   * Return a copy of the active namespaces held in the local cache. The
+   * content of the request is not inspected.
+   */
+  @Override
+  public GetNamespaceInfoResponse getNamespaceInfo(
+      GetNamespaceInfoRequest request) throws IOException {
+
+    Set<FederationNamespaceInfo> namespaces = new HashSet<>();
+    // Acquire the lock before entering the try block so the finally clause
+    // never unlocks a lock that was not obtained.
+    cacheReadLock.lock();
+    try {
+      namespaces.addAll(activeNamespaces);
+    } finally {
+      cacheReadLock.unlock();
+    }
+
+    GetNamespaceInfoResponse response =
+        GetNamespaceInfoResponse.newInstance(namespaces);
+    return response;
+  }
+
+  /**
+   * Return the active registrations from the local cache, all of them if the
+   * request has no partial membership or only the ones matching it otherwise.
+   * The result is sorted using the natural ordering of the records.
+   */
+  @Override
+  public GetNamenodeRegistrationsResponse getNamenodeRegistrations(
+      final GetNamenodeRegistrationsRequest request) throws IOException {
+
+    // TODO Cache some common queries and sorts
+    List<MembershipState> ret = null;
+
+    cacheReadLock.lock();
+    try {
+      Collection<MembershipState> registrations = activeRegistrations.values();
+      MembershipState partialMembership = request.getPartialMembership();
+      if (partialMembership == null) {
+        ret = new ArrayList<>(registrations);
+      } else {
+        Query<MembershipState> query = new Query<>(partialMembership);
+        ret = filterMultiple(query, registrations);
+      }
+    } finally {
+      cacheReadLock.unlock();
+    }
+    // Sort in ascending update date order
+    // NOTE(review): the order is the natural ordering of MembershipState;
+    // confirm the direction against MembershipState#compareTo.
+    Collections.sort(ret);
+
+    GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance(ret);
+    return response;
+  }
+
+  /**
+   * Store the membership record of the request through the driver. The cached
+   * active registration with the same namenode key is only consulted to log
+   * whether this is a new registration, a state change or a plain update; the
+   * local caches are not modified here.
+   */
+  @Override
+  public NamenodeHeartbeatResponse namenodeHeartbeat(
+      NamenodeHeartbeatRequest request) throws IOException {
+
+    MembershipState record = request.getNamenodeMembership();
+    String nnId = record.getNamenodeKey();
+    MembershipState existingEntry = null;
+    cacheReadLock.lock();
+    try {
+      existingEntry = this.activeRegistrations.get(nnId);
+    } finally {
+      cacheReadLock.unlock();
+    }
+
+    if (existingEntry != null) {
+      if (existingEntry.getState() != record.getState()) {
+        LOG.info("NN registration state has changed: {} -> {}",
+            existingEntry, record);
+      } else {
+        LOG.debug("Updating NN registration: {} -> {}", existingEntry, record);
+      }
+    } else {
+      LOG.info("Inserting new NN registration: {}", record);
+    }
+
+    // NOTE(review): the flags presumably mean "allow update" and "do not fail
+    // if it exists" - confirm against StateStoreDriver#put.
+    boolean status = getDriver().put(record, true, false);
+
+    NamenodeHeartbeatResponse response =
+        NamenodeHeartbeatResponse.newInstance(status);
+    return response;
+  }
+
+  /**
+   * Reload the parent cache and rebuild the local views (expired
+   * registrations, active registrations and active namespaces) under the
+   * write lock.
+   *
+   * @param force If we force the load.
+   * @return Always true when no exception is thrown.
+   * @throws IOException If the cached records cannot be obtained.
+   */
+  @Override
+  public boolean loadCache(boolean force) throws IOException {
+    // The boolean result of the parent load is not checked here
+    super.loadCache(force);
+
+    // Update local cache atomically
+    cacheWriteLock.lock();
+    try {
+      this.activeRegistrations.clear();
+      this.expiredRegistrations.clear();
+      this.activeNamespaces.clear();
+
+      // Build list of NN registrations: nnId -> registration list
+      Map<String, List<MembershipState>> nnRegistrations = new HashMap<>();
+      List<MembershipState> cachedRecords = getCachedRecords();
+      for (MembershipState membership : cachedRecords) {
+        String nnId = membership.getNamenodeKey();
+        if (membership.getState() == FederationNamenodeServiceState.EXPIRED) {
+          // Expired, RPC service does not use these
+          String key = membership.getPrimaryKey();
+          this.expiredRegistrations.put(key, membership);
+        } else {
+          // This is a valid NN registration, build a list of all registrations
+          // using the NN id to use for the quorum calculation.
+          List<MembershipState> nnRegistration =
+              nnRegistrations.get(nnId);
+          if (nnRegistration == null) {
+            nnRegistration = new LinkedList<>();
+            nnRegistrations.put(nnId, nnRegistration);
+          }
+          nnRegistration.add(membership);
+          String bpId = membership.getBlockPoolId();
+          String cId = membership.getClusterId();
+          String nsId = membership.getNameserviceId();
+          FederationNamespaceInfo nsInfo =
+              new FederationNamespaceInfo(bpId, cId, nsId);
+          this.activeNamespaces.add(nsInfo);
+        }
+      }
+
+      // Calculate most representative entry for each active NN id. Every list
+      // has at least one record, so the quorum never returns null here.
+      for (List<MembershipState> nnRegistration : nnRegistrations.values()) {
+        // Run quorum based on NN state
+        MembershipState representativeRecord =
+            getRepresentativeQuorum(nnRegistration);
+        String nnKey = representativeRecord.getNamenodeKey();
+        this.activeRegistrations.put(nnKey, representativeRecord);
+      }
+      LOG.debug("Refreshed {} NN registrations from State Store",
+          cachedRecords.size());
+    } finally {
+      cacheWriteLock.unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Override the state of a cached active registration. Only the record in
+   * the local cache is modified; nothing is written through the driver here.
+   * The response reports false when the namenode has no active registration.
+   */
+  @Override
+  public UpdateNamenodeRegistrationResponse updateNamenodeRegistration(
+      UpdateNamenodeRegistrationRequest request) throws IOException {
+
+    boolean status = false;
+    cacheWriteLock.lock();
+    try {
+      String namenode = MembershipState.getNamenodeKey(
+          request.getNameserviceId(), request.getNamenodeId());
+      MembershipState member = this.activeRegistrations.get(namenode);
+      if (member != null) {
+        member.setState(request.getState());
+        status = true;
+      }
+    } finally {
+      cacheWriteLock.unlock();
+    }
+    UpdateNamenodeRegistrationResponse response =
+        UpdateNamenodeRegistrationResponse.newInstance(status);
+    return response;
+  }
+
+  /**
+   * Picks the record that best represents a set of registrations based on
+   * their state. The records are grouped by state: 1) If the largest group
+   * holds a strict majority of the records, the first sorted record of that
+   * group is returned. 2) Otherwise, the first sorted record of all the
+   * records is returned.
+   *
+   * @param records Registrations to evaluate.
+   * @return Most representative record or null if there are no records.
+   */
+  private MembershipState getRepresentativeQuorum(
+      Collection<MembershipState> records) {
+
+    // Collate objects by field value: field value -> order set of records
+    Map<FederationNamenodeServiceState, TreeSet<MembershipState>> occurenceMap =
+        new HashMap<>();
+    for (MembershipState record : records) {
+      FederationNamenodeServiceState state = record.getState();
+      TreeSet<MembershipState> matchingSet = occurenceMap.get(state);
+      if (matchingSet == null) {
+        // TreeSet orders elements by descending date via comparators
+        // NOTE(review): the order is the natural ordering of MembershipState;
+        // confirm the direction against MembershipState#compareTo.
+        matchingSet = new TreeSet<>();
+        occurenceMap.put(state, matchingSet);
+      }
+      matchingSet.add(record);
+    }
+
+    // Select largest group
+    TreeSet<MembershipState> largestSet = new TreeSet<>();
+    for (TreeSet<MembershipState> matchingSet : occurenceMap.values()) {
+      if (largestSet.size() < matchingSet.size()) {
+        largestSet = matchingSet;
+      }
+    }
+
+    // If quorum, use the newest element here
+    if (largestSet.size() > records.size() / 2) {
+      return largestSet.first();
+      // Otherwise, return most recent by class comparator
+    } else if (records.size() > 0) {
+      TreeSet<MembershipState> sortedList = new TreeSet<>(records);
+      LOG.debug("Quorum failed, using most recent: {}", sortedList.first());
+      return sortedList.first();
+    } else {
+      return null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
new file mode 100644
index 0000000..1a50d15
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementations of the state store API interfaces. All classes
+ * derive from {@link
+ * org.apache.hadoop.hdfs.server.federation.store.RecordStore}. The API
+ * definitions are contained in the
+ * org.apache.hadoop.hdfs.server.federation.store package.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java
new file mode 100644
index 0000000..568feaf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * API request for listing namenode registrations present in the state store.
+ */
+public abstract class GetNamenodeRegistrationsRequest {
+
+  public static GetNamenodeRegistrationsRequest newInstance()
+      throws IOException {
+    return StateStoreSerializer.newRecord(
+        GetNamenodeRegistrationsRequest.class);
+  }
+
+  public static GetNamenodeRegistrationsRequest newInstance(
+      MembershipState member) throws IOException {
+    GetNamenodeRegistrationsRequest request = newInstance();
+    request.setPartialMembership(member);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract MembershipState getPartialMembership();
+
+  @Public
+  @Unstable
+  public abstract void setPartialMembership(MembershipState member);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java
new file mode 100644
index 0000000..0d60c90
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * API response for listing namenode registrations present in the state store.
+ */
+public abstract class GetNamenodeRegistrationsResponse {
+
+  public static GetNamenodeRegistrationsResponse newInstance()
+      throws IOException {
+    return StateStoreSerializer.newRecord(
+        GetNamenodeRegistrationsResponse.class);
+  }
+
+  public static GetNamenodeRegistrationsResponse newInstance(
+      List<MembershipState> records) throws IOException {
+    GetNamenodeRegistrationsResponse response = newInstance();
+    response.setNamenodeMemberships(records);
+    return response;
+  }
+
+  @Public
+  @Unstable
+  public abstract List<MembershipState> getNamenodeMemberships()
+      throws IOException;
+
+  @Public
+  @Unstable
+  public abstract void setNamenodeMemberships(
+      List<MembershipState> records) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java
new file mode 100644
index 0000000..b5cc01b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API request for listing HDFS namespaces present in the state store.
+ */
+public abstract class GetNamespaceInfoRequest {
+
+  public static GetNamespaceInfoRequest newInstance() {
+    return StateStoreSerializer.newRecord(GetNamespaceInfoRequest.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java
new file mode 100644
index 0000000..f541453
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for listing HDFS namespaces present in the state store.
+ */
+public abstract class GetNamespaceInfoResponse {
+
+  public static GetNamespaceInfoResponse newInstance() {
+    return StateStoreSerializer.newRecord(GetNamespaceInfoResponse.class);
+  }
+
+  public static GetNamespaceInfoResponse newInstance(
+      Set<FederationNamespaceInfo> namespaces) throws IOException {
+    GetNamespaceInfoResponse response = newInstance();
+    response.setNamespaceInfo(namespaces);
+    return response;
+  }
+
+  @Public
+  @Unstable
+  public abstract Set<FederationNamespaceInfo> getNamespaceInfo();
+
+  @Public
+  @Unstable
+  public abstract void setNamespaceInfo(
+      Set<FederationNamespaceInfo> namespaceInfo);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java
new file mode 100644
index 0000000..9506026
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * API request for registering a namenode with the state store.
+ */
+public abstract class NamenodeHeartbeatRequest {
+
+  public static NamenodeHeartbeatRequest newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(NamenodeHeartbeatRequest.class);
+  }
+
+  public static NamenodeHeartbeatRequest newInstance(MembershipState namenode)
+      throws IOException {
+    NamenodeHeartbeatRequest request = newInstance();
+    request.setNamenodeMembership(namenode);
+    return request;
+  }
+
+  @Private
+  @Unstable
+  public abstract MembershipState getNamenodeMembership()
+      throws IOException;
+
+  @Private
+  @Unstable
+  public abstract void setNamenodeMembership(MembershipState report)
+      throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java
new file mode 100644
index 0000000..acb7a6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for registering a namenode with the state store.
+ */
+public abstract class NamenodeHeartbeatResponse {
+
+  public static NamenodeHeartbeatResponse newInstance() throws IOException {
+    return StateStoreSerializer.newRecord(NamenodeHeartbeatResponse.class);
+  }
+
+  public static NamenodeHeartbeatResponse newInstance(boolean status)
+      throws IOException {
+    NamenodeHeartbeatResponse response = newInstance();
+    response.setResult(status);
+    return response;
+  }
+
+  @Private
+  @Unstable
+  public abstract boolean getResult();
+
+  @Private
+  @Unstable
+  public abstract void setResult(boolean result);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79fce5e0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java
new file mode 100644
index 0000000..4459e33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API request for overriding an existing namenode registration in the state
+ * store.
+ */
+public abstract class UpdateNamenodeRegistrationRequest {
+
+  public static UpdateNamenodeRegistrationRequest newInstance()
+      throws IOException {
+    return StateStoreSerializer.newRecord(
+        UpdateNamenodeRegistrationRequest.class);
+  }
+
+  public static UpdateNamenodeRegistrationRequest newInstance(
+      String nameserviceId, String namenodeId,
+      FederationNamenodeServiceState state) throws IOException {
+    UpdateNamenodeRegistrationRequest request = newInstance();
+    request.setNameserviceId(nameserviceId);
+    request.setNamenodeId(namenodeId);
+    request.setState(state);
+    return request;
+  }
+
+  @Private
+  @Unstable
+  public abstract String getNameserviceId();
+
+  @Private
+  @Unstable
+  public abstract String getNamenodeId();
+
+  @Private
+  @Unstable
+  public abstract FederationNamenodeServiceState getState();
+
+  @Private
+  @Unstable
+  public abstract void setNameserviceId(String nsId);
+
+  @Private
+  @Unstable
+  public abstract void setNamenodeId(String nnId);
+
+  @Private
+  @Unstable
+  public abstract void setState(FederationNamenodeServiceState state);
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org