You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/02/20 22:21:45 UTC
[hadoop] 19/41: HDFS-13443. RBF: Update mount table cache
immediately after changing (add/update/remove) mount table entries.
Contributed by Mohammad Arshad.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit c49a422d89dcd3815d9800e1efeb7fdae3269a19
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Wed Dec 19 11:40:00 2018 +0800
HDFS-13443. RBF: Update mount table cache immediately after changing (add/update/remove) mount table entries. Contributed by Mohammad Arshad.
---
.../RouterAdminProtocolServerSideTranslatorPB.java | 23 ++
.../RouterAdminProtocolTranslatorPB.java | 21 ++
.../federation/resolver/MountTableManager.java | 16 +
.../router/MountTableRefresherService.java | 289 +++++++++++++++
.../router/MountTableRefresherThread.java | 96 +++++
.../server/federation/router/RBFConfigKeys.java | 25 ++
.../hdfs/server/federation/router/Router.java | 53 ++-
.../federation/router/RouterAdminServer.java | 28 +-
.../federation/router/RouterHeartbeatService.java | 5 +
.../server/federation/store/MountTableStore.java | 24 ++
.../server/federation/store/StateStoreUtils.java | 26 ++
.../federation/store/impl/MountTableStoreImpl.java | 18 +
.../protocol/RefreshMountTableEntriesRequest.java | 34 ++
.../protocol/RefreshMountTableEntriesResponse.java | 44 +++
.../pb/RefreshMountTableEntriesRequestPBImpl.java | 67 ++++
.../pb/RefreshMountTableEntriesResponsePBImpl.java | 74 ++++
.../federation/store/records/RouterState.java | 4 +
.../store/records/impl/pb/RouterStatePBImpl.java | 10 +
.../hadoop/hdfs/tools/federation/RouterAdmin.java | 33 +-
.../src/main/proto/FederationProtocol.proto | 8 +
.../src/main/proto/RouterProtocol.proto | 5 +
.../src/main/resources/hdfs-rbf-default.xml | 34 ++
.../src/site/markdown/HDFSRouterFederation.md | 9 +
.../server/federation/FederationTestUtils.java | 27 ++
.../server/federation/RouterConfigBuilder.java | 12 +
.../federation/router/TestRouterAdminCLI.java | 25 +-
.../router/TestRouterMountTableCacheRefresh.java | 396 +++++++++++++++++++++
.../hadoop-hdfs/src/site/markdown/HDFSCommands.md | 2 +
28 files changed, 1402 insertions(+), 6 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java
index 6341ebd..a31c46d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolServerSideTranslatorPB.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
@@ -58,6 +60,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@@ -78,6 +82,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeMo
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
@@ -275,4 +281,21 @@ public class RouterAdminProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
+
+ @Override
+ public RefreshMountTableEntriesResponseProto refreshMountTableEntries(
+ RpcController controller, RefreshMountTableEntriesRequestProto request)
+ throws ServiceException {
+ try {
+ RefreshMountTableEntriesRequest req =
+ new RefreshMountTableEntriesRequestPBImpl(request);
+ RefreshMountTableEntriesResponse response =
+ server.refreshMountTableEntries(req);
+ RefreshMountTableEntriesResponsePBImpl responsePB =
+ (RefreshMountTableEntriesResponsePBImpl) response;
+ return responsePB.getProto();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
index 6e24438..1fbb06d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.LeaveSafeModeResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@@ -77,6 +81,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountT
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.LeaveSafeModeResponsePBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesRequestPBImpl;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RefreshMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
@@ -267,4 +273,19 @@ public class RouterAdminProtocolTranslatorPB
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
+
+ @Override
+ public RefreshMountTableEntriesResponse refreshMountTableEntries(
+ RefreshMountTableEntriesRequest request) throws IOException {
+ RefreshMountTableEntriesRequestPBImpl requestPB =
+ (RefreshMountTableEntriesRequestPBImpl) request;
+ RefreshMountTableEntriesRequestProto proto = requestPB.getProto();
+ try {
+ RefreshMountTableEntriesResponseProto response =
+ rpcProxy.refreshMountTableEntries(null, proto);
+ return new RefreshMountTableEntriesResponsePBImpl(response);
+ } catch (ServiceException e) {
+ throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java
index c2e4a5b..9a1e416 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntr
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@@ -77,4 +79,18 @@ public interface MountTableManager {
*/
GetMountTableEntriesResponse getMountTableEntries(
GetMountTableEntriesRequest request) throws IOException;
+
+ /**
+ * Refresh mount table entries cache from the state store. Cache is updated
+ * periodically but with this API cache can be refreshed immediately. This API
+ * is primarily meant to be called from the Admin Server. Admin Server will
+ * call this API and refresh mount table cache of all the routers while
+ * changing mount table entries.
+ *
+ * @param request Fully populated request object.
+ * @return True the mount table entry was updated without any error.
+ * @throws IOException Throws exception if the data store is not initialized.
+ */
+ RefreshMountTableEntriesResponse refreshMountTableEntries(
+ RefreshMountTableEntriesRequest request) throws IOException;
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java
new file mode 100644
index 0000000..fafcef4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This service is invoked from {@link MountTableStore} when there is change in
+ * mount table entries and it updates mount table entry cache on local router as
+ * well as on all remote routers. Refresh on local router is done by calling
+ * {@link MountTableStore#loadCache(boolean)}} API directly, no RPC call
+ * involved, but on remote routers refresh is done through RouterClient(RPC
+ * call). To improve performance, all routers are refreshed in separate thread
+ * and all connection are cached. Cached connections are removed from
+ * cache and closed when their max live time is elapsed.
+ */
+public class MountTableRefresherService extends AbstractService {
+ private static final String ROUTER_CONNECT_ERROR_MSG =
+ "Router {} connection failed. Mount table cache will not refesh.";
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MountTableRefresherService.class);
+
+ /** Local router. */
+ private final Router router;
+ /** Mount table store. */
+ private MountTableStore mountTableStore;
+ /** Local router admin address in the form of host:port. */
+ private String localAdminAdress;
+ /** Timeout in ms to update mount table cache on all the routers. */
+ private long cacheUpdateTimeout;
+
+ /**
+ * All router admin clients cached. So no need to create the client again and
+ * again. Router admin address(host:port) is used as key to cache RouterClient
+ * objects.
+ */
+ private LoadingCache<String, RouterClient> routerClientsCache;
+
+ /**
+ * Removes expired RouterClient from routerClientsCache.
+ */
+ private ScheduledExecutorService clientCacheCleanerScheduler;
+
+ /**
+ * Create a new service to refresh mount table cache when there is change in
+ * mount table entries.
+ *
+ * @param router whose mount table cache will be refreshed
+ */
+ public MountTableRefresherService(Router router) {
+ super(MountTableRefresherService.class.getSimpleName());
+ this.router = router;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ this.mountTableStore = getMountTableStore();
+ // attach this service to mount table store.
+ this.mountTableStore.setRefreshService(this);
+ this.localAdminAdress =
+ StateStoreUtils.getHostPortString(router.getAdminServerAddress());
+ this.cacheUpdateTimeout = conf.getTimeDuration(
+ RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
+ RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long routerClientMaxLiveTime = conf.getTimeDuration(
+ RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME,
+ RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ routerClientsCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(routerClientMaxLiveTime, TimeUnit.MILLISECONDS)
+ .removalListener(getClientRemover()).build(getClientCreator());
+
+ initClientCacheCleaner(routerClientMaxLiveTime);
+ }
+
+ private void initClientCacheCleaner(long routerClientMaxLiveTime) {
+ clientCacheCleanerScheduler =
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("MountTableRefresh_ClientsCacheCleaner")
+ .setDaemon(true).build());
+ /*
+ * When cleanUp() method is called, expired RouterClient will be removed and
+ * closed.
+ */
+ clientCacheCleanerScheduler.scheduleWithFixedDelay(
+ () -> routerClientsCache.cleanUp(), routerClientMaxLiveTime,
+ routerClientMaxLiveTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Create cache entry remove listener.
+ */
+ private RemovalListener<String, RouterClient> getClientRemover() {
+ return new RemovalListener<String, RouterClient>() {
+ @Override
+ public void onRemoval(
+ RemovalNotification<String, RouterClient> notification) {
+ closeRouterClient(notification.getValue());
+ }
+ };
+ }
+
+ @VisibleForTesting
+ protected void closeRouterClient(RouterClient client) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOG.error("Error while closing RouterClient", e);
+ }
+ }
+
+ /**
+ * Creates RouterClient and caches it.
+ */
+ private CacheLoader<String, RouterClient> getClientCreator() {
+ return new CacheLoader<String, RouterClient>() {
+ public RouterClient load(String adminAddress) throws IOException {
+ InetSocketAddress routerSocket =
+ NetUtils.createSocketAddr(adminAddress);
+ Configuration config = getConfig();
+ return createRouterClient(routerSocket, config);
+ }
+ };
+ }
+
+ @VisibleForTesting
+ protected RouterClient createRouterClient(InetSocketAddress routerSocket,
+ Configuration config) throws IOException {
+ return new RouterClient(routerSocket, config);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ clientCacheCleanerScheduler.shutdown();
+ // remove and close all admin clients
+ routerClientsCache.invalidateAll();
+ }
+
+ private MountTableStore getMountTableStore() throws IOException {
+ MountTableStore mountTblStore =
+ router.getStateStore().getRegisteredRecordStore(MountTableStore.class);
+ if (mountTblStore == null) {
+ throw new IOException("Mount table state store is not available.");
+ }
+ return mountTblStore;
+ }
+
+ /**
+ * Refresh mount table cache of this router as well as all other routers.
+ */
+ public void refresh() throws StateStoreUnavailableException {
+ List<RouterState> cachedRecords =
+ router.getRouterStateManager().getCachedRecords();
+ List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
+ for (RouterState routerState : cachedRecords) {
+ String adminAddress = routerState.getAdminAddress();
+ if (adminAddress == null || adminAddress.length() == 0) {
+ // this router has not enabled router admin
+ continue;
+ }
+ // No use of calling refresh on router which is not running state
+ if (routerState.getStatus() != RouterServiceState.RUNNING) {
+ LOG.info(
+ "Router {} is not running. Mount table cache will not refesh.");
+ // remove if RouterClient is cached.
+ removeFromCache(adminAddress);
+ } else if (isLocalAdmin(adminAddress)) {
+ /*
+ * Local router's cache update does not require RPC call, so no need for
+ * RouterClient
+ */
+ refreshThreads.add(getLocalRefresher(adminAddress));
+ } else {
+ try {
+ RouterClient client = routerClientsCache.get(adminAddress);
+ refreshThreads.add(new MountTableRefresherThread(
+ client.getMountTableManager(), adminAddress));
+ } catch (ExecutionException execExcep) {
+ // Can not connect, seems router is stopped now.
+ LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
+ }
+ }
+ }
+ if (!refreshThreads.isEmpty()) {
+ invokeRefresh(refreshThreads);
+ }
+ }
+
+ @VisibleForTesting
+ protected MountTableRefresherThread getLocalRefresher(String adminAddress) {
+ return new MountTableRefresherThread(router.getAdminServer(), adminAddress);
+ }
+
+ private void removeFromCache(String adminAddress) {
+ routerClientsCache.invalidate(adminAddress);
+ }
+
+ private void invokeRefresh(List<MountTableRefresherThread> refreshThreads) {
+ CountDownLatch countDownLatch = new CountDownLatch(refreshThreads.size());
+ // start all the threads
+ for (MountTableRefresherThread refThread : refreshThreads) {
+ refThread.setCountDownLatch(countDownLatch);
+ refThread.start();
+ }
+ try {
+ /*
+ * Wait for all the thread to complete, await method returns false if
+ * refresh is not finished within specified time
+ */
+ boolean allReqCompleted =
+ countDownLatch.await(cacheUpdateTimeout, TimeUnit.MILLISECONDS);
+ if (!allReqCompleted) {
+ LOG.warn("Not all router admins updated their cache");
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Mount table cache refresher was interrupted.", e);
+ }
+ logResult(refreshThreads);
+ }
+
+ private boolean isLocalAdmin(String adminAddress) {
+ return adminAddress.contentEquals(localAdminAdress);
+ }
+
+ private void logResult(List<MountTableRefresherThread> refreshThreads) {
+ int succesCount = 0;
+ int failureCount = 0;
+ for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) {
+ if (mountTableRefreshThread.isSuccess()) {
+ succesCount++;
+ } else {
+ failureCount++;
+ // remove RouterClient from cache so that new client is created
+ removeFromCache(mountTableRefreshThread.getAdminAddress());
+ }
+ }
+ LOG.info("Mount table entries cache refresh succesCount={},failureCount={}",
+ succesCount, failureCount);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherThread.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherThread.java
new file mode 100644
index 0000000..c9967a2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherThread.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for updating mount table cache on all the router.
+ */
+public class MountTableRefresherThread extends Thread {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MountTableRefresherThread.class);
+ private boolean success;
+ /** Admin server on which refreshed to be invoked. */
+ private String adminAddress;
+ private CountDownLatch countDownLatch;
+ private MountTableManager manager;
+
+ public MountTableRefresherThread(MountTableManager manager,
+ String adminAddress) {
+ this.manager = manager;
+ this.adminAddress = adminAddress;
+ setName("MountTableRefresh_" + adminAddress);
+ setDaemon(true);
+ }
+
+ /**
+ * Refresh mount table cache of local and remote routers. Local and remote
+ * routers will be refreshed differently. Lets understand what are the
+ * local and remote routers and refresh will be done differently on these
+ * routers. Suppose there are three routers R1, R2 and R3. User want to add
+ * new mount table entry. He will connect to only one router, not all the
+ * routers. Suppose He connects to R1 and calls add mount table entry through
+ * API or CLI. Now in this context R1 is local router, R2 and R3 are remote
+ * routers. Because add mount table entry is invoked on R1, R1 will update the
+ * cache locally it need not to make RPC call. But R1 will make RPC calls to
+ * update cache on R2 and R3.
+ */
+ @Override
+ public void run() {
+ try {
+ RefreshMountTableEntriesResponse refreshMountTableEntries =
+ manager.refreshMountTableEntries(
+ RefreshMountTableEntriesRequest.newInstance());
+ success = refreshMountTableEntries.getResult();
+ } catch (IOException e) {
+ LOG.error("Failed to refresh mount table entries cache at router {}",
+ adminAddress, e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ }
+
+ /**
+ * @return true if cache was refreshed successfully.
+ */
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public String toString() {
+ return "MountTableRefreshThread [success=" + success + ", adminAddress="
+ + adminAddress + "]";
+ }
+
+ public String getAdminAddress() {
+ return adminAddress;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 0070de7..5e907c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -204,6 +204,31 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "mount-table.max-cache-size";
/** Remove cache entries if we have more than 10k. */
public static final int FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT = 10000;
+ /**
+ * If true then cache updated immediately after mount table entry change
+ * otherwise it is updated periodically based configuration.
+ */
+ public static final String MOUNT_TABLE_CACHE_UPDATE =
+ FEDERATION_ROUTER_PREFIX + "mount-table.cache.update";
+ public static final boolean MOUNT_TABLE_CACHE_UPDATE_DEFAULT =
+ false;
+ /**
+ * Timeout to update mount table cache on all the routers.
+ */
+ public static final String MOUNT_TABLE_CACHE_UPDATE_TIMEOUT =
+ FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.timeout";
+ public static final long MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT =
+ TimeUnit.MINUTES.toMillis(1);
+ /**
+ * Remote router mount table cache is updated through RouterClient(RPC call).
+ * To improve performance, RouterClient connections are cached but it should
+ * not be kept in cache forever. This property defines the max time a
+ * connection can be cached.
+ */
+ public static final String MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME =
+ FEDERATION_ROUTER_PREFIX + "mount-table.cache.update.client.max.time";
+ public static final long MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT =
+ TimeUnit.MINUTES.toMillis(5);
public static final String FEDERATION_MOUNT_TABLE_CACHE_ENABLE =
FEDERATION_ROUTER_PREFIX + "mount-table.cache.enable";
public static final boolean FEDERATION_MOUNT_TABLE_CACHE_ENABLE_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 3182e27..6a7437f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -254,9 +254,50 @@ public class Router extends CompositeService {
addService(this.safemodeService);
}
+ /*
+ * Refresh mount table cache immediately after adding, modifying or deleting
+ * the mount table entries. If this service is not enabled mount table cache
+ * are refreshed periodically by StateStoreCacheUpdateService
+ */
+ if (conf.getBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
+ RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_DEFAULT)) {
+ // There is no use of starting refresh service if state store and admin
+ // servers are not enabled
+ String disabledDependentServices = getDisabledDependentServices();
+ /*
+ * disabledDependentServices null means all dependent services are
+ * enabled.
+ */
+ if (disabledDependentServices == null) {
+
+ MountTableRefresherService refreshService =
+ new MountTableRefresherService(this);
+ addService(refreshService);
+ LOG.info("Service {} is enabled.",
+ MountTableRefresherService.class.getSimpleName());
+ } else {
+ LOG.warn(
+ "Service {} not enabled: depenendent service(s) {} not enabled.",
+ MountTableRefresherService.class.getSimpleName(),
+ disabledDependentServices);
+ }
+ }
+
super.serviceInit(conf);
}
+ private String getDisabledDependentServices() {
+ if (this.stateStore == null && this.adminServer == null) {
+ return StateStoreService.class.getSimpleName() + ","
+ + RouterAdminServer.class.getSimpleName();
+ } else if (this.stateStore == null) {
+ return StateStoreService.class.getSimpleName();
+ } else if (this.adminServer == null) {
+ return RouterAdminServer.class.getSimpleName();
+ }
+ return null;
+ }
+
/**
* Returns the hostname for this Router. If the hostname is not
* explicitly configured in the given config, then it is determined.
@@ -696,9 +737,19 @@ public class Router extends CompositeService {
}
/**
- * Get the Router safe mode service
+ * Get the Router safe mode service.
*/
RouterSafemodeService getSafemodeService() {
return this.safemodeService;
}
+
+ /**
+ * Get router admin server.
+ *
+ * @return Null if admin is not enabled.
+ */
+ public RouterAdminServer getAdminServer() {
+ return adminServer;
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
index f34dc41..5bb7751 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
@@ -55,6 +56,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@@ -102,6 +105,7 @@ public class RouterAdminServer extends AbstractService
private static String routerOwner;
private static String superGroup;
private static boolean isPermissionEnabled;
+ private boolean iStateStoreCache;
public RouterAdminServer(Configuration conf, Router router)
throws IOException {
@@ -154,6 +158,8 @@ public class RouterAdminServer extends AbstractService
this.adminAddress = new InetSocketAddress(
confRpcAddress.getHostName(), listenAddress.getPort());
router.setAdminServerAddress(this.adminAddress);
+ iStateStoreCache =
+ router.getSubclusterResolver() instanceof StateStoreCache;
}
/**
@@ -243,7 +249,7 @@ public class RouterAdminServer extends AbstractService
getMountTableStore().updateMountTableEntry(request);
MountTable mountTable = request.getEntry();
- if (mountTable != null) {
+ if (mountTable != null && router.isQuotaEnabled()) {
synchronizeQuota(mountTable);
}
return response;
@@ -331,6 +337,26 @@ public class RouterAdminServer extends AbstractService
return GetSafeModeResponse.newInstance(isInSafeMode);
}
+ @Override
+ public RefreshMountTableEntriesResponse refreshMountTableEntries(
+ RefreshMountTableEntriesRequest request) throws IOException {
+ if (iStateStoreCache) {
+ /*
+ * MountTableResolver updates MountTableStore cache also. Expecting other
+ * SubclusterResolver implementations to update MountTableStore cache also
+ * apart from updating its cache.
+ */
+ boolean result = ((StateStoreCache) this.router.getSubclusterResolver())
+ .loadCache(true);
+ RefreshMountTableEntriesResponse response =
+ RefreshMountTableEntriesResponse.newInstance();
+ response.setResult(result);
+ return response;
+ } else {
+ return getMountTableStore().refreshMountTableEntries(request);
+ }
+ }
+
/**
* Verify if Router set safe mode state correctly.
* @param isInSafeMode Expected state to be set.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
index a7f02d3..c497d85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@@ -91,6 +92,10 @@ public class RouterHeartbeatService extends PeriodicService {
getStateStoreVersion(MembershipStore.class),
getStateStoreVersion(MountTableStore.class));
record.setStateStoreVersion(stateStoreVersion);
+ // if admin server not started then hostPort will be empty
+ String hostPort =
+ StateStoreUtils.getHostPortString(router.getAdminServerAddress());
+ record.setAdminAddress(hostPort);
RouterHeartbeatRequest request =
RouterHeartbeatRequest.newInstance(record);
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
index b439659..9d4b64b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.hdfs.server.federation.store;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Management API for the HDFS mount table information stored in
@@ -42,8 +45,29 @@ import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
@InterfaceStability.Evolving
public abstract class MountTableStore extends CachedRecordStore<MountTable>
implements MountTableManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MountTableStore.class);
+ private MountTableRefresherService refreshService;
public MountTableStore(StateStoreDriver driver) {
super(MountTable.class, driver);
}
+
+ public void setRefreshService(MountTableRefresherService refreshService) {
+ this.refreshService = refreshService;
+ }
+
+ /**
+ * Update mount table cache of this router as well as all other routers.
+ */
+ protected void updateCacheAllRouters() {
+ if (refreshService != null) {
+ try {
+ refreshService.refresh();
+ } catch (StateStoreUnavailableException e) {
+ LOG.error("Cannot refresh mount table: state store not available", e);
+ }
+ }
+ }
+
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
index 924c96a..4b932d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.federation.store;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -110,4 +113,27 @@ public final class StateStoreUtils {
}
return matchingList;
}
+
+ /**
+ * Returns address in form of host:port, empty string if address is null.
+ *
+ * @param address address
+ * @return host:port
+ */
+ public static String getHostPortString(InetSocketAddress address) {
+ if (null == address) {
+ return "";
+ }
+ String hostName = address.getHostName();
+ if (hostName.equals("0.0.0.0")) {
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ LOG.error("Failed to get local host name", e);
+ return "";
+ }
+ }
+ return hostName + ":" + address.getPort();
+ }
+
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index eb117d6..76c7e78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntr
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@@ -68,6 +70,7 @@ public class MountTableStoreImpl extends MountTableStore {
AddMountTableEntryResponse response =
AddMountTableEntryResponse.newInstance();
response.setStatus(status);
+ updateCacheAllRouters();
return response;
}
@@ -86,6 +89,7 @@ public class MountTableStoreImpl extends MountTableStore {
UpdateMountTableEntryResponse response =
UpdateMountTableEntryResponse.newInstance();
response.setStatus(status);
+ updateCacheAllRouters();
return response;
}
@@ -110,6 +114,7 @@ public class MountTableStoreImpl extends MountTableStore {
RemoveMountTableEntryResponse response =
RemoveMountTableEntryResponse.newInstance();
response.setStatus(status);
+ updateCacheAllRouters();
return response;
}
@@ -151,4 +156,17 @@ public class MountTableStoreImpl extends MountTableStore {
response.setTimestamp(Time.now());
return response;
}
+
+ @Override
+ public RefreshMountTableEntriesResponse refreshMountTableEntries(
+ RefreshMountTableEntriesRequest request) throws IOException {
+ // Because this refresh is done through admin API, it should always be force
+ // refresh.
+ boolean result = loadCache(true);
+ RefreshMountTableEntriesResponse response =
+ RefreshMountTableEntriesResponse.newInstance();
+ response.setResult(result);
+ return response;
+ }
+
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java
new file mode 100644
index 0000000..899afe7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API request for refreshing mount table cached entries from state store.
+ */
+public abstract class RefreshMountTableEntriesRequest {
+
+ public static RefreshMountTableEntriesRequest newInstance()
+ throws IOException {
+ return StateStoreSerializer
+ .newRecord(RefreshMountTableEntriesRequest.class);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesResponse.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesResponse.java
new file mode 100644
index 0000000..6c9ed77
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
+
+/**
+ * API response for refreshing mount table entries cache from state store.
+ */
+public abstract class RefreshMountTableEntriesResponse {
+
+ public static RefreshMountTableEntriesResponse newInstance()
+ throws IOException {
+ return StateStoreSerializer
+ .newRecord(RefreshMountTableEntriesResponse.class);
+ }
+
+ @Public
+ @Unstable
+ public abstract boolean getResult();
+
+ @Public
+ @Unstable
+ public abstract void setResult(boolean result);
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesRequestPBImpl.java
new file mode 100644
index 0000000..cec0699
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesRequestPBImpl.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProto.Builder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesRequestProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * RefreshMountTableEntriesRequest.
+ */
+public class RefreshMountTableEntriesRequestPBImpl
+ extends RefreshMountTableEntriesRequest implements PBRecord {
+
+ private FederationProtocolPBTranslator<RefreshMountTableEntriesRequestProto,
+ Builder, RefreshMountTableEntriesRequestProtoOrBuilder> translator =
+ new FederationProtocolPBTranslator<>(
+ RefreshMountTableEntriesRequestProto.class);
+
+ public RefreshMountTableEntriesRequestPBImpl() {
+ }
+
+ public RefreshMountTableEntriesRequestPBImpl(
+ RefreshMountTableEntriesRequestProto proto) {
+ this.translator.setProto(proto);
+ }
+
+ @Override
+ public RefreshMountTableEntriesRequestProto getProto() {
+ // if builder is null build() returns null, calling getBuilder() to
+ // instantiate builder
+ this.translator.getBuilder();
+ return this.translator.build();
+ }
+
+ @Override
+ public void setProto(Message proto) {
+ this.translator.setProto(proto);
+ }
+
+ @Override
+ public void readInstance(String base64String) throws IOException {
+ this.translator.readInstance(base64String);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesResponsePBImpl.java
new file mode 100644
index 0000000..5acf479
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RefreshMountTableEntriesResponsePBImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProto.Builder;
+import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshMountTableEntriesResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
+
+import com.google.protobuf.Message;
+
+/**
+ * Protobuf implementation of the state store API object
+ * RefreshMountTableEntriesResponse.
+ */
+public class RefreshMountTableEntriesResponsePBImpl
+ extends RefreshMountTableEntriesResponse implements PBRecord {
+
+ private FederationProtocolPBTranslator<RefreshMountTableEntriesResponseProto,
+ Builder, RefreshMountTableEntriesResponseProtoOrBuilder> translator =
+ new FederationProtocolPBTranslator<>(
+ RefreshMountTableEntriesResponseProto.class);
+
+ public RefreshMountTableEntriesResponsePBImpl() {
+ }
+
+ public RefreshMountTableEntriesResponsePBImpl(
+ RefreshMountTableEntriesResponseProto proto) {
+ this.translator.setProto(proto);
+ }
+
+ @Override
+ public RefreshMountTableEntriesResponseProto getProto() {
+ return this.translator.build();
+ }
+
+ @Override
+ public void setProto(Message proto) {
+ this.translator.setProto(proto);
+ }
+
+ @Override
+ public void readInstance(String base64String) throws IOException {
+ this.translator.readInstance(base64String);
+ }
+
+ @Override
+ public boolean getResult() {
+ return this.translator.getProtoOrBuilder().getResult();
+ };
+
+ @Override
+ public void setResult(boolean result) {
+ this.translator.getBuilder().setResult(result);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java
index c90abcc..2fe6941 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java
@@ -88,6 +88,10 @@ public abstract class RouterState extends BaseRecord {
public abstract long getDateStarted();
+ public abstract void setAdminAddress(String adminAddress);
+
+ public abstract String getAdminAddress();
+
/**
* Get the identifier for the Router. It uses the address.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java
index 23a61f9..d837386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java
@@ -199,4 +199,14 @@ public class RouterStatePBImpl extends RouterState implements PBRecord {
public long getDateCreated() {
return this.translator.getProtoOrBuilder().getDateCreated();
}
+
+ @Override
+ public void setAdminAddress(String adminAddress) {
+ this.translator.getBuilder().setAdminAddress(adminAddress);
+ }
+
+ @Override
+ public String getAdminAddress() {
+ return this.translator.getProtoOrBuilder().getAdminAddress();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index bdaabe8..27c42cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeReques
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
@@ -107,7 +109,8 @@ public class RouterAdmin extends Configured implements Tool {
if (cmd == null) {
String[] commands =
{"-add", "-update", "-rm", "-ls", "-setQuota", "-clrQuota",
- "-safemode", "-nameservice", "-getDisabledNameservices"};
+ "-safemode", "-nameservice", "-getDisabledNameservices",
+ "-refresh"};
StringBuilder usage = new StringBuilder();
usage.append("Usage: hdfs dfsrouteradmin :\n");
for (int i = 0; i < commands.length; i++) {
@@ -142,6 +145,8 @@ public class RouterAdmin extends Configured implements Tool {
return "\t[-nameservice enable | disable <nameservice>]";
} else if (cmd.equals("-getDisabledNameservices")) {
return "\t[-getDisabledNameservices]";
+ } else if (cmd.equals("-refresh")) {
+ return "\t[-refresh]";
}
return getUsage(null);
}
@@ -230,9 +235,10 @@ public class RouterAdmin extends Configured implements Tool {
printUsage(cmd);
return exitCode;
}
+ String address = null;
// Initialize RouterClient
try {
- String address = getConf().getTrimmed(
+ address = getConf().getTrimmed(
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
InetSocketAddress routerSocket = NetUtils.createSocketAddr(address);
@@ -302,6 +308,8 @@ public class RouterAdmin extends Configured implements Tool {
manageNameservice(subcmd, nsId);
} else if ("-getDisabledNameservices".equals(cmd)) {
getDisabledNameservices();
+ } else if ("-refresh".equals(cmd)) {
+ refresh(address);
} else {
throw new IllegalArgumentException("Unknown Command: " + cmd);
}
@@ -337,6 +345,27 @@ public class RouterAdmin extends Configured implements Tool {
return exitCode;
}
+ private void refresh(String address) throws IOException {
+ if (refreshRouterCache()) {
+ System.out.println(
+ "Successfully updated mount table cache on router " + address);
+ }
+ }
+
+ /**
+ * Refresh mount table cache on connected router.
+ *
+ * @return true if cache refreshed successfully
+ * @throws IOException
+ */
+ private boolean refreshRouterCache() throws IOException {
+ RefreshMountTableEntriesResponse response =
+ client.getMountTableManager().refreshMountTableEntries(
+ RefreshMountTableEntriesRequest.newInstance());
+ return response.getResult();
+ }
+
+
/**
* Add a mount table entry or update if it exists.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index b1a62b1..17ae299 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -193,6 +193,7 @@ message RouterRecordProto {
optional string version = 6;
optional string compileInfo = 7;
optional uint64 dateStarted = 8;
+ optional string adminAddress = 9;
}
message GetRouterRegistrationRequestProto {
@@ -219,6 +220,13 @@ message RouterHeartbeatResponseProto {
optional bool status = 1;
}
+message RefreshMountTableEntriesRequestProto {
+}
+
+message RefreshMountTableEntriesResponseProto {
+ optional bool result = 1;
+}
+
/////////////////////////////////////////////////
// Route State
/////////////////////////////////////////////////
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto
index f3a2b6e..34a012a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/RouterProtocol.proto
@@ -74,4 +74,9 @@ service RouterAdminProtocolService {
* Get the list of disabled name services.
*/
rpc getDisabledNameservices(GetDisabledNameservicesRequestProto) returns (GetDisabledNameservicesResponseProto);
+
+ /**
+ * Refresh mount entries
+ */
+ rpc refreshMountTableEntries(RefreshMountTableEntriesRequestProto) returns(RefreshMountTableEntriesResponseProto);
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index afb3c32..72f6c2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -547,4 +547,38 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.mount-table.cache.update</name>
+ <value>false</value>
+ <description>Set true to enable MountTableRefreshService. This service
+ updates mount table cache immediately after adding, modifying or
+ deleting the mount table entries. If this service is not enabled
+ mount table cache are refreshed periodically by
+ StateStoreCacheUpdateService
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.mount-table.cache.update.timeout</name>
+ <value>1m</value>
+ <description>This property defines how long to wait for all the
+ admin servers to finish their mount table cache update. This setting
+ supports multiple time unit suffixes as described in
+ dfs.federation.router.safemode.extension.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.mount-table.cache.update.client.max.time
+ </name>
+ <value>5m</value>
+ <description>Remote router mount table cache is updated through
+ RouterClient(RPC call). To improve performance, RouterClient
+ connections are cached but it should not be kept in cache forever.
+ This property defines the max time a connection can be cached. This
+ setting supports multiple time unit suffixes as described in
+ dfs.federation.router.safemode.extension.
+ </description>
+ </property>
+
</configuration>
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index 72bf6af..adc4383 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -230,6 +230,12 @@ Ls command will show below information for each mount table entry:
Source Destinations Owner Group Mode Quota/Usage
/path ns0->/path root supergroup rwxr-xr-x [NsQuota: 50/0, SsQuota: 100 B/0 B]
+Mount table cache is refreshed periodically but it can also be refreshed by executing refresh command:
+
+ [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refresh
+
+The above command will refresh cache of the connected router. This command is redundant when mount table refresh service is enabled as the service will always keep the cache updated.
+
#### Multiple subclusters
A mount point also supports mapping multiple subclusters.
For example, to create a mount point that stores files in subclusters `ns1` and `ns2`.
@@ -380,6 +386,9 @@ The connection to the State Store and the internal caching at the Router.
| dfs.federation.router.store.connection.test | 60000 | How often to check for the connection to the State Store in milliseconds. |
| dfs.federation.router.cache.ttl | 60000 | How often to refresh the State Store caches in milliseconds. |
| dfs.federation.router.store.membership.expiration | 300000 | Expiration time in milliseconds for a membership record. |
+| dfs.federation.router.mount-table.cache.update | false | If true, Mount table cache is updated whenever a mount table entry is added, modified or removed for all the routers. |
+| dfs.federation.router.mount-table.cache.update.timeout | 1m | Max time to wait for all the routers to finish their mount table cache update. |
+| dfs.federation.router.mount-table.cache.update.client.max.time | 5m | Max time a RouterClient connection can be cached. |
### Routing
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
index c48e6e2..5095c6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
@@ -316,4 +318,29 @@ public final class FederationTestUtils {
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
}
+
+ /**
+ * Wait for a number of routers to be registered in state store.
+ *
+ * @param stateManager number of routers to be registered.
+ * @param routerCount number of routers to be registered.
+ * @param tiemout max wait time in ms
+ */
+ public static void waitRouterRegistered(RouterStore stateManager,
+ long routerCount, int timeout) throws Exception {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ List<RouterState> cachedRecords = stateManager.getCachedRecords();
+ if (cachedRecords.size() == routerCount) {
+ return true;
+ }
+ } catch (IOException e) {
+ // Ignore
+ }
+ return false;
+ }
+ }, 100, timeout);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index be0de52..6d9b2c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -38,6 +38,7 @@ public class RouterConfigBuilder {
private boolean enableMetrics = false;
private boolean enableQuota = false;
private boolean enableSafemode = false;
+ private boolean enableCacheRefresh;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@@ -104,6 +105,11 @@ public class RouterConfigBuilder {
return this;
}
+ public RouterConfigBuilder refreshCache(boolean enable) {
+ this.enableCacheRefresh = enable;
+ return this;
+ }
+
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
@@ -140,6 +146,10 @@ public class RouterConfigBuilder {
return this.safemode(true);
}
+ public RouterConfigBuilder refreshCache() {
+ return this.refreshCache(true);
+ }
+
public Configuration build() {
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
@@ -158,6 +168,8 @@ public class RouterConfigBuilder {
this.enableQuota);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
this.enableSafemode);
+ conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
+ this.enableCacheRefresh);
return conf;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index d0e3e50..445022b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -799,6 +799,28 @@ public class TestRouterAdminCLI {
assertTrue(err.toString().contains("No arguments allowed"));
}
+ @Test
+ public void testRefreshMountTableCache() throws Exception {
+ String src = "/refreshMount";
+
+ // create mount table entry
+ String[] argv = new String[] {"-add", src, "refreshNS0", "/refreshDest"};
+ assertEquals(0, ToolRunner.run(admin, argv));
+
+ // refresh the mount table entry cache
+ System.setOut(new PrintStream(out));
+ argv = new String[] {"-refresh"};
+ assertEquals(0, ToolRunner.run(admin, argv));
+ assertTrue(
+ out.toString().startsWith("Successfully updated mount table cache"));
+
+ // Now ls should return that mount table entry
+ out.reset();
+ argv = new String[] {"-ls", src};
+ assertEquals(0, ToolRunner.run(admin, argv));
+ assertTrue(out.toString().contains(src));
+ }
+
/**
* Wait for the Router transforming to expected state.
* @param expectedState Expected Router state.
@@ -836,8 +858,7 @@ public class TestRouterAdminCLI {
}
@Test
- public void testUpdateDestinationForExistingMountTable() throws
- Exception {
+ public void testUpdateDestinationForExistingMountTable() throws Exception {
// Add a mount table firstly
String nsId = "ns0";
String src = "/test-updateDestinationForExistingMountTable";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
new file mode 100644
index 0000000..c90e614
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This test class verifies that mount table cache is updated on all the routers
+ * when MountTableRefreshService is enabled and there is a change in mount table
+ * entries.
+ */
+public class TestRouterMountTableCacheRefresh {
+ private static TestingServer curatorTestingServer;
+ private static MiniRouterDFSCluster cluster;
+ private static RouterContext routerContext;
+ private static MountTableManager mountTableManager;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ curatorTestingServer = new TestingServer();
+ curatorTestingServer.start();
+ final String connectString = curatorTestingServer.getConnectString();
+ int numNameservices = 2;
+ cluster = new MiniRouterDFSCluster(false, numNameservices);
+ Configuration conf = new RouterConfigBuilder().refreshCache().admin().rpc()
+ .heartbeat().build();
+ conf.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
+ FileSubclusterResolver.class);
+ conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+ conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
+ cluster.addRouterOverrides(conf);
+ cluster.startCluster();
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ routerContext = cluster.getRandomRouter();
+ RouterStore routerStateManager =
+ routerContext.getRouter().getRouterStateManager();
+ mountTableManager = routerContext.getAdminClient().getMountTableManager();
+ // wait for one minute for all the routers to get registered
+ FederationTestUtils.waitRouterRegistered(routerStateManager,
+ numNameservices, 60000);
+ }
+
+ @AfterClass
+ public static void destory() {
+ try {
+ curatorTestingServer.close();
+ cluster.shutdown();
+ } catch (IOException e) {
+ // do nothing
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ clearEntries();
+ }
+
+ private void clearEntries() throws IOException {
+ List<MountTable> result = getMountTableEntries();
+ for (MountTable mountTable : result) {
+ RemoveMountTableEntryResponse removeMountTableEntry =
+ mountTableManager.removeMountTableEntry(RemoveMountTableEntryRequest
+ .newInstance(mountTable.getSourcePath()));
+ assertTrue(removeMountTableEntry.getStatus());
+ }
+ }
+
+ /**
+ * addMountTableEntry API should internally update the cache on all the
+ * routers.
+ */
+ @Test
+ public void testMountTableEntriesCacheUpdatedAfterAddAPICall()
+ throws IOException {
+
+ // Existing mount table size
+ int existingEntriesCount = getNumMountTableEntries();
+ String srcPath = "/addPath";
+ MountTable newEntry = MountTable.newInstance(srcPath,
+ Collections.singletonMap("ns0", "/addPathDest"), Time.now(),
+ Time.now());
+ addMountTableEntry(mountTableManager, newEntry);
+
+ // When Add entry is done, all the routers must have updated its mount table
+ // entry
+ List<RouterContext> routers = getRouters();
+ for (RouterContext rc : routers) {
+ List<MountTable> result =
+ getMountTableEntries(rc.getAdminClient().getMountTableManager());
+ assertEquals(1 + existingEntriesCount, result.size());
+ MountTable mountTableResult = result.get(0);
+ assertEquals(srcPath, mountTableResult.getSourcePath());
+ }
+ }
+
+ /**
+ * removeMountTableEntry API should internally update the cache on all the
+ * routers.
+ */
+ @Test
+ public void testMountTableEntriesCacheUpdatedAfterRemoveAPICall()
+ throws IOException {
+ // add
+ String srcPath = "/removePathSrc";
+ MountTable newEntry = MountTable.newInstance(srcPath,
+ Collections.singletonMap("ns0", "/removePathDest"), Time.now(),
+ Time.now());
+ addMountTableEntry(mountTableManager, newEntry);
+ int addCount = getNumMountTableEntries();
+ assertEquals(1, addCount);
+
+ // remove
+ RemoveMountTableEntryResponse removeMountTableEntry =
+ mountTableManager.removeMountTableEntry(
+ RemoveMountTableEntryRequest.newInstance(srcPath));
+ assertTrue(removeMountTableEntry.getStatus());
+
+ int removeCount = getNumMountTableEntries();
+ assertEquals(addCount - 1, removeCount);
+ }
+
+ /**
+ * updateMountTableEntry API should internally update the cache on all the
+ * routers.
+ */
+ @Test
+ public void testMountTableEntriesCacheUpdatedAfterUpdateAPICall()
+ throws IOException {
+ // add
+ String srcPath = "/updatePathSrc";
+ MountTable newEntry = MountTable.newInstance(srcPath,
+ Collections.singletonMap("ns0", "/updatePathDest"), Time.now(),
+ Time.now());
+ addMountTableEntry(mountTableManager, newEntry);
+ int addCount = getNumMountTableEntries();
+ assertEquals(1, addCount);
+
+ // update
+ String key = "ns1";
+ String value = "/updatePathDest2";
+ MountTable upateEntry = MountTable.newInstance(srcPath,
+ Collections.singletonMap(key, value), Time.now(), Time.now());
+ UpdateMountTableEntryResponse updateMountTableEntry =
+ mountTableManager.updateMountTableEntry(
+ UpdateMountTableEntryRequest.newInstance(upateEntry));
+ assertTrue(updateMountTableEntry.getStatus());
+ MountTable updatedMountTable = getMountTableEntry(srcPath);
+ assertNotNull("Updated mount table entrty cannot be null",
+ updatedMountTable);
+ assertEquals(1, updatedMountTable.getDestinations().size());
+ assertEquals(key,
+ updatedMountTable.getDestinations().get(0).getNameserviceId());
+ assertEquals(value, updatedMountTable.getDestinations().get(0).getDest());
+ }
+
+ /**
+ * After caching RouterClient if router goes down, refresh should be
+ * successful on other available router. The router which is not running
+ * should be ignored.
+ */
+ @Test
+ public void testCachedRouterClientBehaviourAfterRouterStoped()
+ throws IOException {
+ String srcPath = "/addPathClientCache";
+ MountTable newEntry = MountTable.newInstance(srcPath,
+ Collections.singletonMap("ns0", "/addPathClientCacheDest"), Time.now(),
+ Time.now());
+ addMountTableEntry(mountTableManager, newEntry);
+
+ // When Add entry is done, all the routers must have updated its mount table
+ // entry
+ List<RouterContext> routers = getRouters();
+ for (RouterContext rc : routers) {
+ List<MountTable> result =
+ getMountTableEntries(rc.getAdminClient().getMountTableManager());
+ assertEquals(1, result.size());
+ MountTable mountTableResult = result.get(0);
+ assertEquals(srcPath, mountTableResult.getSourcePath());
+ }
+
+ // Lets stop one router
+ for (RouterContext rc : routers) {
+ InetSocketAddress adminServerAddress =
+ rc.getRouter().getAdminServerAddress();
+ if (!routerContext.getRouter().getAdminServerAddress()
+ .equals(adminServerAddress)) {
+ cluster.stopRouter(rc);
+ break;
+ }
+ }
+
+ srcPath = "/addPathClientCache2";
+ newEntry = MountTable.newInstance(srcPath,
+ Collections.singletonMap("ns0", "/addPathClientCacheDest2"), Time.now(),
+ Time.now());
+ addMountTableEntry(mountTableManager, newEntry);
+ for (RouterContext rc : getRouters()) {
+ List<MountTable> result =
+ getMountTableEntries(rc.getAdminClient().getMountTableManager());
+ assertEquals(2, result.size());
+ }
+ }
+
+ private List<RouterContext> getRouters() {
+ List<RouterContext> result = new ArrayList<>();
+ for (RouterContext rc : cluster.getRouters()) {
+ if (rc.getRouter().getServiceState() == STATE.STARTED) {
+ result.add(rc);
+ }
+ }
+ return result;
+ }
+
+ @Test
+ public void testRefreshMountTableEntriesAPI() throws IOException {
+ RefreshMountTableEntriesRequest request =
+ RefreshMountTableEntriesRequest.newInstance();
+ RefreshMountTableEntriesResponse refreshMountTableEntriesRes =
+ mountTableManager.refreshMountTableEntries(request);
+ // refresh should be successful
+ assertTrue(refreshMountTableEntriesRes.getResult());
+ }
+
+ /**
+ * Verify cache update timeouts when any of the router takes more time than
+ * the configured timeout period.
+ */
+ @Test(timeout = 10000)
+ public void testMountTableEntriesCacheUpdateTimeout() throws IOException {
+ // Resources will be closed when router is closed
+ @SuppressWarnings("resource")
+ MountTableRefresherService mountTableRefresherService =
+ new MountTableRefresherService(routerContext.getRouter()) {
+ @Override
+ protected MountTableRefresherThread getLocalRefresher(
+ String adminAddress) {
+ return new MountTableRefresherThread(null, adminAddress) {
+ @Override
+ public void run() {
+ try {
+ // Sleep 1 minute
+ Thread.sleep(60000);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ }
+ };
+ }
+ };
+ Configuration config = routerContext.getRouter().getConfig();
+ config.setTimeDuration(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, 5,
+ TimeUnit.SECONDS);
+ mountTableRefresherService.init(config);
+ // One router is not responding for 1 minute, still refresh should
+ // finished in 5 second as cache update timeout is set 5 second.
+ mountTableRefresherService.refresh();
+ // Test case timeout is assert for this test case.
+ }
+
+ /**
+ * Verify Cached RouterClient connections are removed from cache and closed
+ * when their max live time is elapsed.
+ */
+ @Test
+ public void testRouterClientConnectionExpiration() throws Exception {
+ final AtomicInteger createCounter = new AtomicInteger();
+ final AtomicInteger removeCounter = new AtomicInteger();
+ // Resources will be closed when router is closed
+ @SuppressWarnings("resource")
+ MountTableRefresherService mountTableRefresherService =
+ new MountTableRefresherService(routerContext.getRouter()) {
+ @Override
+ protected void closeRouterClient(RouterClient client) {
+ super.closeRouterClient(client);
+ removeCounter.incrementAndGet();
+ }
+
+ @Override
+ protected RouterClient createRouterClient(
+ InetSocketAddress routerSocket, Configuration config)
+ throws IOException {
+ createCounter.incrementAndGet();
+ return super.createRouterClient(routerSocket, config);
+ }
+ };
+ int clientCacheTime = 2000;
+ Configuration config = routerContext.getRouter().getConfig();
+ config.setTimeDuration(
+ RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME, clientCacheTime,
+ TimeUnit.MILLISECONDS);
+ mountTableRefresherService.init(config);
+ // Do refresh to created RouterClient
+ mountTableRefresherService.refresh();
+ assertNotEquals("No RouterClient is created.", 0, createCounter.get());
+ /*
+ * Wait for clients to expire. Lets wait triple the cache eviction period.
+ * After cache eviction period all created client must be removed and
+ * closed.
+ */
+ GenericTestUtils.waitFor(() -> createCounter.get() == removeCounter.get(),
+ 100, 3 * clientCacheTime);
+ }
+
+ private int getNumMountTableEntries() throws IOException {
+ List<MountTable> records = getMountTableEntries();
+ int oldEntriesCount = records.size();
+ return oldEntriesCount;
+ }
+
+ private MountTable getMountTableEntry(String srcPath) throws IOException {
+ List<MountTable> mountTableEntries = getMountTableEntries();
+ for (MountTable mountTable : mountTableEntries) {
+ String sourcePath = mountTable.getSourcePath();
+ if (srcPath.equals(sourcePath)) {
+ return mountTable;
+ }
+ }
+ return null;
+ }
+
+ private void addMountTableEntry(MountTableManager mountTableMgr,
+ MountTable newEntry) throws IOException {
+ AddMountTableEntryRequest addRequest =
+ AddMountTableEntryRequest.newInstance(newEntry);
+ AddMountTableEntryResponse addResponse =
+ mountTableMgr.addMountTableEntry(addRequest);
+ assertTrue(addResponse.getStatus());
+ }
+
+ private List<MountTable> getMountTableEntries() throws IOException {
+ return getMountTableEntries(mountTableManager);
+ }
+
+ private List<MountTable> getMountTableEntries(
+ MountTableManager mountTableManagerParam) throws IOException {
+ GetMountTableEntriesRequest request =
+ GetMountTableEntriesRequest.newInstance("/");
+ return mountTableManagerParam.getMountTableEntries(request).getEntries();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 0ba9b94..5bfb0cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -437,6 +437,7 @@ Usage:
[-safemode enter | leave | get]
[-nameservice disable | enable <nameservice>]
[-getDisabledNameservices]
+ [-refresh]
| COMMAND\_OPTION | Description |
|:---- |:---- |
@@ -449,6 +450,7 @@ Usage:
| `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. |
| `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. |
| `-getDisabledNameservices` | Get the name services that are disabled in the federation. |
+| `-refresh` | Update mount table cache of the connected router. |
The commands for managing Router-based federation. See [Mount table management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management) for more info.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org