You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/01 02:41:28 UTC
[61/73] [abbrv] hadoop git commit: HDFS-10687. Federation Membership
State Store internal API. Contributed by Jason Kace and Inigo Goiri.
HDFS-10687. Federation Membership State Store internal API. Contributed by Jason Kace and Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fad7865e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fad7865e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fad7865e
Branch: refs/heads/HDFS-10467
Commit: fad7865e9624695049cfbf8d72a13d3b0217e167
Parents: 904138c
Author: Inigo Goiri <in...@apache.org>
Authored: Mon Jul 31 10:55:21 2017 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Thu Aug 31 19:39:54 2017 -0700
----------------------------------------------------------------------
.../dev-support/findbugsExcludeFile.xml | 3 +
hadoop-hdfs-project/hadoop-hdfs/pom.xml | 1 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 17 +-
.../resolver/MembershipNamenodeResolver.java | 290 ++++++++++++
.../federation/router/FederationUtil.java | 42 +-
.../federation/store/CachedRecordStore.java | 237 ++++++++++
.../federation/store/MembershipStore.java | 126 +++++
.../federation/store/StateStoreCache.java | 36 ++
.../store/StateStoreCacheUpdateService.java | 67 +++
.../federation/store/StateStoreService.java | 202 +++++++-
.../store/impl/MembershipStoreImpl.java | 311 +++++++++++++
.../federation/store/impl/package-info.java | 31 ++
.../GetNamenodeRegistrationsRequest.java | 52 +++
.../GetNamenodeRegistrationsResponse.java | 55 +++
.../store/protocol/GetNamespaceInfoRequest.java | 30 ++
.../protocol/GetNamespaceInfoResponse.java | 52 +++
.../protocol/NamenodeHeartbeatRequest.java | 52 +++
.../protocol/NamenodeHeartbeatResponse.java | 49 ++
.../UpdateNamenodeRegistrationRequest.java | 72 +++
.../UpdateNamenodeRegistrationResponse.java | 51 ++
.../impl/pb/FederationProtocolPBTranslator.java | 145 ++++++
.../GetNamenodeRegistrationsRequestPBImpl.java | 87 ++++
.../GetNamenodeRegistrationsResponsePBImpl.java | 99 ++++
.../impl/pb/GetNamespaceInfoRequestPBImpl.java | 60 +++
.../impl/pb/GetNamespaceInfoResponsePBImpl.java | 95 ++++
.../impl/pb/NamenodeHeartbeatRequestPBImpl.java | 93 ++++
.../pb/NamenodeHeartbeatResponsePBImpl.java | 71 +++
...UpdateNamenodeRegistrationRequestPBImpl.java | 95 ++++
...pdateNamenodeRegistrationResponsePBImpl.java | 73 +++
.../store/protocol/impl/pb/package-info.java | 29 ++
.../store/records/MembershipState.java | 329 +++++++++++++
.../store/records/MembershipStats.java | 126 +++++
.../records/impl/pb/MembershipStatePBImpl.java | 334 +++++++++++++
.../records/impl/pb/MembershipStatsPBImpl.java | 191 ++++++++
.../src/main/proto/FederationProtocol.proto | 107 +++++
.../src/main/resources/hdfs-default.xml | 18 +-
.../resolver/TestNamenodeResolver.java | 284 ++++++++++++
.../store/FederationStateStoreTestUtils.java | 23 +-
.../federation/store/TestStateStoreBase.java | 81 ++++
.../store/TestStateStoreMembershipState.java | 463 +++++++++++++++++++
.../store/driver/TestStateStoreDriverBase.java | 69 ++-
.../store/records/TestMembershipState.java | 129 ++++++
42 files changed, 4745 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 9582fcb..4b958b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -15,6 +15,9 @@
<Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
</Match>
<Match>
+ <Package name="org.apache.hadoop.hdfs.federation.protocol.proto" />
+ </Match>
+ <Match>
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index fa1044d..81e5fdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -331,6 +331,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<include>QJournalProtocol.proto</include>
<include>editlog.proto</include>
<include>fsimage.proto</include>
+ <include>FederationProtocol.proto</include>
</includes>
</source>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8b39e88..afb5bbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
@@ -1163,8 +1165,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"org.apache.hadoop.hdfs.server.federation.MockResolver";
public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS =
FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class";
- public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT =
- "org.apache.hadoop.hdfs.server.federation.MockResolver";
+ public static final Class<? extends ActiveNamenodeResolver>
+ FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT =
+ MembershipNamenodeResolver.class;
// HDFS Router-based federation State Store
public static final String FEDERATION_STORE_PREFIX =
@@ -1186,6 +1189,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(1);
+ public static final String DFS_ROUTER_CACHE_TIME_TO_LIVE_MS =
+ FEDERATION_ROUTER_PREFIX + "cache.ttl";
+ public static final long DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT =
+ TimeUnit.MINUTES.toMillis(1);
+
+ public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS =
+ FEDERATION_STORE_PREFIX + "membership.expiration";
+ public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
+ TimeUnit.MINUTES.toMillis(5);
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
new file mode 100644
index 0000000..b0ced24
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE;
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED;
+import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a cached lookup of the most recently active namenode for a
+ * particular nameservice. Relies on the {@link StateStoreService} to
+ * discover available nameservices and namenodes.
+ */
+public class MembershipNamenodeResolver
+ implements ActiveNamenodeResolver, StateStoreCache {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MembershipNamenodeResolver.class);
+
+ /** Reference to the State Store. */
+ private final StateStoreService stateStore;
+ /** Membership State Store interface. */
+ private final MembershipStore membershipInterface;
+
+ /** Parent router ID. */
+ private String routerId;
+
+ /** Cached lookup of NN for nameservice. Invalidated on cache refresh. */
+ private Map<String, List<? extends FederationNamenodeContext>> cacheNS;
+ /** Cached lookup of NN for block pool. Invalidated on cache refresh. */
+ private Map<String, List<? extends FederationNamenodeContext>> cacheBP;
+
+
+ public MembershipNamenodeResolver(
+ Configuration conf, StateStoreService store) throws IOException {
+ this.stateStore = store;
+
+ this.cacheNS = new ConcurrentHashMap<>();
+ this.cacheBP = new ConcurrentHashMap<>();
+
+ if (this.stateStore != null) {
+ // Request cache updates from the state store
+ this.stateStore.registerCacheExternal(this);
+
+ // Initialize the interface to get the membership
+ this.membershipInterface = this.stateStore.getRegisteredRecordStore(
+ MembershipStore.class);
+ } else {
+ this.membershipInterface = null;
+ }
+
+ if (this.membershipInterface == null) {
+ throw new IOException("State Store does not have an interface for " +
+ MembershipStore.class.getSimpleName());
+ }
+ }
+
+ @Override
+ public boolean loadCache(boolean force) {
+ // Our cache depends on the store, update it first
+ try {
+ this.membershipInterface.loadCache(force);
+ } catch (IOException e) {
+ LOG.error("Cannot update membership from the State Store", e);
+ }
+
+ // Force refresh of active NN cache
+ cacheBP.clear();
+ cacheNS.clear();
+ return true;
+ }
+
+ @Override
+ public void updateActiveNamenode(
+ final String nsId, final InetSocketAddress address) throws IOException {
+
+ // Called when we have an RPC miss and successful hit on an alternate NN.
+ // Temporarily update our cache, it will be overwritten on the next update.
+ try {
+ MembershipState partial = MembershipState.newInstance();
+ String rpcAddress = address.getHostName() + ":" + address.getPort();
+ partial.setRpcAddress(rpcAddress);
+ partial.setNameserviceId(nsId);
+
+ GetNamenodeRegistrationsRequest request =
+ GetNamenodeRegistrationsRequest.newInstance(partial);
+
+ GetNamenodeRegistrationsResponse response =
+ this.membershipInterface.getNamenodeRegistrations(request);
+ List<MembershipState> records = response.getNamenodeMemberships();
+
+ if (records != null && records.size() == 1) {
+ MembershipState record = records.get(0);
+ UpdateNamenodeRegistrationRequest updateRequest =
+ UpdateNamenodeRegistrationRequest.newInstance(
+ record.getNameserviceId(), record.getNamenodeId(), ACTIVE);
+ this.membershipInterface.updateNamenodeRegistration(updateRequest);
+ }
+ } catch (StateStoreUnavailableException e) {
+ LOG.error("Cannot update {} as active, State Store unavailable", address);
+ }
+ }
+
+ @Override
+ public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
+ final String nsId) throws IOException {
+
+ List<? extends FederationNamenodeContext> ret = cacheNS.get(nsId);
+ if (ret == null) {
+ try {
+ MembershipState partial = MembershipState.newInstance();
+ partial.setNameserviceId(nsId);
+ GetNamenodeRegistrationsRequest request =
+ GetNamenodeRegistrationsRequest.newInstance(partial);
+
+ final List<MembershipState> result =
+ getRecentRegistrationForQuery(request, true, false);
+ if (result == null || result.isEmpty()) {
+ LOG.error("Cannot locate eligible NNs for {}", nsId);
+ return null;
+ } else {
+ cacheNS.put(nsId, result);
+ ret = result;
+ }
+ } catch (StateStoreUnavailableException e) {
+ LOG.error("Cannot get active NN for {}, State Store unavailable", nsId);
+ }
+ }
+ if (ret == null) {
+ return null;
+ }
+ return Collections.unmodifiableList(ret);
+ }
+
+ @Override
+ public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
+ final String bpId) throws IOException {
+
+ List<? extends FederationNamenodeContext> ret = cacheBP.get(bpId);
+ if (ret == null) {
+ try {
+ MembershipState partial = MembershipState.newInstance();
+ partial.setBlockPoolId(bpId);
+ GetNamenodeRegistrationsRequest request =
+ GetNamenodeRegistrationsRequest.newInstance(partial);
+
+ final List<MembershipState> result =
+ getRecentRegistrationForQuery(request, true, false);
+ if (result == null || result.isEmpty()) {
+ LOG.error("Cannot locate eligible NNs for {}", bpId);
+ } else {
+ cacheBP.put(bpId, result);
+ ret = result;
+ }
+ } catch (StateStoreUnavailableException e) {
+ LOG.error("Cannot get active NN for {}, State Store unavailable", bpId);
+ return null;
+ }
+ }
+ if (ret == null) {
+ return null;
+ }
+ return Collections.unmodifiableList(ret);
+ }
+
+ @Override
+ public boolean registerNamenode(NamenodeStatusReport report)
+ throws IOException {
+
+ if (this.routerId == null) {
+ LOG.warn("Cannot register namenode, router ID is not known {}", report);
+ return false;
+ }
+
+ MembershipState record = MembershipState.newInstance(
+ routerId, report.getNameserviceId(), report.getNamenodeId(),
+ report.getClusterId(), report.getBlockPoolId(), report.getRpcAddress(),
+ report.getServiceAddress(), report.getLifelineAddress(),
+ report.getWebAddress(), report.getState(), report.getSafemode());
+
+ if (report.getState() != UNAVAILABLE) {
+ // Set/update our last contact time
+ record.setLastContact(Time.now());
+ }
+
+ NamenodeHeartbeatRequest request = NamenodeHeartbeatRequest.newInstance();
+ request.setNamenodeMembership(record);
+ return this.membershipInterface.namenodeHeartbeat(request).getResult();
+ }
+
+ @Override
+ public Set<FederationNamespaceInfo> getNamespaces() throws IOException {
+ GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
+ GetNamespaceInfoResponse response =
+ this.membershipInterface.getNamespaceInfo(request);
+ return response.getNamespaceInfo();
+ }
+
+ /**
+ * Picks the most relevant record registration that matches the query. Return
+ * registrations matching the query in this preference: 1) Most recently
+ * updated ACTIVE registration 2) Most recently updated STANDBY registration
+ * (if showStandby) 3) Most recently updated UNAVAILABLE registration (if
+ * showUnavailable). EXPIRED registrations are ignored.
+ *
+ * @param query The select query for NN registrations.
+ * @param excludes List of NNs to exclude from matching results.
+ * @param addUnavailable include UNAVAILABLE registrations.
+ * @param addExpired include EXPIRED registrations.
+ * @return List of memberships or null if no registrations that
+ * both match the query AND the selected states.
+ * @throws IOException
+ */
+ private List<MembershipState> getRecentRegistrationForQuery(
+ GetNamenodeRegistrationsRequest request, boolean addUnavailable,
+ boolean addExpired) throws IOException {
+
+ // Retrieve a list of all registrations that match this query.
+ // This may include all NN records for a namespace/blockpool, including
+ // duplicate records for the same NN from different routers.
+ GetNamenodeRegistrationsResponse response =
+ this.membershipInterface.getNamenodeRegistrations(request);
+
+ List<MembershipState> memberships = response.getNamenodeMemberships();
+ if (!addExpired || !addUnavailable) {
+ Iterator<MembershipState> iterator = memberships.iterator();
+ while (iterator.hasNext()) {
+ MembershipState membership = iterator.next();
+ if (membership.getState() == EXPIRED && !addExpired) {
+ iterator.remove();
+ } else if (membership.getState() == UNAVAILABLE && !addUnavailable) {
+ iterator.remove();
+ }
+ }
+ }
+
+ List<MembershipState> priorityList = new ArrayList<>();
+ priorityList.addAll(memberships);
+ Collections.sort(priorityList, new NamenodePriorityComparator());
+
+ LOG.debug("Selected most recent NN {} for query", priorityList);
+ return priorityList;
+ }
+
+ @Override
+ public void setRouterId(String router) {
+ this.routerId = router;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index 6e7e865..0129a37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -19,26 +19,57 @@ package org.apache.hadoop.hdfs.server.federation.router;
import java.lang.reflect.Constructor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utilities for managing HDFS federation.
*/
public final class FederationUtil {
- private static final Log LOG = LogFactory.getLog(FederationUtil.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationUtil.class);
private FederationUtil() {
// Utility Class
}
/**
+ * Create an instance of an interface with a constructor using a context.
+ *
+ * @param conf Configuration for the class names.
+ * @param context Context object to pass to the instance.
+ * @param contextClass Type of the context passed to the constructor.
+ * @param clazz Class of the object to return.
+ * @return New instance of the specified class that implements the desired
+ * interface and a single parameter constructor containing a
+ * StateStore reference.
+ */
+ private static <T, R> T newInstance(final Configuration conf,
+ final R context, final Class<R> contextClass, final Class<T> clazz) {
+ try {
+ if (contextClass == null) {
+ // Default constructor if no context
+ Constructor<T> constructor = clazz.getConstructor();
+ return constructor.newInstance();
+ } else {
+ // Constructor with context
+ Constructor<T> constructor = clazz.getConstructor(
+ Configuration.class, contextClass);
+ return constructor.newInstance(conf, context);
+ }
+ } catch (ReflectiveOperationException e) {
+ LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e);
+ return null;
+ }
+ }
+
+ /**
* Create an instance of an interface with a constructor using a state store
* constructor.
*
@@ -105,13 +136,14 @@ public final class FederationUtil {
*
* @param conf Configuration that defines the namenode resolver class.
* @param obj Context object passed to class constructor.
- * @return ActiveNamenodeResolver
+ * @return New active namenode resolver.
*/
public static ActiveNamenodeResolver newActiveNamenodeResolver(
Configuration conf, StateStoreService stateStore) {
- return newInstance(conf, stateStore, StateStoreService.class,
+ Class<? extends ActiveNamenodeResolver> clazz = conf.getClass(
DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT,
ActiveNamenodeResolver.class);
+ return newInstance(conf, stateStore, StateStoreService.class, clazz);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
new file mode 100644
index 0000000..90a6699
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Record store that takes care of caching the records in memory.
+ *
+ * @param <R> Record to store by this interface.
+ */
+public abstract class CachedRecordStore<R extends BaseRecord>
+ extends RecordStore<R> implements StateStoreCache {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CachedRecordStore.class);
+
+
+ /** Prevent loading the cache more than once every 500 ms. */
+ private static final long MIN_UPDATE_MS = 500;
+
+
+ /** Cached entries. */
+ private List<R> records = new ArrayList<>();
+
+ /** Time stamp of the cached entries. */
+ private long timestamp = -1;
+
+ /** If the cache is initialized. */
+ private boolean initialized = false;
+
+ /** Last time the cache was updated. */
+ private long lastUpdate = -1;
+
+ /** Lock to access the memory cache. */
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
+ /** If it should override the expired values when loading the cache. */
+ private boolean override = false;
+
+
+ /**
+ * Create a new cached record store.
+ *
+ * @param clazz Class of the record to store.
+ * @param driver State Store driver.
+ */
+ protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) {
+ this(clazz, driver, false);
+ }
+
+ /**
+ * Create a new cached record store.
+ *
+ * @param clazz Class of the record to store.
+ * @param driver State Store driver.
+ * @param override If the entries should be override if they expire
+ */
+ protected CachedRecordStore(
+ Class<R> clazz, StateStoreDriver driver, boolean over) {
+ super(clazz, driver);
+
+ this.override = over;
+ }
+
+ /**
+ * Check that the cache of the State Store information is available.
+ *
+ * @throws StateStoreUnavailableException If the cache is not initialized.
+ */
+ private void checkCacheAvailable() throws StateStoreUnavailableException {
+ if (!this.initialized) {
+ throw new StateStoreUnavailableException(
+ "Cached State Store not initialized, " +
+ getRecordClass().getSimpleName() + " records not valid");
+ }
+ }
+
+ @Override
+ public boolean loadCache(boolean force) throws IOException {
+ // Prevent loading the cache too frequently
+ if (force || isUpdateTime()) {
+ List<R> newRecords = null;
+ long t = -1;
+ try {
+ QueryResult<R> result = getDriver().get(getRecordClass());
+ newRecords = result.getRecords();
+ t = result.getTimestamp();
+
+ // If we have any expired record, update the State Store
+ if (this.override) {
+ overrideExpiredRecords(result);
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot get \"{}\" records from the State Store",
+ getRecordClass().getSimpleName());
+ this.initialized = false;
+ return false;
+ }
+
+ // Update cache atomically
+ writeLock.lock();
+ try {
+ this.records.clear();
+ this.records.addAll(newRecords);
+ this.timestamp = t;
+ this.initialized = true;
+ } finally {
+ writeLock.unlock();
+ }
+
+ lastUpdate = Time.monotonicNow();
+ }
+ return true;
+ }
+
+ /**
+ * Check if it's time to update the cache. Update it it was never updated.
+ *
+ * @return If it's time to update this cache.
+ */
+ private boolean isUpdateTime() {
+ return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS;
+ }
+
+ /**
+ * Updates the state store with any record overrides we detected, such as an
+ * expired state.
+ *
+ * @param query RecordQueryResult containing the data to be inspected.
+ * @param clazz Type of objects contained in the query.
+ * @throws IOException
+ */
+ public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
+ List<R> commitRecords = new ArrayList<>();
+ List<R> newRecords = query.getRecords();
+ long currentDriverTime = query.getTimestamp();
+ if (newRecords == null || currentDriverTime <= 0) {
+ LOG.error("Cannot check overrides for record");
+ return;
+ }
+ for (R record : newRecords) {
+ if (record.checkExpired(currentDriverTime)) {
+ String recordName = StateStoreUtils.getRecordName(record.getClass());
+ LOG.info("Override State Store record {}: {}", recordName, record);
+ commitRecords.add(record);
+ }
+ }
+ if (commitRecords.size() > 0) {
+ getDriver().putAll(commitRecords, true, false);
+ }
+ }
+
+ /**
+ * Updates the state store with any record overrides we detected, such as an
+ * expired state.
+ *
+ * @param driver State store driver for the data store.
+ * @param record Record record to be updated.
+ * @param clazz Type of data record.
+ * @throws IOException
+ */
+ public void overrideExpiredRecord(R record) throws IOException {
+ List<R> newRecords = Collections.singletonList(record);
+ long time = getDriver().getTime();
+ QueryResult<R> query = new QueryResult<>(newRecords, time);
+ overrideExpiredRecords(query);
+ }
+
+ /**
+ * Get all the cached records.
+ *
+ * @return Copy of the cached records.
+ * @throws StateStoreUnavailableException If the State store is not available.
+ */
+ public List<R> getCachedRecords() throws StateStoreUnavailableException {
+ checkCacheAvailable();
+
+ List<R> ret = new LinkedList<R>();
+ this.readLock.lock();
+ try {
+ ret.addAll(this.records);
+ } finally {
+ this.readLock.unlock();
+ }
+ return ret;
+ }
+
+ /**
+ * Get all the cached records and the time stamp of the cache.
+ *
+ * @return Copy of the cached records and the time stamp.
+ * @throws StateStoreUnavailableException If the State store is not available.
+ */
+ protected QueryResult<R> getCachedRecordsAndTimeStamp()
+ throws StateStoreUnavailableException {
+ checkCacheAvailable();
+
+ this.readLock.lock();
+ try {
+ return new QueryResult<R>(this.records, this.timestamp);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
new file mode 100644
index 0000000..3e8ba6b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * Management API for NameNode registrations stored in
+ * {@link org.apache.hadoop.hdfs.server.federation.store.records.MembershipState
+ * MembershipState} records. The {@link org.apache.hadoop.hdfs.server.
+ * federation.router.RouterHeartbeatService RouterHeartbeatService} periodically
+ * polls each NN to update the NameNode metadata(addresses, operational) and HA
+ * state(active, standby). Each NameNode may be polled by multiple
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}
+ * instances.
+ * <p>
+ * Once fetched from the
+ * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
+ * StateStoreDriver}, NameNode registrations are cached until the next query.
+ * The fetched registration data is aggregated using a quorum to determine the
+ * best/most accurate state for each NameNode. The cache is periodically updated
+ * by the @{link StateStoreCacheUpdateService}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class MembershipStore
+ extends CachedRecordStore<MembershipState> {
+
+ protected MembershipStore(StateStoreDriver driver) {
+ super(MembershipState.class, driver, true);
+ }
+
+ /**
+ * Inserts or updates a namenode membership entry into the table.
+ *
+ * @param request Fully populated NamenodeHeartbeatRequest request.
+ * @return True if successful, false otherwise.
+ * @throws StateStoreUnavailableException Throws exception if the data store
+ * is not initialized.
+ * @throws IOException if the data store could not be queried or the query is
+ * invalid.
+ */
+ public abstract NamenodeHeartbeatResponse namenodeHeartbeat(
+ NamenodeHeartbeatRequest request) throws IOException;
+
+ /**
+ * Queries for a single cached registration entry matching the given
+ * parameters. Possible keys are the names of data structure elements Possible
+ * values are matching SQL "LIKE" targets.
+ *
+ * @param request Fully populated GetNamenodeRegistrationsRequest request.
+ * @return Single matching FederationMembershipStateEntry or null if not found
+ * or more than one entry matches.
+ * @throws StateStoreUnavailableException Throws exception if the data store
+ * is not initialized.
+ * @throws IOException if the data store could not be queried or the query is
+ * invalid.
+ */
+ public abstract GetNamenodeRegistrationsResponse getNamenodeRegistrations(
+ GetNamenodeRegistrationsRequest request) throws IOException;
+
+ /**
+ * Get the expired registrations from the registration cache.
+ *
+ * @return Expired registrations or zero-length list if none are found.
+ * @throws StateStoreUnavailableException Throws exception if the data store
+ * is not initialized.
+ * @throws IOException if the data store could not be queried or the query is
+ * invalid.
+ */
+ public abstract GetNamenodeRegistrationsResponse
+ getExpiredNamenodeRegistrations(GetNamenodeRegistrationsRequest request)
+ throws IOException;
+
+ /**
+ * Retrieves a list of registered nameservices and their associated info.
+ *
+ * @param request
+ * @return Collection of information for each registered nameservice.
+ * @throws IOException if the data store could not be queried or the query is
+ * invalid.
+ */
+ public abstract GetNamespaceInfoResponse getNamespaceInfo(
+ GetNamespaceInfoRequest request) throws IOException;
+
+ /**
+ * Overrides a cached namenode state with an updated state.
+ *
+ * @param request Fully populated OverrideNamenodeRegistrationRequest request.
+ * @return OverrideNamenodeRegistrationResponse
+ * @throws StateStoreUnavailableException if the data store is not
+ * initialized.
+ * @throws IOException if the data store could not be queried or the query is
+ * invalid.
+ */
+ public abstract UpdateNamenodeRegistrationResponse updateNamenodeRegistration(
+ UpdateNamenodeRegistrationRequest request) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
new file mode 100644
index 0000000..83fc501
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import java.io.IOException;
+
+/**
+ * Interface for a cached copy of the State Store.
+ */
+public interface StateStoreCache {
+
+ /**
+ * Load the cache from the State Store. Called by the cache update service
+ * when the data has been reloaded.
+ *
+ * @param force If we force the load.
+ * @return If the cache was loaded successfully.
+ * @throws IOException If there was an error loading the cache.
+ */
+ boolean loadCache(boolean force) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
new file mode 100644
index 0000000..bb8cfb0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.PeriodicService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the {@link StateStoreService}
+ * cached information in the
+ * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}.
+ * This is for performance and removes the State Store from the critical path
+ * in common operations.
+ */
+public class StateStoreCacheUpdateService extends PeriodicService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StateStoreCacheUpdateService.class);
+
+ /** The service that manages the State Store connection. */
+ private final StateStoreService stateStore;
+
+
+ /**
+ * Create a new Cache update service.
+ *
+ * @param stateStore Implementation of the state store
+ */
+ public StateStoreCacheUpdateService(StateStoreService stateStore) {
+ super(StateStoreCacheUpdateService.class.getSimpleName());
+ this.stateStore = stateStore;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+
+ this.setIntervalMs(conf.getLong(
+ DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS,
+ DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT));
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void periodicInvoke() {
+ LOG.debug("Updating State Store cache");
+ stateStore.refreshCaches();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index df207e0..73f607f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -18,17 +18,24 @@
package org.apache.hadoop.hdfs.server.federation.store;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +61,9 @@ import com.google.common.annotations.VisibleForTesting;
* federation.
* <li>{@link MountTableStore}: Mount table between to subclusters.
* See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
+ * <li>{@link RebalancerStore}: Log of the rebalancing operations.
+ * <li>{@link RouterStore}: Router state in the federation.
+ * <li>{@link TokenStore}: Tokens in the federation.
* </ul>
*/
@InterfaceAudience.Private
@@ -77,8 +87,30 @@ public class StateStoreService extends CompositeService {
private StateStoreConnectionMonitorService monitorService;
+ /** Supported record stores. */
+ private final Map<
+ Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>>
+ recordStores;
+
+ /** Service to maintain State Store caches. */
+ private StateStoreCacheUpdateService cacheUpdater;
+ /** Time the cache was last successfully updated. */
+ private long cacheLastUpdateTime;
+ /** List of internal caches to update. */
+ private final List<StateStoreCache> cachesToUpdateInternal;
+ /** List of external caches to update. */
+ private final List<StateStoreCache> cachesToUpdateExternal;
+
+
public StateStoreService() {
super(StateStoreService.class.getName());
+
+ // Records and stores supported by this implementation
+ this.recordStores = new HashMap<>();
+
+ // Caches to maintain
+ this.cachesToUpdateInternal = new ArrayList<>();
+ this.cachesToUpdateExternal = new ArrayList<>();
}
/**
@@ -102,10 +134,22 @@ public class StateStoreService extends CompositeService {
throw new IOException("Cannot create driver for the State Store");
}
+ // Add supported record stores
+ addRecordStore(MembershipStoreImpl.class);
+
// Check the connection to the State Store periodically
this.monitorService = new StateStoreConnectionMonitorService(this);
this.addService(monitorService);
+ // Set expirations intervals for each record
+ MembershipState.setExpirationMs(conf.getLong(
+ DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
+ DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
+
+ // Cache update service
+ this.cacheUpdater = new StateStoreCacheUpdateService(this);
+ addService(this.cacheUpdater);
+
super.serviceInit(this.conf);
}
@@ -123,13 +167,56 @@ public class StateStoreService extends CompositeService {
}
/**
+ * Add a record store to the State Store. It includes adding the store, the
+ * supported record and the cache management.
+ *
+ * @param clazz Class of the record store to track.
+ * @return New record store.
+ * @throws ReflectiveOperationException
+ */
+ private <T extends RecordStore<?>> void addRecordStore(
+ final Class<T> clazz) throws ReflectiveOperationException {
+
+ assert this.getServiceState() == STATE.INITED :
+ "Cannot add record to the State Store once started";
+
+ T recordStore = RecordStore.newInstance(clazz, this.getDriver());
+ Class<? extends BaseRecord> recordClass = recordStore.getRecordClass();
+ this.recordStores.put(recordClass, recordStore);
+
+ // Subscribe for cache updates
+ if (recordStore instanceof StateStoreCache) {
+ StateStoreCache cachedRecordStore = (StateStoreCache) recordStore;
+ this.cachesToUpdateInternal.add(cachedRecordStore);
+ }
+ }
+
+ /**
+ * Get the record store in this State Store for a given interface.
+ *
+ * @param recordStoreClass Class of the record store.
+ * @return Registered record store or null if not found.
+ */
+ public <T extends RecordStore<?>> T getRegisteredRecordStore(
+ final Class<T> recordStoreClass) {
+ for (RecordStore<? extends BaseRecord> recordStore :
+ this.recordStores.values()) {
+ if (recordStoreClass.isInstance(recordStore)) {
+ @SuppressWarnings("unchecked")
+ T recordStoreChecked = (T) recordStore;
+ return recordStoreChecked;
+ }
+ }
+ return null;
+ }
+
+ /**
* List of records supported by this State Store.
*
* @return List of supported record classes.
*/
public Collection<Class<? extends BaseRecord>> getSupportedRecords() {
- // TODO add list of records
- return new LinkedList<>();
+ return this.recordStores.keySet();
}
/**
@@ -142,6 +229,7 @@ public class StateStoreService extends CompositeService {
if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) {
LOG.info("Connection to the State Store driver {} is open and ready",
driverName);
+ this.refreshCaches();
} else {
LOG.error("Cannot initialize State Store driver {}", driverName);
}
@@ -198,4 +286,114 @@ public class StateStoreService extends CompositeService {
this.identifier = id;
}
+ //
+ // Cached state store data
+ //
+ /**
+ * The last time the state store cache was fully updated.
+ *
+ * @return Timestamp.
+ */
+ public long getCacheUpdateTime() {
+ return this.cacheLastUpdateTime;
+ }
+
+ /**
+ * Stops the cache update service.
+ */
+ @VisibleForTesting
+ public void stopCacheUpdateService() {
+ if (this.cacheUpdater != null) {
+ this.cacheUpdater.stop();
+ removeService(this.cacheUpdater);
+ this.cacheUpdater = null;
+ }
+ }
+
+ /**
+ * Register a cached record store for automatic periodic cache updates.
+ *
+ * @param client Client to the state store.
+ */
+ public void registerCacheExternal(StateStoreCache client) {
+ this.cachesToUpdateExternal.add(client);
+ }
+
+ /**
+ * Refresh the cache with information from the State Store. Called
+ * periodically by the CacheUpdateService to maintain data caches and
+ * versions.
+ */
+ public void refreshCaches() {
+ refreshCaches(false);
+ }
+
+ /**
+ * Refresh the cache with information from the State Store. Called
+ * periodically by the CacheUpdateService to maintain data caches and
+ * versions.
+ * @param force If we force the refresh.
+ */
+ public void refreshCaches(boolean force) {
+ boolean success = true;
+ if (isDriverReady()) {
+ List<StateStoreCache> cachesToUpdate = new LinkedList<>();
+ cachesToUpdate.addAll(cachesToUpdateInternal);
+ cachesToUpdate.addAll(cachesToUpdateExternal);
+ for (StateStoreCache cachedStore : cachesToUpdate) {
+ String cacheName = cachedStore.getClass().getSimpleName();
+ boolean result = false;
+ try {
+ result = cachedStore.loadCache(force);
+ } catch (IOException e) {
+ LOG.error("Error updating cache for {}", cacheName, e);
+ result = false;
+ }
+ if (!result) {
+ success = false;
+ LOG.error("Cache update failed for cache {}", cacheName);
+ }
+ }
+ } else {
+ success = false;
+ LOG.info("Skipping State Store cache update, driver is not ready.");
+ }
+ if (success) {
+ // Uses local time, not driver time.
+ this.cacheLastUpdateTime = Time.now();
+ }
+ }
+
+ /**
+ * Update the cache for a specific record store.
+ *
+ * @param clazz Class of the record store.
+ * @return If the cached was loaded.
+ * @throws IOException if the cache update failed.
+ */
+ public boolean loadCache(final Class<?> clazz) throws IOException {
+ return loadCache(clazz, false);
+ }
+
+ /**
+ * Update the cache for a specific record store.
+ *
+ * @param clazz Class of the record store.
+ * @param force Force the update ignoring cached periods.
+ * @return If the cached was loaded.
+ * @throws IOException if the cache update failed.
+ */
+ public boolean loadCache(Class<?> clazz, boolean force) throws IOException {
+ List<StateStoreCache> cachesToUpdate =
+ new LinkedList<StateStoreCache>();
+ cachesToUpdate.addAll(this.cachesToUpdateInternal);
+ cachesToUpdate.addAll(this.cachesToUpdateExternal);
+ for (StateStoreCache cachedStore : cachesToUpdate) {
+ if (clazz.isInstance(cachedStore)) {
+ return cachedStore.loadCache(force);
+ }
+ }
+ throw new IOException("Registered cache was not found for " + clazz);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
new file mode 100644
index 0000000..c28131f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the {@link MembershipStore} State Store API.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MembershipStoreImpl
+ extends MembershipStore implements StateStoreCache {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MembershipStoreImpl.class);
+
+
+ /** Reported namespaces that are not decommissioned. */
+ private final Set<FederationNamespaceInfo> activeNamespaces;
+
+ /** Namenodes (after evaluating the quorum) that are active in the cluster. */
+ private final Map<String, MembershipState> activeRegistrations;
+ /** Namenode status reports (raw) that were discarded for being too old. */
+ private final Map<String, MembershipState> expiredRegistrations;
+
+ /** Lock to access the local memory cache. */
+ private final ReadWriteLock cacheReadWriteLock =
+ new ReentrantReadWriteLock();
+ private final Lock cacheReadLock = cacheReadWriteLock.readLock();
+ private final Lock cacheWriteLock = cacheReadWriteLock.writeLock();
+
+
+ public MembershipStoreImpl(StateStoreDriver driver) {
+ super(driver);
+
+ this.activeRegistrations = new HashMap<>();
+ this.expiredRegistrations = new HashMap<>();
+ this.activeNamespaces = new TreeSet<>();
+ }
+
+ @Override
+ public GetNamenodeRegistrationsResponse getExpiredNamenodeRegistrations(
+ GetNamenodeRegistrationsRequest request) throws IOException {
+
+ GetNamenodeRegistrationsResponse response =
+ GetNamenodeRegistrationsResponse.newInstance();
+ cacheReadLock.lock();
+ try {
+ Collection<MembershipState> vals = this.expiredRegistrations.values();
+ List<MembershipState> copyVals = new ArrayList<>(vals);
+ response.setNamenodeMemberships(copyVals);
+ } finally {
+ cacheReadLock.unlock();
+ }
+ return response;
+ }
+
+ @Override
+ public GetNamespaceInfoResponse getNamespaceInfo(
+ GetNamespaceInfoRequest request) throws IOException {
+
+ Set<FederationNamespaceInfo> namespaces = new HashSet<>();
+ try {
+ cacheReadLock.lock();
+ namespaces.addAll(activeNamespaces);
+ } finally {
+ cacheReadLock.unlock();
+ }
+
+ GetNamespaceInfoResponse response =
+ GetNamespaceInfoResponse.newInstance(namespaces);
+ return response;
+ }
+
+ @Override
+ public GetNamenodeRegistrationsResponse getNamenodeRegistrations(
+ final GetNamenodeRegistrationsRequest request) throws IOException {
+
+ // TODO Cache some common queries and sorts
+ List<MembershipState> ret = null;
+
+ cacheReadLock.lock();
+ try {
+ Collection<MembershipState> registrations = activeRegistrations.values();
+ MembershipState partialMembership = request.getPartialMembership();
+ if (partialMembership == null) {
+ ret = new ArrayList<>(registrations);
+ } else {
+ Query<MembershipState> query = new Query<>(partialMembership);
+ ret = filterMultiple(query, registrations);
+ }
+ } finally {
+ cacheReadLock.unlock();
+ }
+ // Sort in ascending update date order
+ Collections.sort(ret);
+
+ GetNamenodeRegistrationsResponse response =
+ GetNamenodeRegistrationsResponse.newInstance(ret);
+ return response;
+ }
+
+ @Override
+ public NamenodeHeartbeatResponse namenodeHeartbeat(
+ NamenodeHeartbeatRequest request) throws IOException {
+
+ MembershipState record = request.getNamenodeMembership();
+ String nnId = record.getNamenodeKey();
+ MembershipState existingEntry = null;
+ cacheReadLock.lock();
+ try {
+ existingEntry = this.activeRegistrations.get(nnId);
+ } finally {
+ cacheReadLock.unlock();
+ }
+
+ if (existingEntry != null) {
+ if (existingEntry.getState() != record.getState()) {
+ LOG.info("NN registration state has changed: {} -> {}",
+ existingEntry, record);
+ } else {
+ LOG.debug("Updating NN registration: {} -> {}", existingEntry, record);
+ }
+ } else {
+ LOG.info("Inserting new NN registration: {}", record);
+ }
+
+ boolean status = getDriver().put(record, true, false);
+
+ NamenodeHeartbeatResponse response =
+ NamenodeHeartbeatResponse.newInstance(status);
+ return response;
+ }
+
+ @Override
+ public boolean loadCache(boolean force) throws IOException {
+ super.loadCache(force);
+
+ // Update local cache atomically
+ cacheWriteLock.lock();
+ try {
+ this.activeRegistrations.clear();
+ this.expiredRegistrations.clear();
+ this.activeNamespaces.clear();
+
+ // Build list of NN registrations: nnId -> registration list
+ Map<String, List<MembershipState>> nnRegistrations = new HashMap<>();
+ List<MembershipState> cachedRecords = getCachedRecords();
+ for (MembershipState membership : cachedRecords) {
+ String nnId = membership.getNamenodeKey();
+ if (membership.getState() == FederationNamenodeServiceState.EXPIRED) {
+ // Expired, RPC service does not use these
+ String key = membership.getPrimaryKey();
+ this.expiredRegistrations.put(key, membership);
+ } else {
+ // This is a valid NN registration, build a list of all registrations
+ // using the NN id to use for the quorum calculation.
+ List<MembershipState> nnRegistration =
+ nnRegistrations.get(nnId);
+ if (nnRegistration == null) {
+ nnRegistration = new LinkedList<>();
+ nnRegistrations.put(nnId, nnRegistration);
+ }
+ nnRegistration.add(membership);
+ String bpId = membership.getBlockPoolId();
+ String cId = membership.getClusterId();
+ String nsId = membership.getNameserviceId();
+ FederationNamespaceInfo nsInfo =
+ new FederationNamespaceInfo(bpId, cId, nsId);
+ this.activeNamespaces.add(nsInfo);
+ }
+ }
+
+ // Calculate most representative entry for each active NN id
+ for (List<MembershipState> nnRegistration : nnRegistrations.values()) {
+ // Run quorum based on NN state
+ MembershipState representativeRecord =
+ getRepresentativeQuorum(nnRegistration);
+ String nnKey = representativeRecord.getNamenodeKey();
+ this.activeRegistrations.put(nnKey, representativeRecord);
+ }
+ LOG.debug("Refreshed {} NN registrations from State Store",
+ cachedRecords.size());
+ } finally {
+ cacheWriteLock.unlock();
+ }
+ return true;
+ }
+
+ @Override
+ public UpdateNamenodeRegistrationResponse updateNamenodeRegistration(
+ UpdateNamenodeRegistrationRequest request) throws IOException {
+
+ boolean status = false;
+ cacheWriteLock.lock();
+ try {
+ String namenode = MembershipState.getNamenodeKey(
+ request.getNameserviceId(), request.getNamenodeId());
+ MembershipState member = this.activeRegistrations.get(namenode);
+ if (member != null) {
+ member.setState(request.getState());
+ status = true;
+ }
+ } finally {
+ cacheWriteLock.unlock();
+ }
+ UpdateNamenodeRegistrationResponse response =
+ UpdateNamenodeRegistrationResponse.newInstance(status);
+ return response;
+ }
+
+ /**
+ * Picks the most recent entry in the subset that is most agreeable on the
+ * specified field. 1) If a majority of the collection has the same value for
+ * the field, the first sorted entry within the subset the matches the
+ * majority value 2) Otherwise the first sorted entry in the set of all
+ * entries
+ *
+ * @param entries - Collection of state store record objects of the same type
+ * @param fieldName - Field name for the value to compare
+ * @return record that is most representative of the field name
+ */
+ private MembershipState getRepresentativeQuorum(
+ Collection<MembershipState> records) {
+
+ // Collate objects by field value: field value -> order set of records
+ Map<FederationNamenodeServiceState, TreeSet<MembershipState>> occurenceMap =
+ new HashMap<>();
+ for (MembershipState record : records) {
+ FederationNamenodeServiceState state = record.getState();
+ TreeSet<MembershipState> matchingSet = occurenceMap.get(state);
+ if (matchingSet == null) {
+ // TreeSet orders elements by descending date via comparators
+ matchingSet = new TreeSet<>();
+ occurenceMap.put(state, matchingSet);
+ }
+ matchingSet.add(record);
+ }
+
+ // Select largest group
+ TreeSet<MembershipState> largestSet = new TreeSet<>();
+ for (TreeSet<MembershipState> matchingSet : occurenceMap.values()) {
+ if (largestSet.size() < matchingSet.size()) {
+ largestSet = matchingSet;
+ }
+ }
+
+ // If quorum, use the newest element here
+ if (largestSet.size() > records.size() / 2) {
+ return largestSet.first();
+ // Otherwise, return most recent by class comparator
+ } else if (records.size() > 0) {
+ TreeSet<MembershipState> sortedList = new TreeSet<>(records);
+ LOG.debug("Quorum failed, using most recent: {}", sortedList.first());
+ return sortedList.first();
+ } else {
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
new file mode 100644
index 0000000..1a50d15
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains implementations of the state store API interfaces. All classes
+ * derive from {@link
+ * org.apache.hadoop.hdfs.server.federation.store.RecordStore}. The API
+ * definitions are contained in the
+ * org.apache.hadoop.hdfs.server.federation.store package.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.server.federation.store.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java
new file mode 100644
index 0000000..568feaf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsRequest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * API request for listing namenode registrations present in the state store.
+ */
+public abstract class GetNamenodeRegistrationsRequest {
+
+ public static GetNamenodeRegistrationsRequest newInstance()
+ throws IOException {
+ return StateStoreSerializer.newRecord(
+ GetNamenodeRegistrationsRequest.class);
+ }
+
+ public static GetNamenodeRegistrationsRequest newInstance(
+ MembershipState member) throws IOException {
+ GetNamenodeRegistrationsRequest request = newInstance();
+ request.setPartialMembership(member);
+ return request;
+ }
+
+ @Public
+ @Unstable
+ public abstract MembershipState getPartialMembership();
+
+ @Public
+ @Unstable
+ public abstract void setPartialMembership(MembershipState member);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java
new file mode 100644
index 0000000..0d60c90
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamenodeRegistrationsResponse.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * API response for listing namenode registrations present in the state store.
+ */
+public abstract class GetNamenodeRegistrationsResponse {
+
+ public static GetNamenodeRegistrationsResponse newInstance()
+ throws IOException {
+ return StateStoreSerializer.newRecord(
+ GetNamenodeRegistrationsResponse.class);
+ }
+
+ public static GetNamenodeRegistrationsResponse newInstance(
+ List<MembershipState> records) throws IOException {
+ GetNamenodeRegistrationsResponse response = newInstance();
+ response.setNamenodeMemberships(records);
+ return response;
+ }
+
+ @Public
+ @Unstable
+ public abstract List<MembershipState> getNamenodeMemberships()
+ throws IOException;
+
+ @Public
+ @Unstable
+ public abstract void setNamenodeMemberships(
+ List<MembershipState> records) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java
new file mode 100644
index 0000000..b5cc01b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoRequest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for listing HDFS namespaces present in the state store.
+ */
+public abstract class GetNamespaceInfoRequest {
+
+ public static GetNamespaceInfoRequest newInstance() {
+ return StateStoreSerializer.newRecord(GetNamespaceInfoRequest.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java
new file mode 100644
index 0000000..f541453
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetNamespaceInfoResponse.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for listing HDFS namespaces present in the state store.
+ */
+public abstract class GetNamespaceInfoResponse {
+
+ public static GetNamespaceInfoResponse newInstance() {
+ return StateStoreSerializer.newRecord(GetNamespaceInfoResponse.class);
+ }
+
+ public static GetNamespaceInfoResponse newInstance(
+ Set<FederationNamespaceInfo> namespaces) throws IOException {
+ GetNamespaceInfoResponse response = newInstance();
+ response.setNamespaceInfo(namespaces);
+ return response;
+ }
+
+ @Public
+ @Unstable
+ public abstract Set<FederationNamespaceInfo> getNamespaceInfo();
+
+ @Public
+ @Unstable
+ public abstract void setNamespaceInfo(
+ Set<FederationNamespaceInfo> namespaceInfo);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java
new file mode 100644
index 0000000..9506026
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatRequest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+
+/**
+ * API request for registering a namenode with the state store.
+ */
+public abstract class NamenodeHeartbeatRequest {
+
+ public static NamenodeHeartbeatRequest newInstance() throws IOException {
+ return StateStoreSerializer.newRecord(NamenodeHeartbeatRequest.class);
+ }
+
+ public static NamenodeHeartbeatRequest newInstance(MembershipState namenode)
+ throws IOException {
+ NamenodeHeartbeatRequest request = newInstance();
+ request.setNamenodeMembership(namenode);
+ return request;
+ }
+
+ @Private
+ @Unstable
+ public abstract MembershipState getNamenodeMembership()
+ throws IOException;
+
+ @Private
+ @Unstable
+ public abstract void setNamenodeMembership(MembershipState report)
+ throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java
new file mode 100644
index 0000000..acb7a6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/NamenodeHeartbeatResponse.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for registering a namenode with the state store.
+ */
+public abstract class NamenodeHeartbeatResponse {
+
+ public static NamenodeHeartbeatResponse newInstance() throws IOException {
+ return StateStoreSerializer.newRecord(NamenodeHeartbeatResponse.class);
+ }
+
+ public static NamenodeHeartbeatResponse newInstance(boolean status)
+ throws IOException {
+ NamenodeHeartbeatResponse response = newInstance();
+ response.setResult(status);
+ return response;
+ }
+
+ @Private
+ @Unstable
+ public abstract boolean getResult();
+
+ @Private
+ @Unstable
+ public abstract void setResult(boolean result);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fad7865e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java
new file mode 100644
index 0000000..4459e33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateNamenodeRegistrationRequest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API request for overriding an existing namenode registration in the state
+ * store.
+ */
+public abstract class UpdateNamenodeRegistrationRequest {
+
+ public static UpdateNamenodeRegistrationRequest newInstance()
+ throws IOException {
+ return StateStoreSerializer.newRecord(
+ UpdateNamenodeRegistrationRequest.class);
+ }
+
+ public static UpdateNamenodeRegistrationRequest newInstance(
+ String nameserviceId, String namenodeId,
+ FederationNamenodeServiceState state) throws IOException {
+ UpdateNamenodeRegistrationRequest request = newInstance();
+ request.setNameserviceId(nameserviceId);
+ request.setNamenodeId(namenodeId);
+ request.setState(state);
+ return request;
+ }
+
+ @Private
+ @Unstable
+ public abstract String getNameserviceId();
+
+ @Private
+ @Unstable
+ public abstract String getNamenodeId();
+
+ @Private
+ @Unstable
+ public abstract FederationNamenodeServiceState getState();
+
+ @Private
+ @Unstable
+ public abstract void setNameserviceId(String nsId);
+
+ @Private
+ @Unstable
+ public abstract void setNamenodeId(String nnId);
+
+ @Private
+ @Unstable
+ public abstract void setState(FederationNamenodeServiceState state);
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org