Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2018/01/18 23:47:35 UTC

[04/49] hadoop git commit: HDFS-12934. RBF: Federation supports global quota. Contributed by Yiqun Lin.

HDFS-12934. RBF: Federation supports global quota. Contributed by Yiqun Lin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d98a2e6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d98a2e6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d98a2e6e

Branch: refs/heads/YARN-7402
Commit: d98a2e6e2383f8b66def346409b0517aa32d298d
Parents: d9006d8
Author: Yiqun Lin <yq...@apache.org>
Authored: Wed Jan 10 13:59:11 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Wed Jan 10 13:59:11 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   9 +
 .../hdfs/server/federation/router/Quota.java    | 208 +++++++++
 .../hdfs/server/federation/router/Router.java   |  38 +-
 .../federation/router/RouterQuotaManager.java   | 160 +++++++
 .../router/RouterQuotaUpdateService.java        | 228 ++++++++++
 .../federation/router/RouterQuotaUsage.java     |  88 ++++
 .../federation/router/RouterRpcServer.java      |  40 +-
 .../federation/store/records/MountTable.java    |  22 +
 .../store/records/impl/pb/MountTablePBImpl.java |  32 ++
 .../hadoop/hdfs/server/namenode/Quota.java      |   2 +-
 .../hdfs/tools/federation/RouterAdmin.java      | 133 +++++-
 .../src/main/proto/FederationProtocol.proto     |   3 +
 .../src/main/resources/hdfs-default.xml         |  20 +
 .../server/federation/RouterConfigBuilder.java  |  12 +
 .../federation/router/TestRouterAdminCLI.java   |  54 +++
 .../federation/router/TestRouterQuota.java      | 452 +++++++++++++++++++
 .../router/TestRouterQuotaManager.java          | 113 +++++
 .../store/records/TestMountTable.java           |  41 ++
 18 files changed, 1639 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a380833..2825cc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1322,6 +1322,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_ROUTER_HTTPS_ADDRESS_DEFAULT =
       "0.0.0.0:" + DFS_ROUTER_HTTPS_PORT_DEFAULT;
 
+  // HDFS Router-based federation quota
+  public static final String DFS_ROUTER_QUOTA_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "quota.enable";
+  public static final boolean DFS_ROUTER_QUOTA_ENABLED_DEFAULT = false;
+  public static final String DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL =
+      FEDERATION_ROUTER_PREFIX + "quota-cache.update.interval";
+  public static final long DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT =
+      60000;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
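
For illustration, a minimal sketch of how a deployment could enable the new quota system through these keys. The helper class below is not part of this patch; the values mirror the defaults added to hdfs-default.xml, and the interval key is read with getTimeDuration, so time-unit suffixes are accepted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

/** Hypothetical helper, for illustration only; not part of this patch. */
public class RouterQuotaConfigSketch {
  public static Configuration enableRouterQuota() {
    Configuration conf = new Configuration();
    // Turn on the Router-based federation quota system (default: false).
    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE, true);
    // Refresh the quota usage cache every 60 seconds.
    conf.set(DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "60s");
    return conf;
  }
}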

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
new file mode 100644
index 0000000..f5e5272
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Module that implements the quota relevant RPC calls
+ * {@link ClientProtocol#setQuota(String, long, long, StorageType)}
+ * and
+ * {@link ClientProtocol#getQuotaUsage(String)}
+ * in the {@link RouterRpcServer}.
+ */
+public class Quota {
+  private static final Logger LOG = LoggerFactory.getLogger(Quota.class);
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Router used in RouterRpcServer. */
+  private final Router router;
+
+  public Quota(Router router, RouterRpcServer server) {
+    this.router = router;
+    this.rpcServer = server;
+    this.rpcClient = server.getRPCClient();
+  }
+
+  /**
+   * Set quota for the federation path.
+   * @param path Federation path.
+   * @param namespaceQuota Name space quota.
+   * @param storagespaceQuota Storage space quota.
+   * @param type StorageType that the space quota is intended to be set on.
+   * @throws IOException
+   */
+  public void setQuota(String path, long namespaceQuota,
+      long storagespaceQuota, StorageType type) throws IOException {
+    rpcServer.checkOperation(OperationCategory.WRITE);
+
+    // Set quota for the current path and its child mount table paths.
+    final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
+    if (LOG.isDebugEnabled()) {
+      for (RemoteLocation loc : locations) {
+        LOG.debug("Set quota for path: nsId: {}, dest: {}.",
+            loc.getNameserviceId(), loc.getDest());
+      }
+    }
+
+    RemoteMethod method = new RemoteMethod("setQuota",
+        new Class<?>[] {String.class, long.class, long.class,
+            StorageType.class},
+        new RemoteParam(), namespaceQuota, storagespaceQuota, type);
+    rpcClient.invokeConcurrent(locations, method, false, false);
+  }
+
+  /**
+   * Get quota usage for the federation path.
+   * @param path Federation path.
+   * @return Aggregated quota.
+   * @throws IOException
+   */
+  public QuotaUsage getQuotaUsage(String path) throws IOException {
+    final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path);
+    RemoteMethod method = new RemoteMethod("getQuotaUsage",
+        new Class<?>[] {String.class}, new RemoteParam());
+    Map<RemoteLocation, Object> results = rpcClient.invokeConcurrent(quotaLocs,
+        method, true, false);
+
+    return aggregateQuota(results);
+  }
+
+  /**
+   * Get valid quota remote locations used in {@link #getQuotaUsage(String)}.
+   * Unlike {@link #getQuotaRemoteLocations(String)}, this method
+   * does some additional filtering.
+   * @param path Federation path.
+   * @return List of valid quota remote locations.
+   * @throws IOException
+   */
+  private List<RemoteLocation> getValidQuotaLocations(String path)
+      throws IOException {
+    final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
+
+    // NameService -> Locations
+    Map<String, List<RemoteLocation>> validLocations = new HashMap<>();
+    for (RemoteLocation loc : locations) {
+      String nsId = loc.getNameserviceId();
+      List<RemoteLocation> dests = validLocations.get(nsId);
+      if (dests == null) {
+        dests = new LinkedList<>();
+        dests.add(loc);
+        validLocations.put(nsId, dests);
+      } else {
+        // Ensure the paths in the same nameservice are different.
+        // Don't include parent-child paths.
+        boolean isChildPath = false;
+        for (RemoteLocation d : dests) {
+          if (loc.getDest().startsWith(d.getDest())) {
+            isChildPath = true;
+            break;
+          }
+        }
+
+        if (!isChildPath) {
+          dests.add(loc);
+        }
+      }
+    }
+
+    List<RemoteLocation> quotaLocs = new LinkedList<>();
+    for (List<RemoteLocation> locs : validLocations.values()) {
+      quotaLocs.addAll(locs);
+    }
+
+    return quotaLocs;
+  }
+
+  /**
+   * Aggregate the quota usage queried from sub-clusters.
+   * @param results Quota query result.
+   * @return Aggregated Quota.
+   */
+  private QuotaUsage aggregateQuota(Map<RemoteLocation, Object> results) {
+    long nsCount = 0;
+    long ssCount = 0;
+    boolean hasQuotaUnSet = false;
+
+    for (Map.Entry<RemoteLocation, Object> entry : results.entrySet()) {
+      RemoteLocation loc = entry.getKey();
+      QuotaUsage usage = (QuotaUsage) entry.getValue();
+      if (usage != null) {
+        // If quota is not set in the real FileSystem, the returned
+        // quota value will be -1.
+        if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) {
+          hasQuotaUnSet = true;
+        }
+
+        nsCount += usage.getFileAndDirectoryCount();
+        ssCount += usage.getSpaceConsumed();
+        LOG.debug(
+            "Get quota usage for path: nsId: {}, dest: {},"
+                + " nsCount: {}, ssCount: {}.",
+            loc.getNameserviceId(), loc.getDest(),
+            usage.getFileAndDirectoryCount(), usage.getSpaceConsumed());
+      }
+    }
+
+    QuotaUsage.Builder builder = new QuotaUsage.Builder()
+        .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount);
+    if (hasQuotaUnSet) {
+      builder.quota(HdfsConstants.QUOTA_DONT_SET);
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Get all quota remote locations across subclusters under given
+   * federation path.
+   * @param path Federation path.
+   * @return List of quota remote locations.
+   * @throws IOException
+   */
+  private List<RemoteLocation> getQuotaRemoteLocations(String path)
+      throws IOException {
+    List<RemoteLocation> locations = new LinkedList<>();
+    RouterQuotaManager manager = this.router.getQuotaManager();
+    if (manager != null) {
+      Set<String> childrenPaths = manager.getPaths(path);
+      for (String childPath : childrenPaths) {
+        locations.addAll(rpcServer.getLocationsForPath(childPath, true));
+      }
+    }
+
+    return locations;
+  }
+}
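
From the client side, the effect of this module is that the standard quota APIs work against a Router endpoint: setQuota fans out to every subcluster destination under the mount point, and getQuotaUsage aggregates the per-subcluster results. A minimal illustrative sketch (not part of this patch), assuming routerFs is a DistributedFileSystem opened against a Router and /mount-point is a hypothetical mount entry:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DistributedFileSystem;

/** Illustration only; not part of this patch. */
public class RouterQuotaClientSketch {
  static QuotaUsage setAndQueryQuota(DistributedFileSystem routerFs)
      throws IOException {
    Path mountPoint = new Path("/mount-point"); // hypothetical mount point
    // Served by Quota#setQuota: applied on each destination of the mount.
    routerFs.setQuota(mountPoint, 100, 1024);
    // Served by Quota#getQuotaUsage: aggregated across subclusters.
    return routerFs.getQuotaUsage(mountPoint);
  }
}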

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 413566e..ea8a1c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.util.JvmPauseMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Router that provides a unified view of multiple federated HDFS clusters. It
  * has two main roles: (1) federated interface and (2) NameNode heartbeat.
@@ -105,7 +107,10 @@ public class Router extends CompositeService {
   /** JVM pauses (GC and others). */
   private JvmPauseMonitor pauseMonitor;
 
-
+  /** Quota usage update service. */
+  private RouterQuotaUpdateService quotaUpdateService;
+  /** Quota cache manager. */
+  private RouterQuotaManager quotaManager;
 
   /////////////////////////////////////////////////////////
   // Constructor
@@ -200,6 +205,14 @@ public class Router extends CompositeService {
       this.pauseMonitor.init(conf);
     }
 
+    // Initialize the quota-related services
+    if (conf.getBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
+        DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLED_DEFAULT)) {
+      this.quotaManager = new RouterQuotaManager();
+      this.quotaUpdateService = new RouterQuotaUpdateService(this);
+      addService(this.quotaUpdateService);
+    }
+
     super.serviceInit(conf);
   }
 
@@ -524,4 +537,27 @@ public class Router extends CompositeService {
       this.namenodeResolver.setRouterId(this.routerId);
     }
   }
+
+  /**
+   * Whether the quota system is enabled in the Router.
+   */
+  public boolean isQuotaEnabled() {
+    return this.quotaManager != null;
+  }
+
+  /**
+   * Get the Router quota manager.
+   * @return RouterQuotaManager Quota manager.
+   */
+  public RouterQuotaManager getQuotaManager() {
+    return this.quotaManager;
+  }
+
+  /**
+   * Get quota cache update service.
+   */
+  @VisibleForTesting
+  RouterQuotaUpdateService getQuotaCacheUpdateService() {
+    return this.quotaUpdateService;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
new file mode 100644
index 0000000..fc3575c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+/**
+ * Quota manager for the Router. The manager maintains a
+ * {@link RouterQuotaUsage} cache of the mount table entries and manages
+ * the quota caches.
+ */
+public class RouterQuotaManager {
+  /** Quota usage <MountTable Path, Aggregated QuotaUsage> cache. */
+  private TreeMap<String, RouterQuotaUsage> cache;
+
+  /** Lock to access the quota cache. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  public RouterQuotaManager() {
+    this.cache = new TreeMap<>();
+  }
+
+  /**
+   * Get all the mount quota paths.
+   */
+  public Set<String> getAll() {
+    readLock.lock();
+    try {
+      return this.cache.keySet();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the quota usage of the nearest ancestor whose quota is set.
+   * @param path The path being written.
+   * @return RouterQuotaUsage Quota usage.
+   */
+  public RouterQuotaUsage getQuotaUsage(String path) {
+    readLock.lock();
+    try {
+      RouterQuotaUsage quotaUsage = this.cache.get(path);
+      if (quotaUsage != null && isQuotaSet(quotaUsage)) {
+        return quotaUsage;
+      }
+
+      // If not found, look for its parent path usage value.
+      int pos = path.lastIndexOf(Path.SEPARATOR);
+      if (pos != -1) {
+        String parentPath = path.substring(0, pos);
+        return getQuotaUsage(parentPath);
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    return null;
+  }
+
+  /**
+   * Get children paths (possibly including itself) under the specified federation path.
+   * @param parentPath Parent federation path.
+   * @return Set<String> Children path set.
+   */
+  public Set<String> getPaths(String parentPath) {
+    readLock.lock();
+    try {
+      String from = parentPath;
+      String to = parentPath + Character.MAX_VALUE;
+      SortedMap<String, RouterQuotaUsage> subMap = this.cache.subMap(from, to);
+      return subMap.keySet();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Put a new entry into the cache.
+   * @param path Mount table path.
+   * @param quotaUsage Corresponding cache value.
+   */
+  public void put(String path, RouterQuotaUsage quotaUsage) {
+    writeLock.lock();
+    try {
+      this.cache.put(path, quotaUsage);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Remove the entry from the cache.
+   * @param path Mount table path.
+   */
+  public void remove(String path) {
+    writeLock.lock();
+    try {
+      this.cache.remove(path);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Clean up the cache.
+   */
+  public void clear() {
+    writeLock.lock();
+    try {
+      this.cache.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Check if the quota was set.
+   * @param quota RouterQuotaUsage set in mount table.
+   */
+  public boolean isQuotaSet(RouterQuotaUsage quota) {
+    if (quota != null) {
+      long nsQuota = quota.getQuota();
+      long ssQuota = quota.getSpaceQuota();
+
+      // once nsQuota or ssQuota is set, this mount table has a quota set
+      if (nsQuota != HdfsConstants.QUOTA_DONT_SET
+          || ssQuota != HdfsConstants.QUOTA_DONT_SET) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+}
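
As an illustration of the cache semantics above (not part of this patch, paths are made up): getPaths returns the mount path itself plus its children via a TreeMap range query, while getQuotaUsage walks up to the nearest ancestor whose quota is actually set.

import java.util.Set;

import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaManager;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;

/** Illustration only; not part of this patch. */
public class RouterQuotaManagerSketch {
  public static void main(String[] args) {
    RouterQuotaManager manager = new RouterQuotaManager();

    // "/dir-1" has quotas set; "/dir-1/subdir" does not (QUOTA_DONT_SET).
    manager.put("/dir-1", new RouterQuotaUsage.Builder()
        .quota(100).spaceQuota(1024).build());
    manager.put("/dir-1/subdir", new RouterQuotaUsage.Builder()
        .quota(HdfsConstants.QUOTA_DONT_SET)
        .spaceQuota(HdfsConstants.QUOTA_DONT_SET).build());

    // Contains the mount path itself plus its children:
    // {"/dir-1", "/dir-1/subdir"}.
    Set<String> paths = manager.getPaths("/dir-1");
    System.out.println(paths);

    // Falls back to the nearest ancestor whose quota is set, so the
    // "/dir-1" entry is returned for the child entry below it.
    RouterQuotaUsage usage = manager.getQuotaUsage("/dir-1/subdir");
    System.out.println(usage.getQuota() + " / " + usage.getSpaceQuota());
  }
}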

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
new file mode 100644
index 0000000..80abc11
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to periodically update the {@link RouterQuotaUsage}
+ * cached information in the {@link Router} and update the corresponding
+ * mount table entries in the State Store.
+ */
+public class RouterQuotaUpdateService extends PeriodicService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterQuotaUpdateService.class);
+
+  private MountTableStore mountTableStore;
+  private RouterRpcServer rpcServer;
+  /** Router using this Service. */
+  private final Router router;
+  /** Router Quota manager. */
+  private RouterQuotaManager quotaManager;
+
+  public RouterQuotaUpdateService(final Router router) throws IOException {
+    super(RouterQuotaUpdateService.class.getName());
+    this.router = router;
+    this.rpcServer = router.getRpcServer();
+    this.quotaManager = router.getQuotaManager();
+
+    if (this.quotaManager == null) {
+      throw new IOException("Router quota manager is not initialized.");
+    }
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.setIntervalMs(conf.getTimeDuration(
+        DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL,
+        DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS));
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void periodicInvoke() {
+    LOG.debug("Start to update quota cache.");
+    try {
+      List<MountTable> updateMountTables = new LinkedList<>();
+      List<MountTable> mountTables = getQuotaSetMountTables();
+      for (MountTable entry : mountTables) {
+        String src = entry.getSourcePath();
+        RouterQuotaUsage oldQuota = entry.getQuota();
+        long nsQuota = oldQuota.getQuota();
+        long ssQuota = oldQuota.getSpaceQuota();
+        // Call RouterRpcServer#getQuotaUsage to get the current quota usage.
+        QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule()
+            .getQuotaUsage(src);
+        // If quota is not set in some subclusters under federation path,
+        // set quota for this path.
+        if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) {
+          this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
+        }
+
+        RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
+            currentQuotaUsage);
+        this.quotaManager.put(src, newQuota);
+        entry.setQuota(newQuota);
+
+        // only update mount table entries whose quota has changed
+        if (!oldQuota.equals(newQuota)) {
+          updateMountTables.add(entry);
+
+          LOG.debug(
+              "Update quota usage entity of path: {}, nsCount: {},"
+                  + " nsQuota: {}, ssCount: {}, ssQuota: {}.",
+              src, newQuota.getFileAndDirectoryCount(),
+              newQuota.getQuota(), newQuota.getSpaceConsumed(),
+              newQuota.getSpaceQuota());
+        }
+      }
+
+      updateMountTableEntries(updateMountTables);
+    } catch (IOException e) {
+      LOG.error("Quota cache updated error.", e);
+    }
+  }
+
+  /**
+   * Get mount table store management interface.
+   * @return MountTableStore instance.
+   * @throws IOException
+   */
+  private MountTableStore getMountTableStore() throws IOException {
+    if (this.mountTableStore == null) {
+      this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
+          MountTableStore.class);
+      if (this.mountTableStore == null) {
+        throw new IOException("Mount table state store is not available.");
+      }
+    }
+    return this.mountTableStore;
+  }
+
+  /**
+   * Get all the existing mount tables.
+   * @return List of mount tables.
+   * @throws IOException
+   */
+  private List<MountTable> getMountTableEntries() throws IOException {
+    // scan mount tables from root path
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance("/");
+    GetMountTableEntriesResponse getResponse = getMountTableStore()
+        .getMountTableEntries(getRequest);
+    return getResponse.getEntries();
+  }
+
+  /**
+   * Get the mount table entries whose quota is set.
+   * During this time, the quota usage cache will also be updated by the
+   * quota manager:
+   * 1. Stale paths (entries) will be removed.
+   * 2. Existing entries will be overridden and updated.
+   * @return List of mount table entries whose quota is set.
+   * @throws IOException
+   */
+  private List<MountTable> getQuotaSetMountTables() throws IOException {
+    List<MountTable> mountTables = getMountTableEntries();
+    Set<String> stalePaths = new HashSet<>();
+    for (String path : this.quotaManager.getAll()) {
+      stalePaths.add(path);
+    }
+
+    List<MountTable> neededMountTables = new LinkedList<>();
+    for (MountTable entry : mountTables) {
+      // select mount table entries whose quota is set
+      if (isQuotaSet(entry)) {
+        neededMountTables.add(entry);
+      }
+
+      // update mount table entries info in quota cache
+      String src = entry.getSourcePath();
+      this.quotaManager.put(src, entry.getQuota());
+      stalePaths.remove(src);
+    }
+
+    // remove stale paths that are currently cached
+    for (String stalePath : stalePaths) {
+      this.quotaManager.remove(stalePath);
+    }
+
+    return neededMountTables;
+  }
+
+  /**
+   * Check if the quota was set in given MountTable.
+   * @param mountTable Mount table entry.
+   */
+  private boolean isQuotaSet(MountTable mountTable) {
+    if (mountTable != null) {
+      return this.quotaManager.isQuotaSet(mountTable.getQuota());
+    }
+    return false;
+  }
+
+  /**
+   * Generate a new quota based on old quota and current quota usage value.
+   * @param oldQuota Old quota stored in State Store.
+   * @param currentQuotaUsage Current quota usage value queried from
+   *        subcluster.
+   * @return A new RouterQuotaUsage.
+   */
+  private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
+      QuotaUsage currentQuotaUsage) {
+    RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
+        .fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount())
+        .quota(oldQuota.getQuota())
+        .spaceConsumed(currentQuotaUsage.getSpaceConsumed())
+        .spaceQuota(oldQuota.getSpaceQuota()).build();
+    return newQuota;
+  }
+
+  /**
+   * Write out updated mount table entries into State Store.
+   * @param updateMountTables Mount tables to be updated.
+   * @throws IOException
+   */
+  private void updateMountTableEntries(List<MountTable> updateMountTables)
+      throws IOException {
+    for (MountTable entry : updateMountTables) {
+      UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
+          .newInstance(entry);
+      getMountTableStore().updateMountTableEntry(updateRequest);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
new file mode 100644
index 0000000..55bfc48
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature;
+import org.apache.hadoop.hdfs.server.namenode.Quota;
+
+/**
+ * The subclass of {@link QuotaUsage} used in Router-based federation.
+ */
+public final class RouterQuotaUsage extends QuotaUsage {
+  private RouterQuotaUsage(Builder builder) {
+    super(builder);
+  }
+
+  /** Build the instance based on the builder. */
+  public static class Builder extends QuotaUsage.Builder {
+
+    public RouterQuotaUsage build() {
+      return new RouterQuotaUsage(this);
+    }
+
+    @Override
+    public Builder fileAndDirectoryCount(long count) {
+      super.fileAndDirectoryCount(count);
+      return this;
+    }
+
+    @Override
+    public Builder quota(long quota) {
+      super.quota(quota);
+      return this;
+    }
+
+    @Override
+    public Builder spaceConsumed(long spaceConsumed) {
+      super.spaceConsumed(spaceConsumed);
+      return this;
+    }
+
+    @Override
+    public Builder spaceQuota(long spaceQuota) {
+      super.spaceQuota(spaceQuota);
+      return this;
+    }
+  }
+
+  /**
+   * Verify if the namespace quota is violated once the quota is set. See
+   * the related method {@link DirectoryWithQuotaFeature#verifyNamespaceQuota}.
+   * @throws NSQuotaExceededException
+   */
+  public void verifyNamespaceQuota() throws NSQuotaExceededException {
+    if (Quota.isViolated(getQuota(), getFileAndDirectoryCount())) {
+      throw new NSQuotaExceededException(getQuota(),
+          getFileAndDirectoryCount());
+    }
+  }
+
+  /**
+   * Verify if the storage space quota is violated once the quota is set. See
+   * the related method {@link DirectoryWithQuotaFeature#verifyStoragespaceQuota}.
+   * @throws DSQuotaExceededException
+   */
+  public void verifyStoragespaceQuota() throws DSQuotaExceededException {
+    if (Quota.isViolated(getSpaceQuota(), getSpaceConsumed())) {
+      throw new DSQuotaExceededException(getSpaceQuota(), getSpaceConsumed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 11f7fa6..73b189e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -181,6 +181,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   /** Category of the operation that a thread is executing. */
   private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
 
+  /** Router Quota calls. */
+  private final Quota quotaCall;
 
   /**
    * Construct a router RPC server.
@@ -277,6 +279,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     // Create the client
     this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
         this.namenodeResolver, this.rpcMonitor);
+
+    // Initialize modules
+    this.quotaCall = new Quota(this.router, this);
   }
 
   @Override
@@ -384,7 +389,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
    * @throws StandbyException If the Router is in safe mode and cannot serve
    *                          client requests.
    */
-  private void checkOperation(OperationCategory op) throws StandbyException {
+  protected void checkOperation(OperationCategory op) throws StandbyException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
       rpcMonitor.startOp();
@@ -1839,21 +1844,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
   @Override // ClientProtocol
   public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
       StorageType type) throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
-    // TODO assign global replicas instead of applying them to each folder
-    final List<RemoteLocation> locations = getLocationsForPath(path, true);
-    RemoteMethod method = new RemoteMethod("setQuota",
-        new Class<?>[] {String.class, Long.class, Long.class,
-            StorageType.class},
-        new RemoteParam(), namespaceQuota, storagespaceQuota, type);
-    rpcClient.invokeConcurrent(locations, method, false, false);
+    this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type);
   }
 
   @Override // ClientProtocol
   public QuotaUsage getQuotaUsage(String path) throws IOException {
-    checkOperation(OperationCategory.READ, false);
-    return null;
+    checkOperation(OperationCategory.READ);
+    return this.quotaCall.getQuotaUsage(path);
   }
 
   @Override
@@ -1996,7 +1993,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
    * @return Prioritized list of locations in the federated cluster.
    * @throws IOException If the location for this path cannot be determined.
    */
-  private List<RemoteLocation> getLocationsForPath(
+  protected List<RemoteLocation> getLocationsForPath(
       String path, boolean failIfLocked) throws IOException {
     try {
       // Check the location for this path
@@ -2016,6 +2013,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
           }
           throw new IOException(path + " is in a read only mount point");
         }
+
+        // Check quota
+        if (this.router.isQuotaEnabled()) {
+          RouterQuotaUsage quotaUsage = this.router.getQuotaManager()
+              .getQuotaUsage(path);
+          if (quotaUsage != null) {
+            quotaUsage.verifyNamespaceQuota();
+            quotaUsage.verifyStoragespaceQuota();
+          }
+        }
       }
 
       return location.getDestinations();
@@ -2119,4 +2126,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     UserGroupInformation ugi = Server.getRemoteUser();
     return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
   }
+
+  /**
+   * Get the quota module implementation.
+   */
+  public Quota getQuotaModule() {
+    return this.quotaCall;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index 1b5d2d6..53ad1e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -29,9 +29,11 @@ import java.util.TreeMap;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -140,6 +142,12 @@ public abstract class MountTable extends BaseRecord {
     record.setMode(new FsPermission(
         RouterPermissionChecker.MOUNT_TABLE_PERMISSION_DEFAULT));
 
+    // Set quota for mount table
+    RouterQuotaUsage quota = new RouterQuotaUsage.Builder()
+        .fileAndDirectoryCount(0).quota(HdfsConstants.QUOTA_DONT_SET)
+        .spaceConsumed(0).spaceQuota(HdfsConstants.QUOTA_DONT_SET).build();
+    record.setQuota(quota);
+
     // Validate
     record.validate();
     return record;
@@ -249,6 +257,20 @@ public abstract class MountTable extends BaseRecord {
   public abstract void setMode(FsPermission mode);
 
   /**
+   * Get quota of this mount table entry.
+   *
+   * @return RouterQuotaUsage quota usage
+   */
+  public abstract RouterQuotaUsage getQuota();
+
+  /**
+   * Set quota for this mount table entry.
+   *
+   * @param quota QuotaUsage for mount table entry
+   */
+  public abstract void setQuota(RouterQuotaUsage quota);
+
+  /**
    * Get the default location.
    * @return The default location.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 372f209..d9e9555 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -27,10 +27,12 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
 import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.DestOrder;
 import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder;
 import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
 import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 
@@ -250,6 +252,36 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
     }
   }
 
+  @Override
+  public RouterQuotaUsage getQuota() {
+    MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
+    if (!proto.hasQuota()) {
+      return null;
+    }
+
+    QuotaUsageProto quotaProto = proto.getQuota();
+    RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
+        .fileAndDirectoryCount(quotaProto.getFileAndDirectoryCount())
+        .quota(quotaProto.getQuota())
+        .spaceConsumed(quotaProto.getSpaceConsumed())
+        .spaceQuota(quotaProto.getSpaceQuota());
+    return builder.build();
+  }
+
+  @Override
+  public void setQuota(RouterQuotaUsage quota) {
+    Builder builder = this.translator.getBuilder();
+    if (quota == null) {
+      builder.clearQuota();
+    } else {
+      QuotaUsageProto quotaUsage = QuotaUsageProto.newBuilder()
+          .setFileAndDirectoryCount(quota.getFileAndDirectoryCount())
+          .setQuota(quota.getQuota()).setSpaceConsumed(quota.getSpaceConsumed())
+          .setSpaceQuota(quota.getSpaceQuota()).build();
+      builder.setQuota(quotaUsage);
+    }
+  }
+
   private DestinationOrder convert(DestOrder order) {
     switch (order) {
     case LOCAL:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
index 6d20e6c..5e708be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
@@ -49,7 +49,7 @@ public enum Quota {
    * Is quota violated?
    * The quota is violated if quota is set and usage > quota. 
    */
-  static boolean isViolated(final long quota, final long usage) {
+  public static boolean isViolated(final long quota, final long usage) {
     return quota >= 0 && usage > quota;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index fd961f2..0681ed5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@@ -80,7 +82,9 @@ public class RouterAdmin extends Configured implements Tool {
         + "\t[-add <source> <nameservice> <destination> "
         + "[-readonly] -owner <owner> -group <group> -mode <mode>]\n"
         + "\t[-rm <source>]\n"
-        + "\t[-ls <path>]\n";
+        + "\t[-ls <path>]\n"
+        + "\t[-setQuota <path> -ns <nsQuota> -ss <ssQuota>]\n"
+        + "\t[-clrQuota <path>\n";
     System.out.println(usage);
   }
 
@@ -109,6 +113,18 @@ public class RouterAdmin extends Configured implements Tool {
         printUsage();
         return exitCode;
       }
+    } else if ("-setQuota".equalsIgnoreCase(cmd)) {
+      if (argv.length < 4) {
+        System.err.println("Not enough parameters specificed for cmd " + cmd);
+        printUsage();
+        return exitCode;
+      }
+    } else if ("-clrQuota".equalsIgnoreCase(cmd)) {
+      if (argv.length < 2) {
+        System.err.println("Not enough parameters specificed for cmd " + cmd);
+        printUsage();
+        return exitCode;
+      }
     }
 
     // Initialize RouterClient
@@ -144,6 +160,16 @@ public class RouterAdmin extends Configured implements Tool {
         } else {
           listMounts("/");
         }
+      } else if ("-setQuota".equals(cmd)) {
+        if (setQuota(argv, i)) {
+          System.out.println(
+              "Successfully set quota for mount point " + argv[i]);
+        }
+      } else if ("-clrQuota".equals(cmd)) {
+        if (clrQuota(argv[i])) {
+          System.out.println(
+              "Successfully clear quota for mount point " + argv[i]);
+        }
       } else {
         printUsage();
         return exitCode;
@@ -389,6 +415,111 @@ public class RouterAdmin extends Configured implements Tool {
   }
 
   /**
+   * Set quota for a mount table entry.
+   *
+   * @param parameters Parameters of the quota.
+   * @param i Index in the parameters.
+   */
+  private boolean setQuota(String[] parameters, int i) throws IOException {
+    long nsQuota = HdfsConstants.QUOTA_DONT_SET;
+    long ssQuota = HdfsConstants.QUOTA_DONT_SET;
+
+    String mount = parameters[i++];
+    while (i < parameters.length) {
+      if (parameters[i].equals("-nsQuota")) {
+        i++;
+        try {
+          nsQuota = Long.parseLong(parameters[i]);
+        } catch (Exception e) {
+          System.err.println("Cannot parse nsQuota: " + parameters[i]);
+        }
+      } else if (parameters[i].equals("-ssQuota")) {
+        i++;
+        try {
+          ssQuota = Long.parseLong(parameters[i]);
+        } catch (Exception e) {
+          System.err.println("Cannot parse ssQuota: " + parameters[i]);
+        }
+      }
+
+      i++;
+    }
+
+    if (nsQuota <= 0 || ssQuota <= 0) {
+      System.err.println("Input quota value should be a positive number.");
+      return false;
+    }
+
+    return updateQuota(mount, nsQuota, ssQuota);
+  }
+
+  /**
+   * Clear quota of the mount point.
+   *
+   * @param mount Mount table to clear
+   * @return If the quota was cleared.
+   * @throws IOException Error clearing the mount point.
+   */
+  private boolean clrQuota(String mount) throws IOException {
+    return updateQuota(mount, HdfsConstants.QUOTA_DONT_SET,
+        HdfsConstants.QUOTA_DONT_SET);
+  }
+
+  /**
+   * Update quota of specified mount table.
+   *
+   * @param mount Specified mount table to update.
+   * @param nsQuota Namespace quota.
+   * @param ssQuota Storage space quota.
+   * @return If the quota was updated.
+   * @throws IOException Error updating quota.
+   */
+  private boolean updateQuota(String mount, long nsQuota, long ssQuota)
+      throws IOException {
+    // Get existing entry
+    MountTableManager mountTable = client.getMountTableManager();
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(mount);
+    GetMountTableEntriesResponse getResponse = mountTable
+        .getMountTableEntries(getRequest);
+    List<MountTable> results = getResponse.getEntries();
+    MountTable existingEntry = null;
+    for (MountTable result : results) {
+      if (mount.equals(result.getSourcePath())) {
+        existingEntry = result;
+        break;
+      }
+    }
+
+    if (existingEntry == null) {
+      return false;
+    } else {
+      long nsCount = existingEntry.getQuota().getFileAndDirectoryCount();
+      long ssCount = existingEntry.getQuota().getSpaceConsumed();
+      // If nsQuota or ssQuota was unset, reset corresponding usage
+      // value to zero.
+      if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
+        nsCount = 0;
+      }
+
+      if (ssQuota == HdfsConstants.QUOTA_DONT_SET) {
+        ssCount = 0;
+      }
+
+      RouterQuotaUsage updatedQuota = new RouterQuotaUsage.Builder()
+          .fileAndDirectoryCount(nsCount).quota(nsQuota)
+          .spaceConsumed(ssCount).spaceQuota(ssQuota).build();
+      existingEntry.setQuota(updatedQuota);
+    }
+
+    UpdateMountTableEntryRequest updateRequest =
+        UpdateMountTableEntryRequest.newInstance(existingEntry);
+    UpdateMountTableEntryResponse updateResponse = mountTable
+        .updateMountTableEntry(updateRequest);
+    return updateResponse.getStatus();
+  }
+
+  /**
    * Inner class that stores ACL info of mount table.
    */
   static class ACLEntity {
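
The new commands can be driven through ToolRunner in the same way as the TestRouterAdminCLI case added below. A minimal sketch (not part of this patch), assuming a Router admin server is reachable; the admin address is a placeholder:

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.util.ToolRunner;

/** Illustration only; not part of this patch. */
public class RouterQuotaAdminSketch {
  public static void main(String[] args) throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    // Placeholder address of the Router admin server.
    conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "router-host:8111");
    RouterAdmin admin = new RouterAdmin(conf);

    // Set a namespace quota of 50 and a storage space quota of 100 bytes
    // on the mount point /test-QuotaMounttable.
    ToolRunner.run(admin, new String[] {"-setQuota", "/test-QuotaMounttable",
        "-nsQuota", "50", "-ssQuota", "100"});

    // Clear the quota again.
    ToolRunner.run(admin, new String[] {"-clrQuota", "/test-QuotaMounttable"});
  }
}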

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
index 043a21a..5d9b9d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
@@ -22,6 +22,7 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 package hadoop.hdfs;
 
+import "hdfs.proto";
 
 /////////////////////////////////////////////////
 // Membership
@@ -134,6 +135,8 @@ message MountTableRecordProto {
   optional string ownerName = 10;
   optional string groupName = 11;
   optional int32 mode = 12;
+
+  optional QuotaUsageProto quota = 13;
 }
 
 message AddMountTableEntryRequestProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 4ca7b58..cd365be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5126,4 +5126,24 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.quota.enable</name>
+    <value>false</value>
+    <description>
+      Set to true to enable the quota system in the Router.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.quota-cache.update.interval</name>
+    <value>60s</value>
+    <description>
+      Time interval for updating the quota usage cache in the Router.
+      This property is used only if the value of
+      dfs.federation.router.quota.enable is true.
+      This setting supports multiple time unit suffixes as described
+      in dfs.heartbeat.interval. If no suffix is specified then milliseconds
+      is assumed.
+    </description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 88da77e..3d8b35c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -34,6 +34,7 @@ public class RouterConfigBuilder {
   private boolean enableLocalHeartbeat = false;
   private boolean enableStateStore = false;
   private boolean enableMetrics = false;
+  private boolean enableQuota = false;
 
   public RouterConfigBuilder(Configuration configuration) {
     this.conf = configuration;
@@ -89,6 +90,11 @@ public class RouterConfigBuilder {
     return this;
   }
 
+  public RouterConfigBuilder quota(boolean enable) {
+    this.enableQuota = enable;
+    return this;
+  }
+
   public RouterConfigBuilder rpc() {
     return this.rpc(true);
   }
@@ -113,6 +119,10 @@ public class RouterConfigBuilder {
     return this.metrics(true);
   }
 
+  public RouterConfigBuilder quota() {
+    return this.quota(true);
+  }
+
   public Configuration build() {
     conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
         this.enableStateStore);
@@ -127,6 +137,8 @@ public class RouterConfigBuilder {
         this.enableLocalHeartbeat);
     conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
         this.enableMetrics);
+    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
+        this.enableQuota);
     return conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index 9e82967..ec47a41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
@@ -294,4 +295,57 @@ public class TestRouterAdminCLI {
     argv = new String[] {"-rm", mount};
     assertEquals(rmCommandCode, ToolRunner.run(admin, argv));
   }
+
+  @Test
+  public void testSetAndClearQuota() throws Exception {
+    String nsId = "ns0";
+    String src = "/test-QuotaMounttable";
+    String dest = "/QuotaMounttable";
+    String[] argv = new String[] {"-add", src, nsId, dest};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(src);
+    GetMountTableEntriesResponse getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    MountTable mountTable = getResponse.getEntries().get(0);
+    RouterQuotaUsage quotaUsage = mountTable.getQuota();
+
+    // verify the default quota values
+    assertEquals(0, quotaUsage.getFileAndDirectoryCount());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
+    assertEquals(0, quotaUsage.getSpaceConsumed());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
+
+    long nsQuota = 50;
+    long ssQuota = 100;
+    argv = new String[] {"-setQuota", src, "-nsQuota", String.valueOf(nsQuota),
+        "-ssQuota", String.valueOf(ssQuota)};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    mountTable = getResponse.getEntries().get(0);
+    quotaUsage = mountTable.getQuota();
+
+    // verify that the quota was set
+    assertEquals(nsQuota, quotaUsage.getQuota());
+    assertEquals(ssQuota, quotaUsage.getSpaceQuota());
+
+    // test clrQuota command
+    argv = new String[] {"-clrQuota", src};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    getResponse = client.getMountTableManager()
+        .getMountTableEntries(getRequest);
+    mountTable = getResponse.getEntries().get(0);
+    quotaUsage = mountTable.getQuota();
+
+    // verify that the quota was cleared successfully
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
+  }
 }
\ No newline at end of file

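The -setQuota/-clrQuota flow exercised by this test can be driven the same way outside of it. A hedged sketch, assuming RouterAdmin accepts a Configuration pointing at a running Router admin endpoint; the mount point below is purely illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.util.ToolRunner;

public class RouterQuotaCliSketch {
  public static void main(String[] args) throws Exception {
    // Assumed: the Configuration points at a running Router admin endpoint.
    Configuration conf = new Configuration();
    RouterAdmin admin = new RouterAdmin(conf);

    String mount = "/test-QuotaMounttable";
    // Set a namespace quota of 50 and a storage space quota of 100 bytes.
    ToolRunner.run(admin, new String[] {
        "-setQuota", mount, "-nsQuota", "50", "-ssQuota", "100"});
    // Clear both quotas again.
    ToolRunner.run(admin, new String[] {"-clrQuota", mount});
  }
}
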
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
new file mode 100644
index 0000000..368b5e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests quota behaviors in Router-based Federation.
+ */
+public class TestRouterQuota {
+  private static StateStoreDFSCluster cluster;
+  private static NamenodeContext nnContext1;
+  private static NamenodeContext nnContext2;
+  private static RouterContext routerContext;
+  private static MountTableResolver resolver;
+
+  private static final int BLOCK_SIZE = 512;
+
+  @Before
+  public void setUp() throws Exception {
+
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .quota()
+        .rpc()
+        .build();
+    routerConf.set(DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "2s");
+
+    // override some HDFS settings that are used in testing the space quota
+    Configuration hdfsConf = new Configuration(false);
+    hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.addNamenodeOverrides(hdfsConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
+    nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null);
+    routerContext = cluster.getRandomRouter();
+    Router router = routerContext.getRouter();
+    resolver = (MountTableResolver) router.getSubclusterResolver();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testNamespaceQuotaExceed() throws Exception {
+    long nsQuota = 3;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /nsquota --> ns0---testdir1
+    // /nsquota/subdir --> ns1---testdir2
+    nnFs1.mkdirs(new Path("/testdir1"));
+    nnFs2.mkdirs(new Path("/testdir2"));
+    MountTable mountTable1 = MountTable.newInstance("/nsquota",
+        Collections.singletonMap("ns0", "/testdir1"));
+
+    mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir",
+        Collections.singletonMap("ns1", "/testdir2"));
+    mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
+    addMountTable(mountTable2);
+
+    final FileSystem routerFs = routerContext.getFileSystem();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        boolean isNsQuotaViolated = false;
+        try {
+          // create new directory to trigger NSQuotaExceededException
+          routerFs.mkdirs(new Path("/nsquota/" + UUID.randomUUID()));
+          routerFs.mkdirs(new Path("/nsquota/subdir/" + UUID.randomUUID()));
+        } catch (NSQuotaExceededException e) {
+          isNsQuotaViolated = true;
+        } catch (IOException ignored) {
+        }
+        return isNsQuotaViolated;
+      }
+    }, 5000, 60000);
+    // mkdir on the real FileSystem should still succeed
+    nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID()));
+    nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID()));
+  }
+
+  @Test
+  public void testStorageSpaceQuotaExceed() throws Exception {
+    long ssQuota = 3071;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /ssquota --> ns0---testdir3
+    // /ssquota/subdir --> ns1---testdir4
+    nnFs1.mkdirs(new Path("/testdir3"));
+    nnFs2.mkdirs(new Path("/testdir4"));
+    MountTable mountTable1 = MountTable.newInstance("/ssquota",
+        Collections.singletonMap("ns0", "/testdir3"));
+
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir",
+        Collections.singletonMap("ns1", "/testdir4"));
+    mountTable2
+        .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
+    addMountTable(mountTable2);
+
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create("/ssquota/file", true).close();
+    routerClient.create("/ssquota/subdir/file", true).close();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        boolean isDsQuotaViolated = false;
+        try {
+          // append data to trigger DSQuotaExceededException
+          appendData("/ssquota/file", routerClient, BLOCK_SIZE);
+          appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
+        } catch (DSQuotaExceededException e) {
+          isDsQuotaViolated = true;
+        } catch (IOException ignored) {
+        }
+        return isDsQuotaViolated;
+      }
+    }, 5000, 60000);
+
+    // appending data to the destination paths on the real FileSystem should succeed
+    appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE);
+    appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE);
+  }
+
+  /**
+   * Add a mount table entry to the mount table through the admin API.
+   * @param entry Mount table entry to add.
+   * @return If it was successfully added.
+   * @throws IOException Problems adding entries.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(entry);
+    AddMountTableEntryResponse addResponse =
+        mountTableManager.addMountTableEntry(addRequest);
+
+    // Reload the Router cache
+    resolver.loadCache(true);
+
+    return addResponse.getStatus();
+  }
+
+  /**
+   * Append data to the specified file.
+   * @param path Path of the file.
+   * @param client DFS client.
+   * @param dataLen The length of the data to write.
+   * @throws IOException If the data cannot be appended.
+   */
+  private void appendData(String path, DFSClient client, int dataLen)
+      throws IOException {
+    EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
+    HdfsDataOutputStream stream = client.append(path, 1024, createFlag, null,
+        null);
+    byte[] data = new byte[dataLen];
+    stream.write(data);
+    stream.close();
+  }
+
+  @Test
+  public void testSetQuota() throws Exception {
+    long nsQuota = 5;
+    long ssQuota = 100;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /setquota --> ns0---testdir5
+    // /setquota/subdir --> ns1---testdir6
+    nnFs1.mkdirs(new Path("/testdir5"));
+    nnFs2.mkdirs(new Path("/testdir6"));
+    MountTable mountTable1 = MountTable.newInstance("/setquota",
+        Collections.singletonMap("ns0", "/testdir5"));
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    // don't set a quota for the subpath of the mount table
+    MountTable mountTable2 = MountTable.newInstance("/setquota/subdir",
+        Collections.singletonMap("ns1", "/testdir6"));
+    addMountTable(mountTable2);
+
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    // ensure setQuota RPC call was invoked
+    updateService.periodicInvoke();
+
+    ClientProtocol client1 = nnContext1.getClient().getNamenode();
+    ClientProtocol client2 = nnContext2.getClient().getNamenode();
+    final QuotaUsage quota1 = client1.getQuotaUsage("/testdir5");
+    final QuotaUsage quota2 = client2.getQuotaUsage("/testdir6");
+
+    assertEquals(nsQuota, quota1.getQuota());
+    assertEquals(ssQuota, quota1.getSpaceQuota());
+    assertEquals(nsQuota, quota2.getQuota());
+    assertEquals(ssQuota, quota2.getSpaceQuota());
+  }
+
+  @Test
+  public void testGetQuota() throws Exception {
+    long nsQuota = 10;
+    long ssQuota = 100;
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+    final FileSystem nnFs2 = nnContext2.getFileSystem();
+
+    // Add two mount tables:
+    // /getquota --> ns0---/testdir7
+    // /getquota/subdir1 --> ns0---/testdir7/subdir
+    // /getquota/subdir2 --> ns1---/testdir8
+    nnFs1.mkdirs(new Path("/testdir7"));
+    nnFs1.mkdirs(new Path("/testdir7/subdir"));
+    nnFs2.mkdirs(new Path("/testdir8"));
+    MountTable mountTable1 = MountTable.newInstance("/getquota",
+        Collections.singletonMap("ns0", "/testdir7"));
+    mountTable1
+        .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable1);
+
+    MountTable mountTable2 = MountTable.newInstance("/getquota/subdir1",
+        Collections.singletonMap("ns0", "/testdir7/subdir"));
+    addMountTable(mountTable2);
+
+    MountTable mountTable3 = MountTable.newInstance("/getquota/subdir2",
+        Collections.singletonMap("ns1", "/testdir8"));
+    addMountTable(mountTable3);
+
+    // use router client to create new files
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create("/getquota/file", true).close();
+    routerClient.create("/getquota/subdir1/file", true).close();
+    routerClient.create("/getquota/subdir2/file", true).close();
+
+    ClientProtocol clientProtocol = routerContext.getClient().getNamenode();
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+    final QuotaUsage quota = clientProtocol.getQuotaUsage("/getquota");
+    // the quota should be aggregated
+    assertEquals(6, quota.getFileAndDirectoryCount());
+  }
+
+  @Test
+  public void testStaleQuotaRemoving() throws Exception {
+    long nsQuota = 20;
+    long ssQuota = 200;
+    String stalePath = "/stalequota";
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+
+    // Add one mount table:
+    // /stalequota --> ns0---/testdir9
+    nnFs1.mkdirs(new Path("/testdir9"));
+    MountTable mountTable = MountTable.newInstance(stalePath,
+        Collections.singletonMap("ns0", "/testdir9"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+
+    // Call periodicInvoke to ensure the quota for stalePath was
+    // loaded into the quota manager.
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+
+    // use the quota manager to get the quota usage and verify it
+    RouterQuotaManager quotaManager = routerContext.getRouter()
+        .getQuotaManager();
+    RouterQuotaUsage quota = quotaManager.getQuotaUsage(stalePath);
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+
+    // remove stale path entry
+    removeMountTable(stalePath);
+    updateService.periodicInvoke();
+    // the stale entry should be removed and we will get null
+    quota = quotaManager.getQuotaUsage(stalePath);
+    assertNull(quota);
+  }
+
+  /**
+   * Remove a mount table entry from the mount table through the admin API.
+   * @param path Source path of the mount table entry to remove.
+   * @return If it was successfully removed.
+   * @throws IOException Problems removing the entry.
+   */
+  private boolean removeMountTable(String path) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest
+        .newInstance(path);
+    RemoveMountTableEntryResponse removeResponse = mountTableManager
+        .removeMountTableEntry(removeRequest);
+
+    // Reload the Router cache
+    resolver.loadCache(true);
+    return removeResponse.getStatus();
+  }
+
+  @Test
+  public void testQuotaUpdating() throws Exception {
+    long nsQuota = 30;
+    long ssQuota = 1024;
+    String path = "/updatequota";
+    final FileSystem nnFs1 = nnContext1.getFileSystem();
+
+    // Add one mount table:
+    // /updatequota --> ns0---/testdir10
+    nnFs1.mkdirs(new Path("/testdir10"));
+    MountTable mountTable = MountTable.newInstance(path,
+        Collections.singletonMap("ns0", "/testdir10"));
+    mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
+        .spaceQuota(ssQuota).build());
+    addMountTable(mountTable);
+
+    // Call periodicInvoke to ensure the quota is updated in the quota manager
+    // and the State Store.
+    RouterQuotaUpdateService updateService = routerContext.getRouter()
+        .getQuotaCacheUpdateService();
+    updateService.periodicInvoke();
+
+    // verify initial quota value
+    List<MountTable> results = getMountTable(path);
+    MountTable updatedMountTable = !results.isEmpty() ? results.get(0) : null;
+    RouterQuotaUsage quota = updatedMountTable.getQuota();
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+    assertEquals(1, quota.getFileAndDirectoryCount());
+    assertEquals(0, quota.getSpaceConsumed());
+
+    // mkdir and write a new file
+    final FileSystem routerFs = routerContext.getFileSystem();
+    routerFs.mkdirs(new Path(path + "/" + UUID.randomUUID()));
+    DFSClient routerClient = routerContext.getClient();
+    routerClient.create(path + "/file", true).close();
+    appendData(path + "/file", routerClient, BLOCK_SIZE);
+
+    updateService.periodicInvoke();
+    results = getMountTable(path);
+    updatedMountTable = !results.isEmpty() ? results.get(0) : null;
+    quota = updatedMountTable.getQuota();
+
+    // verify that the quota has been updated in the State Store
+    assertEquals(nsQuota, quota.getQuota());
+    assertEquals(ssQuota, quota.getSpaceQuota());
+    assertEquals(3, quota.getFileAndDirectoryCount());
+    assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
+  }
+
+  /**
+   * Get the mount table entries for the specified path through the admin API.
+   * @param path Source path of the mount table entries to get.
+   * @return The matching mount table entries.
+   * @throws IOException Problems getting the entries.
+   */
+  private List<MountTable> getMountTable(String path) throws IOException {
+    // Reload the Router cache
+    resolver.loadCache(true);
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
+        .newInstance(path);
+    GetMountTableEntriesResponse getResponse = mountTableManager
+        .getMountTableEntries(getRequest);
+
+    return getResponse.getEntries();
+  }
+}

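Condensed from the helpers above, this is the basic pattern for registering a mount point with a global quota through the admin API. A sketch that reuses only the types exercised in this test; the nameservice and path names are placeholders:

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

public class QuotaMountSketch {
  /**
   * Register /quota-mount -> ns0:/quota-dest with a namespace quota of 5 and
   * a storage space quota of 100 bytes. The RouterClient is assumed to be
   * connected to the Router admin server (the tests obtain one through
   * routerContext.getAdminClient()).
   */
  static boolean addQuotedMount(RouterClient client) throws IOException {
    MountTable entry = MountTable.newInstance(
        "/quota-mount", Collections.singletonMap("ns0", "/quota-dest"));
    entry.setQuota(new RouterQuotaUsage.Builder()
        .quota(5)
        .spaceQuota(100)
        .build());
    MountTableManager manager = client.getMountTableManager();
    AddMountTableEntryRequest request =
        AddMountTableEntryRequest.newInstance(entry);
    return manager.addMountTableEntry(request).getStatus();
  }
}
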
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
new file mode 100644
index 0000000..346c881
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for class {@link RouterQuotaManager}.
+ */
+public class TestRouterQuotaManager {
+  private static RouterQuotaManager manager;
+
+  @Before
+  public void setup() {
+    manager = new RouterQuotaManager();
+  }
+
+  @After
+  public void cleanup() {
+    manager.clear();
+  }
+
+  @Test
+  public void testGetChildrenPaths() {
+    RouterQuotaUsage quotaUsage = new RouterQuotaUsage.Builder().build();
+    manager.put("/path1", quotaUsage);
+    manager.put("/path2", quotaUsage);
+    manager.put("/path1/subdir", quotaUsage);
+    manager.put("/path1/subdir/subdir", quotaUsage);
+
+    Set<String> childrenPaths = manager.getPaths("/path1");
+    assertEquals(3, childrenPaths.size());
+    assertTrue(childrenPaths.contains("/path1/subdir")
+        && childrenPaths.contains("/path1/subdir/subdir")
+        && childrenPaths.contains("/path1"));
+  }
+
+  @Test
+  public void testGetQuotaUsage() {
+    RouterQuotaUsage quotaGet;
+
+    // test case 1: get quota for a non-existent path
+    quotaGet = manager.getQuotaUsage("/non-exist-path");
+    assertNull(quotaGet);
+
+    // test case 2: get quota from a path with no quota set
+    RouterQuotaUsage.Builder quota = new RouterQuotaUsage.Builder()
+        .quota(HdfsConstants.QUOTA_DONT_SET)
+        .spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/noQuotaSet", quota.build());
+    quotaGet = manager.getQuotaUsage("/noQuotaSet");
+    // it should return null
+    assertNull(quotaGet);
+
+    // test case 3: get quota from a path with a quota set
+    quota.quota(1);
+    quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/hasQuotaSet", quota.build());
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet");
+    assertEquals(1, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+    // test case 4: get quota for a non-existent child path
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet/file");
+    // it will return the quota of the nearest ancestor that has a quota set
+    assertEquals(1, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+    // test case 5: get quota for a child path whose parent
+    // has no quota set
+    quota.quota(HdfsConstants.QUOTA_DONT_SET);
+    quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/hasQuotaSet/noQuotaSet", quota.build());
+    // this should return the quota of path /hasQuotaSet
+    // (the nearest ancestor that has a quota set)
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet/noQuotaSet/file");
+    assertEquals(1, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+
+    // test case 6: get quota for a child path whose parent has a quota set
+    quota.quota(2);
+    quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
+    manager.put("/hasQuotaSet/hasQuotaSet", quota.build());
+    // this should return the quota of path /hasQuotaSet/hasQuotaSet
+    quotaGet = manager.getQuotaUsage("/hasQuotaSet/hasQuotaSet/file");
+    assertEquals(2, quotaGet.getQuota());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
+  }
+}

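The nearest-ancestor lookup that the cases above verify boils down to the following sketch; the paths are arbitrary and it assumes RouterQuotaManager and RouterQuotaUsage are visible to the caller:

import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaManager;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;

public class QuotaLookupSketch {
  public static void main(String[] args) {
    RouterQuotaManager manager = new RouterQuotaManager();

    // Only the parent mount point carries a real namespace quota.
    manager.put("/data", new RouterQuotaUsage.Builder()
        .quota(10)
        .spaceQuota(HdfsConstants.QUOTA_DONT_SET)
        .build());
    manager.put("/data/archive", new RouterQuotaUsage.Builder()
        .quota(HdfsConstants.QUOTA_DONT_SET)
        .spaceQuota(HdfsConstants.QUOTA_DONT_SET)
        .build());

    // A lookup below a mount point with no quota walks up to the nearest
    // ancestor that has one, so this prints 10.
    RouterQuotaUsage usage = manager.getQuotaUsage("/data/archive/file");
    System.out.println(usage.getQuota());
  }
}
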
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d98a2e6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
index 739d2e4..d306f77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
@@ -27,8 +27,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
 import org.junit.Test;
 
@@ -56,6 +58,14 @@ public class TestMountTable {
   private static final long DATE_CREATED = 100;
   private static final long DATE_MOD = 200;
 
+  private static final long NS_COUNT = 1;
+  private static final long NS_QUOTA = 5;
+  private static final long SS_COUNT = 10;
+  private static final long SS_QUOTA = 100;
+
+  private static final RouterQuotaUsage QUOTA = new RouterQuotaUsage.Builder()
+      .fileAndDirectoryCount(NS_COUNT).quota(NS_QUOTA).spaceConsumed(SS_COUNT)
+      .spaceQuota(SS_QUOTA).build();
 
   @Test
   public void testGetterSetter() throws IOException {
@@ -68,6 +78,12 @@ public class TestMountTable {
     assertTrue(DATE_CREATED > 0);
     assertTrue(DATE_MOD > 0);
 
+    RouterQuotaUsage quota = record.getQuota();
+    assertEquals(0, quota.getFileAndDirectoryCount());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getQuota());
+    assertEquals(0, quota.getSpaceConsumed());
+    assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getSpaceQuota());
+
     MountTable record2 =
         MountTable.newInstance(SRC, DST_MAP, DATE_CREATED, DATE_MOD);
 
@@ -94,6 +110,7 @@ public class TestMountTable {
         SRC, DST_MAP, DATE_CREATED, DATE_MOD);
     record.setReadOnly(true);
     record.setDestOrder(order);
+    record.setQuota(QUOTA);
 
     StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
     String serializedString = serializer.serializeString(record);
@@ -107,6 +124,12 @@ public class TestMountTable {
     assertEquals(DATE_MOD, record2.getDateModified());
     assertTrue(record2.isReadOnly());
     assertEquals(order, record2.getDestOrder());
+
+    RouterQuotaUsage quotaGet = record2.getQuota();
+    assertEquals(NS_COUNT, quotaGet.getFileAndDirectoryCount());
+    assertEquals(NS_QUOTA, quotaGet.getQuota());
+    assertEquals(SS_COUNT, quotaGet.getSpaceConsumed());
+    assertEquals(SS_QUOTA, quotaGet.getSpaceQuota());
   }
 
   @Test
@@ -172,4 +195,22 @@ public class TestMountTable {
     assertEquals(DST_NS_1, location2.getNameserviceId());
     assertEquals(DST_PATH_1, location2.getDest());
   }
+
+  @Test
+  public void testQuota() throws IOException {
+    MountTable record = MountTable.newInstance(SRC, DST_MAP);
+    record.setQuota(QUOTA);
+
+    validateDestinations(record);
+    assertEquals(SRC, record.getSourcePath());
+    assertEquals(DST, record.getDestinations());
+    assertTrue(DATE_CREATED > 0);
+    assertTrue(DATE_MOD > 0);
+
+    RouterQuotaUsage quotaGet = record.getQuota();
+    assertEquals(NS_COUNT, quotaGet.getFileAndDirectoryCount());
+    assertEquals(NS_QUOTA, quotaGet.getQuota());
+    assertEquals(SS_COUNT, quotaGet.getSpaceConsumed());
+    assertEquals(SS_QUOTA, quotaGet.getSpaceQuota());
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org