You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/01/18 23:47:35 UTC
[04/49] hadoop git commit: HDFS-12934. RBF: Federation supports
global quota. Contributed by Yiqun Lin.
HDFS-12934. RBF: Federation supports global quota. Contributed by Yiqun Lin.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d98a2e6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d98a2e6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d98a2e6e
Branch: refs/heads/YARN-7402
Commit: d98a2e6e2383f8b66def346409b0517aa32d298d
Parents: d9006d8
Author: Yiqun Lin <yq...@apache.org>
Authored: Wed Jan 10 13:59:11 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Wed Jan 10 13:59:11 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 9 +
.../hdfs/server/federation/router/Quota.java | 208 +++++++++
.../hdfs/server/federation/router/Router.java | 38 +-
.../federation/router/RouterQuotaManager.java | 160 +++++++
.../router/RouterQuotaUpdateService.java | 228 ++++++++++
.../federation/router/RouterQuotaUsage.java | 88 ++++
.../federation/router/RouterRpcServer.java | 40 +-
.../federation/store/records/MountTable.java | 22 +
.../store/records/impl/pb/MountTablePBImpl.java | 32 ++
.../hadoop/hdfs/server/namenode/Quota.java | 2 +-
.../hdfs/tools/federation/RouterAdmin.java | 133 +++++-
.../src/main/proto/FederationProtocol.proto | 3 +
.../src/main/resources/hdfs-default.xml | 20 +
.../server/federation/RouterConfigBuilder.java | 12 +
.../federation/router/TestRouterAdminCLI.java | 54 +++
.../federation/router/TestRouterQuota.java | 452 +++++++++++++++++++
.../router/TestRouterQuotaManager.java | 113 +++++
.../store/records/TestMountTable.java | 41 ++
18 files changed, 1639 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a380833..2825cc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1322,6 +1322,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_ROUTER_HTTPS_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_ROUTER_HTTPS_PORT_DEFAULT;
+ // HDFS Router-based federation quota
+ public static final String DFS_ROUTER_QUOTA_ENABLE =
+ FEDERATION_ROUTER_PREFIX + "quota.enable";
+ public static final boolean DFS_ROUTER_QUOTA_ENABLED_DEFAULT = false;
+ public static final String DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL =
+ FEDERATION_ROUTER_PREFIX + "quota-cache.update.interval";
+ public static final long DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT =
+ 60000;
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
new file mode 100644
index 0000000..f5e5272
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Module that implements the quota relevant RPC calls
+ * {@link ClientProtocol#setQuota(String, long, long, StorageType)}
+ * and
+ * {@link ClientProtocol#getQuotaUsage(String)}
+ * in the {@link RouterRpcServer}.
+ */
+public class Quota {
+ private static final Logger LOG = LoggerFactory.getLogger(Quota.class);
+
+ /** RPC server to receive client calls. */
+ private final RouterRpcServer rpcServer;
+ /** RPC clients to connect to the Namenodes. */
+ private final RouterRpcClient rpcClient;
+ /** Router used in RouterRpcServer. */
+ private final Router router;
+
+ /**
+ * Create the quota module for a Router RPC server.
+ * @param router Router whose quota manager supplies the mount table paths.
+ * @param server RPC server owning this module; its RPC client is reused to
+ * reach the Namenodes.
+ */
+ public Quota(Router router, RouterRpcServer server) {
+ this.router = router;
+ this.rpcServer = server;
+ this.rpcClient = server.getRPCClient();
+ }
+
+  /**
+   * Set quota for the federation path and for every mount table path that
+   * the quota manager has cached below it.
+   * @param path Federation path.
+   * @param namespaceQuota Name space quota.
+   * @param storagespaceQuota Storage space quota.
+   * @param type StorageType that the space quota is intended to be set on.
+   * @throws IOException If the quota system is disabled in the Router, or if
+   *           resolving the locations or the remote invocation fails.
+   */
+  public void setQuota(String path, long namespaceQuota,
+      long storagespaceQuota, StorageType type) throws IOException {
+    rpcServer.checkOperation(OperationCategory.WRITE);
+
+    // Without a quota manager no remote location can be resolved below, so
+    // the request would be silently dropped. Fail explicitly instead.
+    if (!router.isQuotaEnabled()) {
+      throw new IOException("The quota system is disabled in Router.");
+    }
+
+    // Set quota for current path and its children mount table path.
+    final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
+    if (LOG.isDebugEnabled()) {
+      for (RemoteLocation loc : locations) {
+        LOG.debug("Set quota for path: nsId: {}, dest: {}.",
+            loc.getNameserviceId(), loc.getDest());
+      }
+    }
+
+    RemoteMethod method = new RemoteMethod("setQuota",
+        new Class<?>[] {String.class, long.class, long.class,
+            StorageType.class},
+        new RemoteParam(), namespaceQuota, storagespaceQuota, type);
+    rpcClient.invokeConcurrent(locations, method, false, false);
+  }
+
+  /**
+   * Query the quota usage of a federation path from the subclusters and
+   * merge the individual answers into one value.
+   * @param path Federation path.
+   * @return Aggregated quota.
+   * @throws IOException If the locations cannot be resolved or the remote
+   *           invocation fails.
+   */
+  public QuotaUsage getQuotaUsage(String path) throws IOException {
+    final List<RemoteLocation> targets = getValidQuotaLocations(path);
+    RemoteMethod remoteCall = new RemoteMethod("getQuotaUsage",
+        new Class<?>[] {String.class}, new RemoteParam());
+    Map<RemoteLocation, Object> usageByLocation =
+        rpcClient.invokeConcurrent(targets, remoteCall, true, false);
+    return aggregateQuota(usageByLocation);
+  }
+
+  /**
+   * Get valid quota remote locations used in {@link #getQuotaUsage(String)}.
+   * Unlike {@link #getQuotaRemoteLocations(String)}, this method keeps, for
+   * each nameservice, only destinations that do not overlap: when one
+   * destination is the same as, or a descendant of, another one in the same
+   * nameservice, only the ancestor is kept so that usage is not counted
+   * twice. The result does not depend on the order in which parent and
+   * child destinations are encountered.
+   * @param path Federation path.
+   * @return List of valid quota remote locations.
+   * @throws IOException If the remote locations cannot be resolved.
+   */
+  private List<RemoteLocation> getValidQuotaLocations(String path)
+      throws IOException {
+    final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
+
+    // NameService -> non-overlapping locations
+    Map<String, List<RemoteLocation>> validLocations = new HashMap<>();
+    for (RemoteLocation loc : locations) {
+      String nsId = loc.getNameserviceId();
+      List<RemoteLocation> dests = validLocations.get(nsId);
+
+      // Rebuild the list for this nameservice: drop every destination that
+      // the new location covers, unless the new location is itself covered.
+      boolean covered = false;
+      List<RemoteLocation> kept = new LinkedList<>();
+      if (dests != null) {
+        for (RemoteLocation d : dests) {
+          if (isSameOrDescendant(d.getDest(), loc.getDest())) {
+            covered = true;
+            break;
+          }
+          if (!isSameOrDescendant(loc.getDest(), d.getDest())) {
+            kept.add(d);
+          }
+        }
+      }
+
+      if (!covered) {
+        kept.add(loc);
+        validLocations.put(nsId, kept);
+      }
+    }
+
+    List<RemoteLocation> quotaLocs = new LinkedList<>();
+    for (List<RemoteLocation> locs : validLocations.values()) {
+      quotaLocs.addAll(locs);
+    }
+
+    return quotaLocs;
+  }
+
+  /**
+   * Check whether a path equals another path or lies below it. The check is
+   * done on path component boundaries, so "/ab" is not below "/a".
+   * @param ancestor Candidate ancestor path.
+   * @param path Path to test.
+   * @return True if path equals ancestor or is one of its descendants.
+   */
+  private static boolean isSameOrDescendant(String ancestor, String path) {
+    if (path.equals(ancestor)) {
+      return true;
+    }
+    String prefix = ancestor.endsWith("/") ? ancestor : ancestor + "/";
+    return path.startsWith(prefix);
+  }
+
+ /**
+ * Aggregate the quota usages returned by the sub-clusters.
+ * The file/directory counts and the consumed space of all non-null results
+ * are summed. If any result reports both quotas as -1, the aggregated
+ * namespace quota is set to {@link HdfsConstants#QUOTA_DONT_SET}; no other
+ * quota field is set on the builder here.
+ * @param results Quota query result, keyed by the queried location.
+ * @return Aggregated Quota.
+ */
+ private QuotaUsage aggregateQuota(Map<RemoteLocation, Object> results) {
+ long nsCount = 0;
+ long ssCount = 0;
+ boolean hasQuotaUnSet = false;
+
+ for (Map.Entry<RemoteLocation, Object> entry : results.entrySet()) {
+ RemoteLocation loc = entry.getKey();
+ QuotaUsage usage = (QuotaUsage) entry.getValue();
+ // Locations without a result are skipped.
+ if (usage != null) {
+ // If quota is not set in real FileSystem, the usage
+ // value will return -1.
+ if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) {
+ hasQuotaUnSet = true;
+ }
+
+ nsCount += usage.getFileAndDirectoryCount();
+ ssCount += usage.getSpaceConsumed();
+ LOG.debug(
+ "Get quota usage for path: nsId: {}, dest: {},"
+ + " nsCount: {}, ssCount: {}.",
+ loc.getNameserviceId(), loc.getDest(),
+ usage.getFileAndDirectoryCount(), usage.getSpaceConsumed());
+ }
+ }
+
+ QuotaUsage.Builder builder = new QuotaUsage.Builder()
+ .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount);
+ // Mark the aggregate so a caller can detect that at least one location
+ // has no quota applied yet.
+ if (hasQuotaUnSet) {
+ builder.quota(HdfsConstants.QUOTA_DONT_SET);
+ }
+
+ return builder.build();
+ }
+
+  /**
+   * Collect the remote locations, across all subclusters, of every mount
+   * table path that the quota manager reports for the given federation
+   * path. The list is empty when the Router has no quota manager.
+   * @param path Federation path.
+   * @return List of quota remote locations.
+   * @throws IOException If the locations of a mount path cannot be resolved.
+   */
+  private List<RemoteLocation> getQuotaRemoteLocations(String path)
+      throws IOException {
+    List<RemoteLocation> result = new LinkedList<>();
+    RouterQuotaManager quotaManager = this.router.getQuotaManager();
+    if (quotaManager == null) {
+      return result;
+    }
+
+    for (String mountPath : quotaManager.getPaths(path)) {
+      List<RemoteLocation> resolved =
+          rpcServer.getLocationsForPath(mountPath, true);
+      result.addAll(resolved);
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 413566e..ea8a1c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Router that provides a unified view of multiple federated HDFS clusters. It
* has two main roles: (1) federated interface and (2) NameNode heartbeat.
@@ -105,7 +107,10 @@ public class Router extends CompositeService {
/** JVM pauses (GC and others). */
private JvmPauseMonitor pauseMonitor;
-
+ /** Quota usage update service. */
+ private RouterQuotaUpdateService quotaUpdateService;
+ /** Quota cache manager. */
+ private RouterQuotaManager quotaManager;
/////////////////////////////////////////////////////////
// Constructor
@@ -200,6 +205,14 @@ public class Router extends CompositeService {
this.pauseMonitor.init(conf);
}
+ // Initial quota relevant service
+ if (conf.getBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
+ DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLED_DEFAULT)) {
+ this.quotaManager = new RouterQuotaManager();
+ this.quotaUpdateService = new RouterQuotaUpdateService(this);
+ addService(this.quotaUpdateService);
+ }
+
super.serviceInit(conf);
}
@@ -524,4 +537,27 @@ public class Router extends CompositeService {
this.namenodeResolver.setRouterId(this.routerId);
}
}
+
+ /**
+ * If the quota system is enabled in Router.
+ * @return True if this Router holds a quota manager, false otherwise.
+ */
+ public boolean isQuotaEnabled() {
+ return this.quotaManager != null;
+ }
+
+ /**
+ * Get the Router quota manager.
+ * @return RouterQuotaManager Quota manager, or null if none was created
+ * for this Router.
+ */
+ public RouterQuotaManager getQuotaManager() {
+ return this.quotaManager;
+ }
+
+ /**
+ * Get quota cache update service.
+ * @return RouterQuotaUpdateService Quota update service, or null if none
+ * was created for this Router.
+ */
+ @VisibleForTesting
+ RouterQuotaUpdateService getQuotaCacheUpdateService() {
+ return this.quotaUpdateService;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
new file mode 100644
index 0000000..fc3575c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+/**
+ * Router quota manager in Router. The manager maintains
+ * {@link RouterQuotaUsage} cache of mount tables and do management
+ * for the quota caches.
+ */
+public class RouterQuotaManager {
+ /** Quota usage <MountTable Path, Aggregated QuotaUsage> cache. */
+ private TreeMap<String, RouterQuotaUsage> cache;
+
+ /** Lock to access the quota cache. */
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
+ /**
+ * Create a quota manager with an empty quota cache.
+ */
+ public RouterQuotaManager() {
+ this.cache = new TreeMap<>();
+ }
+
+  /**
+   * Get all the mount quota paths.
+   * @return Sorted snapshot of the cached mount table paths. The returned
+   *         set is a copy: it is safe to iterate after this call returns and
+   *         is not affected by later changes to the cache.
+   */
+  public Set<String> getAll() {
+    readLock.lock();
+    try {
+      // Copy while holding the lock. The key set of the map is a live view,
+      // and handing it out would let callers iterate it with no lock held
+      // while another thread modifies the cache.
+      return new TreeSet<>(this.cache.keySet());
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the quota usage of the nearest path, starting at the given path and
+   * walking up through its ancestors including the root, whose quota was
+   * set.
+   * @param path The path being written.
+   * @return RouterQuotaUsage Quota usage of the nearest quota-set path, or
+   *         null if neither the path nor any ancestor has a quota set (also
+   *         returned for a null path).
+   */
+  public RouterQuotaUsage getQuotaUsage(String path) {
+    readLock.lock();
+    try {
+      String current = path;
+      while (current != null) {
+        RouterQuotaUsage quotaUsage = this.cache.get(current);
+        if (quotaUsage != null && isQuotaSet(quotaUsage)) {
+          return quotaUsage;
+        }
+        // Not found or quota not set: continue with the parent path.
+        current = getParentPath(current);
+      }
+      return null;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the parent of a path by cutting it at its last separator.
+   * @param path Path to cut.
+   * @return The parent path; the root for a direct child of the root; null
+   *         if the path is the root itself or contains no separator.
+   */
+  private static String getParentPath(String path) {
+    int pos = path.lastIndexOf(Path.SEPARATOR);
+    if (pos > 0) {
+      return path.substring(0, pos);
+    }
+    // pos == 0 means the only separator is the leading one: the parent is
+    // the root, unless the path already is the root.
+    if (pos == 0 && path.length() > 1) {
+      return Path.SEPARATOR;
+    }
+    return null;
+  }
+
+  /**
+   * Get children paths (possibly including the path itself) under the
+   * specified federation path. Only paths that equal the parent path or lie
+   * below it on a path component boundary are returned, so "/ab" is not
+   * reported for "/a".
+   * @param parentPath Federation path to search under.
+   * @return Set<String> Sorted snapshot of the children paths. The set is a
+   *         copy and is not affected by later changes to the cache.
+   */
+  public Set<String> getPaths(String parentPath) {
+    readLock.lock();
+    try {
+      String from = parentPath;
+      String to = parentPath + Character.MAX_VALUE;
+      SortedMap<String, RouterQuotaUsage> subMap = this.cache.subMap(from, to);
+
+      // The range above selects by string prefix only, and the sub map is a
+      // live view of the cache. Filter on the path boundary and copy the
+      // keys while the lock is still held.
+      Set<String> validPaths = new TreeSet<>();
+      for (String path : subMap.keySet()) {
+        if (isParentEntry(path, parentPath)) {
+          validPaths.add(path);
+        }
+      }
+      return validPaths;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Check whether a path equals the given parent or lies below it.
+   * @param path Path to test.
+   * @param parent Candidate parent path.
+   * @return True if path equals parent or is one of its descendants.
+   */
+  private static boolean isParentEntry(String path, String parent) {
+    if (path.equals(parent)) {
+      return true;
+    }
+    String prefix = parent.endsWith(Path.SEPARATOR)
+        ? parent : parent + Path.SEPARATOR;
+    return path.startsWith(prefix);
+  }
+
+ /**
+ * Put a new entry into the cache, replacing any value already cached for
+ * the same path. The write lock is held during the update.
+ * @param path Mount table path.
+ * @param quotaUsage Corresponding cache value.
+ */
+ public void put(String path, RouterQuotaUsage quotaUsage) {
+ writeLock.lock();
+ try {
+ this.cache.put(path, quotaUsage);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Remove the entry of the given path from the cache. The write lock is
+ * held during the removal.
+ * @param path Mount table path.
+ */
+ public void remove(String path) {
+ writeLock.lock();
+ try {
+ this.cache.remove(path);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Clean up the cache by removing all of its entries. The write lock is
+ * held during the operation.
+ */
+ public void clear() {
+ writeLock.lock();
+ try {
+ this.cache.clear();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+  /**
+   * Tell whether a quota usage carries a quota, i.e. whether its namespace
+   * quota or its storage space quota differs from
+   * {@link HdfsConstants#QUOTA_DONT_SET}.
+   * @param quota RouterQuotaUsage set in mount table.
+   * @return True if at least one of the two quotas was set; false if neither
+   *         was set or if the given usage is null.
+   */
+  public boolean isQuotaSet(RouterQuotaUsage quota) {
+    if (quota == null) {
+      return false;
+    }
+
+    // A single set quota is enough for the mount table to count as quota set.
+    boolean nsQuotaSet = quota.getQuota() != HdfsConstants.QUOTA_DONT_SET;
+    boolean ssQuotaSet = quota.getSpaceQuota() != HdfsConstants.QUOTA_DONT_SET;
+    return nsQuotaSet || ssQuotaSet;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
new file mode 100644
index 0000000..80abc11
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the {@link RouterQuotaUsage}
+ * cached information in the {@link Router} and update corresponding
+ * mount table in State Store.
+ */
+public class RouterQuotaUpdateService extends PeriodicService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterQuotaUpdateService.class);
+
+ private MountTableStore mountTableStore;
+ private RouterRpcServer rpcServer;
+ /** Router using this Service. */
+ private final Router router;
+ /** Router Quota manager. */
+ private RouterQuotaManager quotaManager;
+
+ /**
+ * Create the quota update service for a Router. The RPC server and the
+ * quota manager are read from the Router at construction time.
+ * @param router Router using this service.
+ * @throws IOException If the Router has no quota manager.
+ */
+ public RouterQuotaUpdateService(final Router router) throws IOException {
+ super(RouterQuotaUpdateService.class.getName());
+ this.router = router;
+ this.rpcServer = router.getRpcServer();
+ this.quotaManager = router.getQuotaManager();
+
+ if (this.quotaManager == null) {
+ throw new IOException("Router quota manager is not initialized.");
+ }
+ }
+
+ /**
+ * Read the quota cache update interval from the configuration, in
+ * milliseconds, and use it as the interval of this periodic service
+ * before delegating to the parent initialization.
+ * @param conf Configuration to read the interval from.
+ * @throws Exception If the parent initialization fails.
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.setIntervalMs(conf.getTimeDuration(
+ DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL,
+ DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS));
+
+ super.serviceInit(conf);
+ }
+
+ /**
+ * Refresh the quota cache. For every mount table entry that has a quota
+ * set, the current usage is queried through the RPC server's quota module,
+ * the quota is re-applied if the aggregated usage reports it as not set,
+ * and the cache entry is replaced with the stored quota combined with the
+ * fresh usage. Entries whose quota usage changed are then written back to
+ * the State Store. An IOException is logged and ends the current run.
+ */
+ @Override
+ protected void periodicInvoke() {
+ LOG.debug("Start to update quota cache.");
+ try {
+ List<MountTable> updateMountTables = new LinkedList<>();
+ List<MountTable> mountTables = getQuotaSetMountTables();
+ for (MountTable entry : mountTables) {
+ String src = entry.getSourcePath();
+ RouterQuotaUsage oldQuota = entry.getQuota();
+ long nsQuota = oldQuota.getQuota();
+ long ssQuota = oldQuota.getSpaceQuota();
+ // Call RouterRpcServer#getQuotaUsage for getting current quota usage.
+ QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule()
+ .getQuotaUsage(src);
+ // If quota is not set in some subclusters under federation path,
+ // set quota for this path.
+ if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) {
+ this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
+ }
+
+ // Keep the quota values of the mount table, take the usage values
+ // from the subclusters.
+ RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
+ currentQuotaUsage);
+ this.quotaManager.put(src, newQuota);
+ entry.setQuota(newQuota);
+
+ // only update mount tables which quota was changed
+ if (!oldQuota.equals(newQuota)) {
+ updateMountTables.add(entry);
+
+ LOG.debug(
+ "Update quota usage entity of path: {}, nsCount: {},"
+ + " nsQuota: {}, ssCount: {}, ssQuota: {}.",
+ src, newQuota.getFileAndDirectoryCount(),
+ newQuota.getQuota(), newQuota.getSpaceConsumed(),
+ newQuota.getSpaceQuota());
+ }
+ }
+
+ updateMountTableEntries(updateMountTables);
+ } catch (IOException e) {
+ // NOTE(review): a failure for one entry skips the remaining entries
+ // and the State Store write of entries already refreshed in this run.
+ LOG.error("Quota cache updated error.", e);
+ }
+ }
+
+  /**
+   * Return the mount table store, looking it up in the Router's State Store
+   * on first use and remembering it for later calls.
+   * @return MountTableStore instance.
+   * @throws IOException If the State Store has no mount table store
+   *           registered.
+   */
+  private MountTableStore getMountTableStore() throws IOException {
+    if (this.mountTableStore != null) {
+      return this.mountTableStore;
+    }
+
+    MountTableStore store = router.getStateStore().getRegisteredRecordStore(
+        MountTableStore.class);
+    if (store == null) {
+      throw new IOException("Mount table state store is not available.");
+    }
+    this.mountTableStore = store;
+    return store;
+  }
+
+  /**
+   * Fetch every mount table entry from the mount table store by requesting
+   * the entries under the root path.
+   * @return List of mount tables.
+   * @throws IOException If the mount table store is unavailable or the
+   *           request fails.
+   */
+  private List<MountTable> getMountTableEntries() throws IOException {
+    // "/" makes the request cover the whole mount table
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance("/");
+    MountTableStore store = getMountTableStore();
+    GetMountTableEntriesResponse response =
+        store.getMountTableEntries(request);
+    return response.getEntries();
+  }
+
+  /**
+   * Select the mount tables that have a quota set and bring the quota cache
+   * in line with the current mount table along the way:
+   * 1. Every current entry is written into the cache, overriding the value
+   *    cached before.
+   * 2. Cached paths that no longer have a mount table entry are removed.
+   * @return List of mount tables which quota was set.
+   * @throws IOException If the mount table entries cannot be read.
+   */
+  private List<MountTable> getQuotaSetMountTables() throws IOException {
+    List<MountTable> allEntries = getMountTableEntries();
+
+    // Start with all cached paths; whatever is left after the scan is stale.
+    Set<String> stalePaths = new HashSet<>(this.quotaManager.getAll());
+
+    List<MountTable> quotaSetEntries = new LinkedList<>();
+    for (MountTable mountTable : allEntries) {
+      if (isQuotaSet(mountTable)) {
+        quotaSetEntries.add(mountTable);
+      }
+
+      // refresh the cached value of this mount point
+      String mountPoint = mountTable.getSourcePath();
+      this.quotaManager.put(mountPoint, mountTable.getQuota());
+      stalePaths.remove(mountPoint);
+    }
+
+    // drop cached paths without a mount table entry
+    for (String stale : stalePaths) {
+      this.quotaManager.remove(stale);
+    }
+
+    return quotaSetEntries;
+  }
+
+  /**
+   * Tell whether the given mount table entry has a quota set, as decided by
+   * the quota manager.
+   * @param mountTable Mount table entry.
+   * @return True if the entry is not null and its quota was set.
+   */
+  private boolean isQuotaSet(MountTable mountTable) {
+    return mountTable != null
+        && this.quotaManager.isQuotaSet(mountTable.getQuota());
+  }
+
+  /**
+   * Build a quota usage that takes its quota limits from the old quota and
+   * its usage counters from the current quota usage.
+   * @param oldQuota Old quota stored in State Store.
+   * @param currentQuotaUsage Current quota usage value queried from
+   *          subcluster.
+   * @return A new RouterQuotaUsage.
+   */
+  private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
+      QuotaUsage currentQuotaUsage) {
+    RouterQuotaUsage.Builder merged = new RouterQuotaUsage.Builder();
+    // namespace: current count, stored limit
+    merged.fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount());
+    merged.quota(oldQuota.getQuota());
+    // storage space: current consumption, stored limit
+    merged.spaceConsumed(currentQuotaUsage.getSpaceConsumed());
+    merged.spaceQuota(oldQuota.getSpaceQuota());
+    return merged.build();
+  }
+
+ /**
+ * Write out updated mount table entries into State Store, issuing one
+ * update request per entry. The mount table store is only looked up when
+ * there is at least one entry to write.
+ * @param updateMountTables Mount tables to be updated.
+ * @throws IOException If the mount table store is unavailable or an update
+ * request fails; entries after the failing one are not written.
+ */
+ private void updateMountTableEntries(List<MountTable> updateMountTables)
+ throws IOException {
+ for (MountTable entry : updateMountTables) {
+ UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
+ .newInstance(entry);
+ getMountTableStore().updateMountTableEntry(updateRequest);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
new file mode 100644
index 0000000..55bfc48
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature;
+import org.apache.hadoop.hdfs.server.namenode.Quota;
+
+/**
+ * The subclass of {@link QuotaUsage} used in Router-based federation.
+ * It adds methods that verify the namespace and storage space quotas
+ * against the usage values held by the instance.
+ */
+public final class RouterQuotaUsage extends QuotaUsage {
+ // Instances are created through the Builder only.
+ private RouterQuotaUsage(Builder builder) {
+ super(builder);
+ }
+
+ /**
+ * Build the instance based on the builder. The setters are overridden
+ * only to return this Builder type, so that chained calls end in
+ * a {@link RouterQuotaUsage}.
+ */
+ public static class Builder extends QuotaUsage.Builder {
+
+ /**
+ * Create the quota usage from the values set on this builder.
+ * @return A new RouterQuotaUsage.
+ */
+ public RouterQuotaUsage build() {
+ return new RouterQuotaUsage(this);
+ }
+
+ @Override
+ public Builder fileAndDirectoryCount(long count) {
+ super.fileAndDirectoryCount(count);
+ return this;
+ }
+
+ @Override
+ public Builder quota(long quota) {
+ super.quota(quota);
+ return this;
+ }
+
+ @Override
+ public Builder spaceConsumed(long spaceConsumed) {
+ super.spaceConsumed(spaceConsumed);
+ return this;
+ }
+
+ @Override
+ public Builder spaceQuota(long spaceQuota) {
+ super.spaceQuota(spaceQuota);
+ return this;
+ }
+ }
+
+ /**
+ * Verify if namespace quota is violated once quota is set. Relevant
+ * method {@link DirectoryWithQuotaFeature#verifyNamespaceQuota}.
+ * The namespace quota is checked against the file and directory count.
+ * @throws NSQuotaExceededException If {@link Quota#isViolated} reports a
+ * violation.
+ */
+ public void verifyNamespaceQuota() throws NSQuotaExceededException {
+ if (Quota.isViolated(getQuota(), getFileAndDirectoryCount())) {
+ throw new NSQuotaExceededException(getQuota(),
+ getFileAndDirectoryCount());
+ }
+ }
+
+ /**
+ * Verify if storage space quota is violated once quota is set. Relevant
+ * method {@link DirectoryWithQuotaFeature#verifyStoragespaceQuota}.
+ * The storage space quota is checked against the consumed space.
+ * @throws DSQuotaExceededException If {@link Quota#isViolated} reports a
+ * violation.
+ */
+ public void verifyStoragespaceQuota() throws DSQuotaExceededException {
+ if (Quota.isViolated(getSpaceQuota(), getSpaceConsumed())) {
+ throw new DSQuotaExceededException(getSpaceQuota(), getSpaceConsumed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 11f7fa6..73b189e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -181,6 +181,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
/** Category of the operation that a thread is executing. */
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
+ /** Router Quota calls. */
+ private final Quota quotaCall;
/**
* Construct a router RPC server.
@@ -277,6 +279,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
// Create the client
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
this.namenodeResolver, this.rpcMonitor);
+
+ // Initialize modules
+ this.quotaCall = new Quota(this.router, this);
}
@Override
@@ -384,7 +389,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* @throws StandbyException If the Router is in safe mode and cannot serve
* client requests.
*/
- private void checkOperation(OperationCategory op) throws StandbyException {
+ protected void checkOperation(OperationCategory op) throws StandbyException {
// Log the function we are currently calling.
if (rpcMonitor != null) {
rpcMonitor.startOp();
@@ -1839,21 +1844,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
@Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
StorageType type) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO assign global replicas instead of applying them to each folder
- final List<RemoteLocation> locations = getLocationsForPath(path, true);
- RemoteMethod method = new RemoteMethod("setQuota",
- new Class<?>[] {String.class, Long.class, Long.class,
- StorageType.class},
- new RemoteParam(), namespaceQuota, storagespaceQuota, type);
- rpcClient.invokeConcurrent(locations, method, false, false);
+ this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type);
}
@Override // ClientProtocol
public QuotaUsage getQuotaUsage(String path) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ checkOperation(OperationCategory.READ);
+ return this.quotaCall.getQuotaUsage(path);
}
@Override
@@ -1996,7 +1993,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* @return Prioritized list of locations in the federated cluster.
* @throws IOException If the location for this path cannot be determined.
*/
- private List<RemoteLocation> getLocationsForPath(
+ protected List<RemoteLocation> getLocationsForPath(
String path, boolean failIfLocked) throws IOException {
try {
// Check the location for this path
@@ -2016,6 +2013,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
}
throw new IOException(path + " is in a read only mount point");
}
+
+ // Check quota
+ if (this.router.isQuotaEnabled()) {
+ RouterQuotaUsage quotaUsage = this.router.getQuotaManager()
+ .getQuotaUsage(path);
+ if (quotaUsage != null) {
+ quotaUsage.verifyNamespaceQuota();
+ quotaUsage.verifyStoragespaceQuota();
+ }
+ }
}
return location.getDestinations();
@@ -2119,4 +2126,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
UserGroupInformation ugi = Server.getRemoteUser();
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
+
+ /**
+ * Get the quota module that handles the quota calls of this RPC server.
+ *
+ * @return Quota module created in the constructor of this RPC server.
+ */
+ public Quota getQuotaModule() {
+ return this.quotaCall;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index 1b5d2d6..53ad1e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -29,9 +29,11 @@ import java.util.TreeMap;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation;
@@ -140,6 +142,12 @@ public abstract class MountTable extends BaseRecord {
record.setMode(new FsPermission(
RouterPermissionChecker.MOUNT_TABLE_PERMISSION_DEFAULT));
+ // Set quota for mount table
+ RouterQuotaUsage quota = new RouterQuotaUsage.Builder()
+ .fileAndDirectoryCount(0).quota(HdfsConstants.QUOTA_DONT_SET)
+ .spaceConsumed(0).spaceQuota(HdfsConstants.QUOTA_DONT_SET).build();
+ record.setQuota(quota);
+
// Validate
record.validate();
return record;
@@ -249,6 +257,20 @@ public abstract class MountTable extends BaseRecord {
public abstract void setMode(FsPermission mode);
/**
+ * Get quota of this mount table entry.
+ *
+ * @return RouterQuotaUsage quota usage
+ */
+ public abstract RouterQuotaUsage getQuota();
+
+ /**
+ * Set quota for this mount table entry.
+ *
+ * @param quota QuotaUsage for mount table entry
+ */
+ public abstract void setQuota(RouterQuotaUsage quota);
+
+ /**
* Get the default location.
* @return The default location.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 372f209..d9e9555 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -27,10 +27,12 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.DestOrder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
@@ -250,6 +252,36 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
}
}
+  /**
+   * Get the quota stored in this mount table record.
+   *
+   * @return Quota usage rebuilt from the serialized record, or null if the
+   *         record carries no quota.
+   */
+  @Override
+  public RouterQuotaUsage getQuota() {
+    MountTableRecordProtoOrBuilder record =
+        this.translator.getProtoOrBuilder();
+    if (record.hasQuota()) {
+      QuotaUsageProto stored = record.getQuota();
+      return new RouterQuotaUsage.Builder()
+          .fileAndDirectoryCount(stored.getFileAndDirectoryCount())
+          .quota(stored.getQuota())
+          .spaceConsumed(stored.getSpaceConsumed())
+          .spaceQuota(stored.getSpaceQuota())
+          .build();
+    }
+    return null;
+  }
+
+  /**
+   * Store the quota in this mount table record.
+   *
+   * @param quota Quota usage to serialize; null clears the quota field of
+   *          the record.
+   */
+  @Override
+  public void setQuota(RouterQuotaUsage quota) {
+    Builder recordBuilder = this.translator.getBuilder();
+    if (quota == null) {
+      recordBuilder.clearQuota();
+      return;
+    }
+
+    // Copy the four quota values into the protobuf message.
+    recordBuilder.setQuota(QuotaUsageProto.newBuilder()
+        .setFileAndDirectoryCount(quota.getFileAndDirectoryCount())
+        .setQuota(quota.getQuota())
+        .setSpaceConsumed(quota.getSpaceConsumed())
+        .setSpaceQuota(quota.getSpaceQuota())
+        .build());
+  }
+
private DestinationOrder convert(DestOrder order) {
switch (order) {
case LOCAL:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
index 6d20e6c..5e708be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
@@ -49,7 +49,7 @@ public enum Quota {
* Is quota violated?
* The quota is violated if quota is set and usage > quota.
*/
- static boolean isViolated(final long quota, final long usage) {
+ public static boolean isViolated(final long quota, final long usage) {
return quota >= 0 && usage > quota;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index fd961f2..0681ed5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@@ -80,7 +82,9 @@ public class RouterAdmin extends Configured implements Tool {
+ "\t[-add <source> <nameservice> <destination> "
+ "[-readonly] -owner <owner> -group <group> -mode <mode>]\n"
+ "\t[-rm <source>]\n"
- + "\t[-ls <path>]\n";
+ + "\t[-ls <path>]\n"
+ // Option names must match the ones parsed by setQuota().
+ + "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota <ssQuota>]\n"
+ + "\t[-clrQuota <path>]\n";
System.out.println(usage);
}
@@ -109,6 +113,18 @@ public class RouterAdmin extends Configured implements Tool {
printUsage();
return exitCode;
}
+ } else if ("-setQuota".equalsIgnoreCase(cmd)) {
+ if (argv.length < 4) {
+ System.err.println("Not enough parameters specificed for cmd " + cmd);
+ printUsage();
+ return exitCode;
+ }
+ } else if ("-clrQuota".equalsIgnoreCase(cmd)) {
+ if (argv.length < 2) {
+ System.err.println("Not enough parameters specificed for cmd " + cmd);
+ printUsage();
+ return exitCode;
+ }
}
// Initialize RouterClient
@@ -144,6 +160,16 @@ public class RouterAdmin extends Configured implements Tool {
} else {
listMounts("/");
}
+ } else if ("-setQuota".equals(cmd)) {
+ if (setQuota(argv, i)) {
+ System.out.println(
+ "Successfully set quota for mount point " + argv[i]);
+ }
+ } else if ("-clrQuota".equals(cmd)) {
+ if (clrQuota(argv[i])) {
+ System.out.println(
+ "Successfully clear quota for mount point " + argv[i]);
+ }
} else {
printUsage();
return exitCode;
@@ -389,6 +415,111 @@ public class RouterAdmin extends Configured implements Tool {
}
/**
+   * Set quota for a mount table entry.
+   *
+   * @param parameters Command line parameters: the mount point at index i,
+   *          followed by -nsQuota and/or -ssQuota, each with a value.
+   * @param i Index of the mount point in the parameters.
+   * @return If the quota was set; false if an option value is missing,
+   *         cannot be parsed or is not positive.
+   * @throws IOException Error updating the mount table entry.
+   */
+  private boolean setQuota(String[] parameters, int i) throws IOException {
+    long nsQuota = HdfsConstants.QUOTA_DONT_SET;
+    long ssQuota = HdfsConstants.QUOTA_DONT_SET;
+
+    String mount = parameters[i++];
+    while (i < parameters.length) {
+      String option = parameters[i];
+      boolean isNsQuota = option.equals("-nsQuota");
+      if (isNsQuota || option.equals("-ssQuota")) {
+        i++;
+        // The option may be the last argument, with no value after it.
+        if (i >= parameters.length) {
+          System.err.println("Missing value for option " + option);
+          return false;
+        }
+
+        long value;
+        try {
+          value = Long.parseLong(parameters[i]);
+        } catch (NumberFormatException e) {
+          // Abort instead of continuing: the quota would stay at
+          // QUOTA_DONT_SET and updateQuota() would overwrite the quota
+          // currently stored in the mount table entry.
+          System.err.println(
+              "Cannot parse " + option.substring(1) + ": " + parameters[i]);
+          return false;
+        }
+
+        if (isNsQuota) {
+          nsQuota = value;
+        } else {
+          ssQuota = value;
+        }
+      }
+
+      i++;
+    }
+
+    if (nsQuota <= 0 || ssQuota <= 0) {
+      System.err.println("Input quota value should be a positive number.");
+      return false;
+    }
+
+    return updateQuota(mount, nsQuota, ssQuota);
+  }
+
+ /**
+ * Clear quota of the mount point.
+ *
+ * @param mount Mount table to clear
+ * @return If the quota was cleared.
+ * @throws IOException Error clearing the mount point.
+ */
+ private boolean clrQuota(String mount) throws IOException {
+ return updateQuota(mount, HdfsConstants.QUOTA_DONT_SET,
+ HdfsConstants.QUOTA_DONT_SET);
+ }
+
+  /**
+   * Update quota of specified mount table.
+   *
+   * @param mount Specified mount table to update.
+   * @param nsQuota Namespace quota.
+   * @param ssQuota Storage space quota.
+   * @return If the quota was updated; false if no mount table entry has
+   *         exactly this source path.
+   * @throws IOException Error updating quota.
+   */
+  private boolean updateQuota(String mount, long nsQuota, long ssQuota)
+      throws IOException {
+    // Get existing entry
+    MountTableManager mountTable = client.getMountTableManager();
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(mount);
+    GetMountTableEntriesResponse getResponse = mountTable
+        .getMountTableEntries(getRequest);
+    MountTable existingEntry = null;
+    for (MountTable result : getResponse.getEntries()) {
+      if (mount.equals(result.getSourcePath())) {
+        existingEntry = result;
+        break;
+      }
+    }
+
+    if (existingEntry == null) {
+      return false;
+    }
+
+    // Keep the current usage only for a quota that stays set. If nsQuota
+    // or ssQuota was unset, the corresponding usage value is reset to
+    // zero. getQuota() can return null for a record without quota, in
+    // which case both usage values start from zero.
+    RouterQuotaUsage currentQuota = existingEntry.getQuota();
+    long nsCount = 0;
+    long ssCount = 0;
+    if (currentQuota != null) {
+      if (nsQuota != HdfsConstants.QUOTA_DONT_SET) {
+        nsCount = currentQuota.getFileAndDirectoryCount();
+      }
+      if (ssQuota != HdfsConstants.QUOTA_DONT_SET) {
+        ssCount = currentQuota.getSpaceConsumed();
+      }
+    }
+
+    RouterQuotaUsage updatedQuota = new RouterQuotaUsage.Builder()
+        .fileAndDirectoryCount(nsCount).quota(nsQuota)
+        .spaceConsumed(ssCount).spaceQuota(ssQuota).build();
+    existingEntry.setQuota(updatedQuota);
+
+    UpdateMountTableEntryRequest updateRequest =
+        UpdateMountTableEntryRequest.newInstance(existingEntry);
+    UpdateMountTableEntryResponse updateResponse = mountTable
+        .updateMountTableEntry(updateRequest);
+    return updateResponse.getStatus();
+  }
+
+ /**
* Inner class that stores ACL info of mount table.
*/
static class ACLEntity {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
index 043a21a..5d9b9d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
@@ -22,6 +22,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
+import "hdfs.proto";
/////////////////////////////////////////////////
// Membership
@@ -134,6 +135,8 @@ message MountTableRecordProto {
optional string ownerName = 10;
optional string groupName = 11;
optional int32 mode = 12;
+
+ optional QuotaUsageProto quota = 13;
}
message AddMountTableEntryRequestProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 4ca7b58..cd365be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5126,4 +5126,24 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.quota.enable</name>
+ <value>false</value>
+ <description>
+ Set to true to enable quota system in Router.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.quota-cache.update.interval</name>
+ <value>60s</value>
+ <description>
+ Interval time for updating quota usage cache in Router.
+ This property is used only if the value of
+ dfs.federation.router.quota.enable is true.
+ This setting supports multiple time unit suffixes as described
+ in dfs.heartbeat.interval. If no suffix is specified then milliseconds
+ is assumed.
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 88da77e..3d8b35c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -34,6 +34,7 @@ public class RouterConfigBuilder {
private boolean enableLocalHeartbeat = false;
private boolean enableStateStore = false;
private boolean enableMetrics = false;
+ private boolean enableQuota = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@@ -89,6 +90,11 @@ public class RouterConfigBuilder {
return this;
}
+ /**
+ * Choose whether build() sets DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE to
+ * true or false.
+ *
+ * @param enable Value to write for the quota enable key.
+ * @return This builder.
+ */
+ public RouterConfigBuilder quota(boolean enable) {
+ this.enableQuota = enable;
+ return this;
+ }
+
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
@@ -113,6 +119,10 @@ public class RouterConfigBuilder {
return this.metrics(true);
}
+ /**
+ * Shortcut for quota(true).
+ *
+ * @return This builder.
+ */
+ public RouterConfigBuilder quota() {
+ return this.quota(true);
+ }
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
@@ -127,6 +137,8 @@ public class RouterConfigBuilder {
this.enableLocalHeartbeat);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
this.enableMetrics);
+ conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
+ this.enableQuota);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index 9e82967..ec47a41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
@@ -294,4 +295,57 @@ public class TestRouterAdminCLI {
argv = new String[] {"-rm", mount};
assertEquals(rmCommandCode, ToolRunner.run(admin, argv));
}
+
+ /**
+ * Add a mount point through the admin CLI, then check the quota stored in
+ * the mount table entry in three states: the default after -add, the
+ * values given to -setQuota, and the unset values after -clrQuota.
+ */
+ @Test
+ public void testSetAndClearQuota() throws Exception {
+ String nsId = "ns0";
+ String src = "/test-QuotaMounttable";
+ String dest = "/QuotaMounttable";
+ String[] argv = new String[] {"-add", src, nsId, dest};
+ assertEquals(0, ToolRunner.run(admin, argv));
+
+ // Reload the state store cache before reading the entry back
+ stateStore.loadCache(MountTableStoreImpl.class, true);
+ GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+ .newInstance(src);
+ GetMountTableEntriesResponse getResponse = client.getMountTableManager()
+ .getMountTableEntries(getRequest);
+ MountTable mountTable = getResponse.getEntries().get(0);
+ RouterQuotaUsage quotaUsage = mountTable.getQuota();
+
+ // verify the default quota set
+ assertEquals(0, quotaUsage.getFileAndDirectoryCount());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
+ assertEquals(0, quotaUsage.getSpaceConsumed());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
+
+ long nsQuota = 50;
+ long ssQuota = 100;
+ argv = new String[] {"-setQuota", src, "-nsQuota", String.valueOf(nsQuota),
+ "-ssQuota", String.valueOf(ssQuota)};
+ assertEquals(0, ToolRunner.run(admin, argv));
+
+ stateStore.loadCache(MountTableStoreImpl.class, true);
+ getResponse = client.getMountTableManager()
+ .getMountTableEntries(getRequest);
+ mountTable = getResponse.getEntries().get(0);
+ quotaUsage = mountTable.getQuota();
+
+ // verify if the quota is set
+ assertEquals(nsQuota, quotaUsage.getQuota());
+ assertEquals(ssQuota, quotaUsage.getSpaceQuota());
+
+ // test clrQuota command
+ argv = new String[] {"-clrQuota", src};
+ assertEquals(0, ToolRunner.run(admin, argv));
+
+ stateStore.loadCache(MountTableStoreImpl.class, true);
+ getResponse = client.getMountTableManager()
+ .getMountTableEntries(getRequest);
+ mountTable = getResponse.getEntries().get(0);
+ quotaUsage = mountTable.getQuota();
+
+ // verify if quota unset successfully
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
new file mode 100644
index 0000000..368b5e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests quota behaviors in Router-based Federation.
+ */
+public class TestRouterQuota {
+ private static StateStoreDFSCluster cluster;
+ private static NamenodeContext nnContext1;
+ private static NamenodeContext nnContext2;
+ private static RouterContext routerContext;
+ private static MountTableResolver resolver;
+
+ private static final int BLOCK_SIZE = 512;
+
+ /**
+ * Start a federated cluster whose Routers have the state store, admin,
+ * quota and RPC services enabled, and keep references to the first two
+ * namenodes, one Router and its mount table resolver for the tests.
+ */
+ @Before
+ public void setUp() throws Exception {
+
+ // Build and start a federated cluster
+ cluster = new StateStoreDFSCluster(false, 2);
+ Configuration routerConf = new RouterConfigBuilder()
+ .stateStore()
+ .admin()
+ .quota()
+ .rpc()
+ .build();
+ // Short quota cache update interval so the tests do not wait long
+ routerConf.set(DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "2s");
+
+ // override some hdfs settings that are used in testing space quota
+ Configuration hdfsConf = new Configuration(false);
+ hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1);
+
+ cluster.addRouterOverrides(routerConf);
+ cluster.addNamenodeOverrides(hdfsConf);
+ cluster.startCluster();
+ cluster.startRouters();
+ cluster.waitClusterUp();
+
+ nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
+ nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null);
+ routerContext = cluster.getRandomRouter();
+ Router router = routerContext.getRouter();
+ resolver = (MountTableResolver) router.getSubclusterResolver();
+ }
+
+ /**
+ * Stop the Router used by the test and shut down the cluster, if one was
+ * created.
+ */
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.stopRouter(routerContext);
+ cluster.shutdown();
+ // Drop the reference so a later tearDown does not shut it down again
+ cluster = null;
+ }
+ }
+
+ /**
+ * Set a namespace quota on two nested mount points and create directories
+ * through the Router until NSQuotaExceededException is thrown. Creating
+ * directories directly in the namenodes must still succeed afterwards.
+ */
+ @Test
+ public void testNamespaceQuotaExceed() throws Exception {
+ long nsQuota = 3;
+ final FileSystem nnFs1 = nnContext1.getFileSystem();
+ final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+ // Add two mount tables:
+ // /nsquota --> ns0---testdir1
+ // /nsquota/subdir --> ns1---testdir2
+ nnFs1.mkdirs(new Path("/testdir1"));
+ nnFs2.mkdirs(new Path("/testdir2"));
+ MountTable mountTable1 = MountTable.newInstance("/nsquota",
+ Collections.singletonMap("ns0", "/testdir1"));
+
+ mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+ addMountTable(mountTable1);
+
+ MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir",
+ Collections.singletonMap("ns1", "/testdir2"));
+ mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+ addMountTable(mountTable2);
+
+ final FileSystem routerFs = routerContext.getFileSystem();
+ // Retry until the Router rejects a mkdir; presumably needed because the
+ // Router only sees the usage after its quota cache is refreshed.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+ @Override
+ public Boolean get() {
+ boolean isNsQuotaViolated = false;
+ try {
+ // create new directory to trigger NSQuotaExceededException
+ routerFs.mkdirs(new Path("/nsquota/" + UUID.randomUUID()));
+ routerFs.mkdirs(new Path("/nsquota/subdir/" + UUID.randomUUID()));
+ } catch (NSQuotaExceededException e) {
+ isNsQuotaViolated = true;
+ } catch (IOException ignored) {
+ // Any other failure just means this attempt is retried
+ }
+ return isNsQuotaViolated;
+ }
+ }, 5000, 60000);
+ // mkdir in real FileSystem should be okay
+ nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID()));
+ nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID()));
+ }
+
+ /**
+ * Set a storage space quota on two nested mount points and append data
+ * through the Router until DSQuotaExceededException is thrown. Appending
+ * directly through the namenodes must still succeed afterwards.
+ */
+ @Test
+ public void testStorageSpaceQuotaaExceed() throws Exception {
+ long ssQuota = 3071;
+ final FileSystem nnFs1 = nnContext1.getFileSystem();
+ final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+ // Add two mount tables:
+ // /ssquota --> ns0---testdir3
+ // /ssquota/subdir --> ns1---testdir4
+ nnFs1.mkdirs(new Path("/testdir3"));
+ nnFs2.mkdirs(new Path("/testdir4"));
+ MountTable mountTable1 = MountTable.newInstance("/ssquota",
+ Collections.singletonMap("ns0", "/testdir3"));
+
+ mountTable1
+ .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+ addMountTable(mountTable1);
+
+ MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir",
+ Collections.singletonMap("ns1", "/testdir4"));
+ mountTable2
+ .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+ addMountTable(mountTable2);
+
+ // Create the empty files that the appends below write to
+ DFSClient routerClient = routerContext.getClient();
+ routerClient.create("/ssquota/file", true).close();
+ routerClient.create("/ssquota/subdir/file", true).close();
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+ @Override
+ public Boolean get() {
+ boolean isDsQuotaViolated = false;
+ try {
+ // append data to trigger DSQuotaExceededException
+ appendData("/ssquota/file", routerClient, BLOCK_SIZE);
+ appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
+ } catch (DSQuotaExceededException e) {
+ isDsQuotaViolated = true;
+ } catch (IOException ignored) {
+ // Any other failure just means this attempt is retried
+ }
+ return isDsQuotaViolated;
+ }
+ }, 5000, 60000);
+
+ // append data to destination path in real FileSystem should be okay
+ appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE);
+ appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE);
+ }
+
+  /**
+   * Register a mount table entry through the Router admin client and then
+   * force the mount table resolver to reload its cache.
+   * @param entry Mount table entry to add.
+   * @return Status reported by the add response.
+   * @throws IOException Problems adding entries.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    MountTableManager manager =
+        routerContext.getAdminClient().getMountTableManager();
+    AddMountTableEntryResponse response = manager.addMountTableEntry(
+        AddMountTableEntryRequest.newInstance(entry));
+
+    // Make the new entry visible to the Router right away
+    resolver.loadCache(true);
+
+    return response.getStatus();
+  }
+
+ /**
+ * Append data in specified file.
+ * @param path Path of file.
+ * @param client DFS Client.
+ * @param dataLen The length of write data.
+ * @throws IOException If opening the file for append, writing to it or
+ * closing it fails.
+ */
+ private void appendData(String path, DFSClient client, int dataLen)
+ throws IOException {
+ EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
+ // try-with-resources closes the stream even when write() throws (for
+ // example on a quota violation), so a failed append cannot leak the
+ // open output stream.
+ try (HdfsDataOutputStream stream = client.append(path, 1024, createFlag,
+ null, null)) {
+ stream.write(new byte[dataLen]);
+ }
+ }
+
+ @Test
+ public void testSetQuota() throws Exception {
+ final long nsQuota = 5;
+ final long ssQuota = 100;
+ final FileSystem ns0Fs = nnContext1.getFileSystem();
+ final FileSystem ns1Fs = nnContext2.getFileSystem();
+
+ // Mount layout used by this test:
+ // /setquota --> ns0---testdir5
+ // /setquota/subdir --> ns1---testdir6
+ ns0Fs.mkdirs(new Path("/testdir5"));
+ ns1Fs.mkdirs(new Path("/testdir6"));
+
+ // Only the parent mount point carries an explicit quota
+ MountTable parentEntry = MountTable.newInstance("/setquota",
+ Collections.singletonMap("ns0", "/testdir5"));
+ RouterQuotaUsage parentQuota = new RouterQuotaUsage.Builder()
+ .quota(nsQuota).spaceQuota(ssQuota).build();
+ parentEntry.setQuota(parentQuota);
+ addMountTable(parentEntry);
+
+ // The nested mount point is registered without any quota
+ MountTable childEntry = MountTable.newInstance("/setquota/subdir",
+ Collections.singletonMap("ns1", "/testdir6"));
+ addMountTable(childEntry);
+
+ // Run the update service by hand so the setQuota RPC call is invoked
+ RouterQuotaUpdateService quotaUpdater = routerContext.getRouter()
+ .getQuotaCacheUpdateService();
+ quotaUpdater.periodicInvoke();
+
+ // Read the quotas back directly from each Namenode
+ ClientProtocol ns0Protocol = nnContext1.getClient().getNamenode();
+ ClientProtocol ns1Protocol = nnContext2.getClient().getNamenode();
+ final QuotaUsage ns0Usage = ns0Protocol.getQuotaUsage("/testdir5");
+ final QuotaUsage ns1Usage = ns1Protocol.getQuotaUsage("/testdir6");
+
+ // Both destinations are expected to report the parent's quota values
+ assertEquals(nsQuota, ns0Usage.getQuota());
+ assertEquals(ssQuota, ns0Usage.getSpaceQuota());
+ assertEquals(nsQuota, ns1Usage.getQuota());
+ assertEquals(ssQuota, ns1Usage.getSpaceQuota());
+ }
+
+ @Test
+ public void testGetQuota() throws Exception {
+ final long nsQuota = 10;
+ final long ssQuota = 100;
+ final FileSystem ns0Fs = nnContext1.getFileSystem();
+ final FileSystem ns1Fs = nnContext2.getFileSystem();
+
+ // Three mount points are registered:
+ // /getquota --> ns0---/testdir7
+ // /getquota/subdir1 --> ns0---/testdir7/subdir
+ // /getquota/subdir2 --> ns1---/testdir8
+ ns0Fs.mkdirs(new Path("/testdir7"));
+ ns0Fs.mkdirs(new Path("/testdir7/subdir"));
+ ns1Fs.mkdirs(new Path("/testdir8"));
+
+ // Only the top-level mount point gets an explicit quota
+ MountTable rootEntry = MountTable.newInstance("/getquota",
+ Collections.singletonMap("ns0", "/testdir7"));
+ RouterQuotaUsage rootQuota = new RouterQuotaUsage.Builder()
+ .quota(nsQuota).spaceQuota(ssQuota).build();
+ rootEntry.setQuota(rootQuota);
+ addMountTable(rootEntry);
+
+ MountTable firstChild = MountTable.newInstance("/getquota/subdir1",
+ Collections.singletonMap("ns0", "/testdir7/subdir"));
+ addMountTable(firstChild);
+
+ MountTable secondChild = MountTable.newInstance("/getquota/subdir2",
+ Collections.singletonMap("ns1", "/testdir8"));
+ addMountTable(secondChild);
+
+ // Create one file under each mount point through the Router
+ DFSClient routerClient = routerContext.getClient();
+ routerClient.create("/getquota/file", true).close();
+ routerClient.create("/getquota/subdir1/file", true).close();
+ routerClient.create("/getquota/subdir2/file", true).close();
+
+ // Refresh the quota cache, then query the usage through the Router
+ ClientProtocol routerProtocol = routerContext.getClient().getNamenode();
+ RouterQuotaUpdateService quotaUpdater = routerContext.getRouter()
+ .getQuotaCacheUpdateService();
+ quotaUpdater.periodicInvoke();
+ final QuotaUsage aggregated = routerProtocol.getQuotaUsage("/getquota");
+
+ // the usage reported for the mount point should be aggregated
+ assertEquals(6, aggregated.getFileAndDirectoryCount());
+ }
+
+ @Test
+ public void testStaleQuotaRemoving() throws Exception {
+ final long nsQuota = 20;
+ final long ssQuota = 200;
+ final String stalePath = "/stalequota";
+ final FileSystem ns0Fs = nnContext1.getFileSystem();
+
+ // Add one mount table entry:
+ // /stalequota --> ns0---/testdir9
+ ns0Fs.mkdirs(new Path("/testdir9"));
+ MountTable staleEntry = MountTable.newInstance(stalePath,
+ Collections.singletonMap("ns0", "/testdir9"));
+ RouterQuotaUsage configured = new RouterQuotaUsage.Builder()
+ .quota(nsQuota).spaceQuota(ssQuota).build();
+ staleEntry.setQuota(configured);
+ addMountTable(staleEntry);
+
+ // Run the update service so that the quota for stalePath is
+ // loaded into the quota manager.
+ RouterQuotaUpdateService quotaUpdater = routerContext.getRouter()
+ .getQuotaCacheUpdateService();
+ quotaUpdater.periodicInvoke();
+
+ // The quota manager should now report the configured limits
+ RouterQuotaManager quotaManager = routerContext.getRouter()
+ .getQuotaManager();
+ RouterQuotaUsage cached = quotaManager.getQuotaUsage(stalePath);
+ assertEquals(nsQuota, cached.getQuota());
+ assertEquals(ssQuota, cached.getSpaceQuota());
+
+ // Drop the mount table entry and refresh the quota cache again
+ removeMountTable(stalePath);
+ quotaUpdater.periodicInvoke();
+
+ // the stale entry should be removed and we will get null
+ cached = quotaManager.getQuotaUsage(stalePath);
+ assertNull(cached);
+ }
+
+ /**
+ * Removes a mount table entry from the mount table through the admin API
+ * and then reloads the mount table resolver cache.
+ * @param path Source path of the mount table entry to remove.
+ * @return Status reported by the remove response.
+ * @throws IOException Problems removing entries.
+ */
+ private boolean removeMountTable(String path) throws IOException {
+ MountTableManager manager =
+ routerContext.getAdminClient().getMountTableManager();
+ RemoveMountTableEntryResponse response = manager.removeMountTableEntry(
+ RemoveMountTableEntryRequest.newInstance(path));
+
+ // Refresh the resolver cache before reporting the outcome
+ resolver.loadCache(true);
+ return response.getStatus();
+ }
+
+ @Test
+ public void testQuotaUpdating() throws Exception {
+ long nsQuota = 30;
+ long ssQuota = 1024;
+ String path = "/updatequota";
+ final FileSystem nnFs1 = nnContext1.getFileSystem();
+
+ // Add one mount table:
+ // /updatequota --> ns0---/testdir10
+ nnFs1.mkdirs(new Path("/testdir10"));
+ MountTable mountTable = MountTable.newInstance(path,
+ Collections.singletonMap("ns0", "/testdir10"));
+ mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+ .spaceQuota(ssQuota).build());
+ addMountTable(mountTable);
+
+ // Call periodicInvoke to ensure quota updated in quota manager
+ // and state store.
+ RouterQuotaUpdateService updateService = routerContext.getRouter()
+ .getQuotaCacheUpdateService();
+ updateService.periodicInvoke();
+
+ // verify initial quota value; fail with a clear message instead of a
+ // NullPointerException when no entry is returned for the path
+ List<MountTable> results = getMountTable(path);
+ org.junit.Assert.assertFalse(
+ "No mount table entry found for " + path, results.isEmpty());
+ MountTable updatedMountTable = results.get(0);
+ RouterQuotaUsage quota = updatedMountTable.getQuota();
+ assertEquals(nsQuota, quota.getQuota());
+ assertEquals(ssQuota, quota.getSpaceQuota());
+ assertEquals(1, quota.getFileAndDirectoryCount());
+ assertEquals(0, quota.getSpaceConsumed());
+
+ // mkdir and write a new file; the explicit separator keeps the new
+ // directory underneath the mount point rather than creating a sibling
+ // path such as /updatequota<uuid>
+ final FileSystem routerFs = routerContext.getFileSystem();
+ routerFs.mkdirs(new Path(path + "/" + UUID.randomUUID()));
+ DFSClient routerClient = routerContext.getClient();
+ routerClient.create(path + "/file", true).close();
+ appendData(path + "/file", routerClient, BLOCK_SIZE);
+
+ updateService.periodicInvoke();
+ results = getMountTable(path);
+ org.junit.Assert.assertFalse(
+ "No mount table entry found for " + path, results.isEmpty());
+ updatedMountTable = results.get(0);
+ quota = updatedMountTable.getQuota();
+
+ // verify if quota has been updated in state store
+ assertEquals(nsQuota, quota.getQuota());
+ assertEquals(ssQuota, quota.getSpaceQuota());
+ assertEquals(3, quota.getFileAndDirectoryCount());
+ assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
+ }
+
+ /**
+ * Looks up mount table entries for the given path through the admin API,
+ * reloading the mount table resolver cache first.
+ * @param path Source path to query the mount table with.
+ * @return Entries contained in the admin API response.
+ * @throws IOException Problems getting entries.
+ */
+ private List<MountTable> getMountTable(String path) throws IOException {
+ // Refresh the resolver cache before querying
+ resolver.loadCache(true);
+ MountTableManager manager =
+ routerContext.getAdminClient().getMountTableManager();
+ GetMountTableEntriesResponse response = manager.getMountTableEntries(
+ GetMountTableEntriesRequest.newInstance(path));
+
+ return response.getEntries();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
new file mode 100644
index 0000000..346c881
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for class {@link RouterQuotaManager}.
+ */
+public class TestRouterQuotaManager {
+ // Instance field rather than static: setup() replaces the manager before
+ // every test, so sharing it across test instances serves no purpose and
+ // writing a static field from an instance method is error-prone.
+ private RouterQuotaManager manager;
+
+ /** Creates a fresh quota manager for each test. */
+ @Before
+ public void setup() {
+ manager = new RouterQuotaManager();
+ }
+
+ /** Empties the quota manager after each test. */
+ @After
+ public void cleanup() {
+ manager.clear();
+ }
+
+ /**
+ * Verifies that getPaths returns the queried path together with the paths
+ * stored underneath it, and leaves out unrelated paths.
+ */
+ @Test
+ public void testGetChildrenPaths() {
+ RouterQuotaUsage quotaUsage = new RouterQuotaUsage.Builder().build();
+ manager.put("/path1", quotaUsage);
+ manager.put("/path2", quotaUsage);
+ manager.put("/path1/subdir", quotaUsage);
+ manager.put("/path1/subdir/subdir", quotaUsage);
+
+ Set<String> childrenPaths = manager.getPaths("/path1");
+ assertEquals(3, childrenPaths.size());
+ // One assertion per path so a failure points at the missing entry
+ assertTrue(childrenPaths.contains("/path1/subdir"));
+ assertTrue(childrenPaths.contains("/path1/subdir/subdir"));
+ assertTrue(childrenPaths.contains("/path1"));
+ }
+
+ /**
+ * Verifies getQuotaUsage for unknown paths, for paths without a quota and
+ * for paths that fall back to the nearest ancestor with a quota set.
+ */
+ @Test
+ public void testGetQuotaUsage() {
+ RouterQuotaUsage quotaGet;
+
+ // test case1: get quota with a non-existent path
+ quotaGet = manager.getQuotaUsage("/non-exist-path");
+ assertNull(quotaGet);
+
+ // test case2: get quota from a path with no quota set
+ RouterQuotaUsage.Builder quota = new RouterQuotaUsage.Builder()
+ .quota(HdfsConstants.QUOTA_DONT_SET)
+ .spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+ manager.put("/noQuotaSet", quota.build());
+ quotaGet = manager.getQuotaUsage("/noQuotaSet");
+ // it should return null
+ assertNull(quotaGet);
+
+ // test case3: get quota from a path with a quota set
+ quota.quota(1);
+ quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+ manager.put("/hasQuotaSet", quota.build());
+ quotaGet = manager.getQuotaUsage("/hasQuotaSet");
+ assertEquals(1, quotaGet.getQuota());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+ // test case4: get quota with a non-existent child path
+ quotaGet = manager.getQuotaUsage("/hasQuotaSet/file");
+ // it will return the nearest ancestor which has a quota set
+ assertEquals(1, quotaGet.getQuota());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+ // test case5: get quota with a child path whose parent has no
+ // quota set
+ quota.quota(HdfsConstants.QUOTA_DONT_SET);
+ quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+ manager.put("/hasQuotaSet/noQuotaSet", quota.build());
+ // this should return the quota of path /hasQuotaSet
+ // (the nearest ancestor which has a quota set)
+ quotaGet = manager.getQuotaUsage("/hasQuotaSet/noQuotaSet/file");
+ assertEquals(1, quotaGet.getQuota());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+ // test case6: get quota with a child path whose parent has a quota set
+ quota.quota(2);
+ quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+ manager.put("/hasQuotaSet/hasQuotaSet", quota.build());
+ // this should return the quota of path /hasQuotaSet/hasQuotaSet
+ quotaGet = manager.getQuotaUsage("/hasQuotaSet/hasQuotaSet/file");
+ assertEquals(2, quotaGet.getQuota());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
index 739d2e4..d306f77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
@@ -27,8 +27,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.junit.Test;
@@ -56,6 +58,14 @@ public class TestMountTable {
private static final long DATE_CREATED = 100;
private static final long DATE_MOD = 200;
+ private static final long NS_COUNT = 1; // fixture file and directory count
+ private static final long NS_QUOTA = 5; // fixture quota value
+ private static final long SS_COUNT = 10; // fixture space consumed
+ private static final long SS_QUOTA = 100; // fixture space quota
+
+ private static final RouterQuotaUsage QUOTA = new RouterQuotaUsage.Builder()
+ .fileAndDirectoryCount(NS_COUNT).quota(NS_QUOTA).spaceConsumed(SS_COUNT)
+ .spaceQuota(SS_QUOTA).build();
@Test
public void testGetterSetter() throws IOException {
@@ -68,6 +78,12 @@ public class TestMountTable {
assertTrue(DATE_CREATED > 0);
assertTrue(DATE_MOD > 0);
+ RouterQuotaUsage quota = record.getQuota();
+ assertEquals(0, quota.getFileAndDirectoryCount());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getQuota());
+ assertEquals(0, quota.getSpaceConsumed());
+ assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getSpaceQuota());
+
MountTable record2 =
MountTable.newInstance(SRC, DST_MAP, DATE_CREATED, DATE_MOD);
@@ -94,6 +110,7 @@ public class TestMountTable {
SRC, DST_MAP, DATE_CREATED, DATE_MOD);
record.setReadOnly(true);
record.setDestOrder(order);
+ record.setQuota(QUOTA);
StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
String serializedString = serializer.serializeString(record);
@@ -107,6 +124,12 @@ public class TestMountTable {
assertEquals(DATE_MOD, record2.getDateModified());
assertTrue(record2.isReadOnly());
assertEquals(order, record2.getDestOrder());
+
+ RouterQuotaUsage quotaGet = record2.getQuota();
+ assertEquals(NS_COUNT, quotaGet.getFileAndDirectoryCount());
+ assertEquals(NS_QUOTA, quotaGet.getQuota());
+ assertEquals(SS_COUNT, quotaGet.getSpaceConsumed());
+ assertEquals(SS_QUOTA, quotaGet.getSpaceQuota());
}
@Test
@@ -172,4 +195,22 @@ public class TestMountTable {
assertEquals(DST_NS_1, location2.getNameserviceId());
assertEquals(DST_PATH_1, location2.getDest());
}
+
+ @Test
+ public void testQuota() throws IOException {
+ // Build a record from the shared source/destination fixtures and attach
+ // the shared quota fixture to it
+ final MountTable entry = MountTable.newInstance(SRC, DST_MAP);
+ entry.setQuota(QUOTA);
+
+ // The basic record fields are checked alongside the quota
+ validateDestinations(entry);
+ assertEquals(SRC, entry.getSourcePath());
+ assertEquals(DST, entry.getDestinations());
+ assertTrue(DATE_CREATED > 0);
+ assertTrue(DATE_MOD > 0);
+
+ // The getter must hand back every value stored in the quota fixture
+ final RouterQuotaUsage stored = entry.getQuota();
+ assertEquals(NS_COUNT, stored.getFileAndDirectoryCount());
+ assertEquals(NS_QUOTA, stored.getQuota());
+ assertEquals(SS_COUNT, stored.getSpaceConsumed());
+ assertEquals(SS_QUOTA, stored.getSpaceQuota());
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org