You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/02/11 20:49:05 UTC

[47/50] [abbrv] hadoop git commit: HDFS-7723. Quota By Storage Type namenode implemenation. (Contributed by Xiaoyu Yao)

HDFS-7723. Quota By Storage Type namenode implemenation. (Contributed by Xiaoyu Yao)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5dae97a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5dae97a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5dae97a5

Branch: refs/heads/YARN-2928
Commit: 5dae97a584d30cef3e34141edfaca49c4ec57913
Parents: e42fc1a
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Feb 11 10:41:50 2015 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Feb 11 10:41:50 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |   4 +
 .../hdfs/server/namenode/BackupImage.java       |   4 +-
 .../ContentSummaryComputationContext.java       |   5 +
 .../namenode/DirectoryWithQuotaFeature.java     | 228 +++++---
 .../hdfs/server/namenode/FSDirAttrOp.java       |  64 ++-
 .../hdfs/server/namenode/FSDirConcatOp.java     |  45 +-
 .../hdfs/server/namenode/FSDirDeleteOp.java     |  11 +-
 .../hdfs/server/namenode/FSDirRenameOp.java     |  47 +-
 .../hdfs/server/namenode/FSDirectory.java       | 171 ++++--
 .../hadoop/hdfs/server/namenode/FSEditLog.java  |  10 +
 .../hdfs/server/namenode/FSEditLogLoader.java   |  15 +-
 .../hdfs/server/namenode/FSEditLogOp.java       |  85 +++
 .../hdfs/server/namenode/FSEditLogOpCodes.java  |   1 +
 .../hadoop/hdfs/server/namenode/FSImage.java    |  60 ++-
 .../hdfs/server/namenode/FSImageFormat.java     |  15 +-
 .../server/namenode/FSImageFormatPBINode.java   |  80 ++-
 .../server/namenode/FSImageSerialization.java   |   6 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  13 +-
 .../hadoop/hdfs/server/namenode/INode.java      |  76 +--
 .../hdfs/server/namenode/INodeDirectory.java    | 108 ++--
 .../namenode/INodeDirectoryAttributes.java      |  30 +-
 .../hadoop/hdfs/server/namenode/INodeFile.java  |  84 ++-
 .../hadoop/hdfs/server/namenode/INodeMap.java   |  19 +-
 .../hdfs/server/namenode/INodeReference.java    |  79 +--
 .../hdfs/server/namenode/INodeSymlink.java      |  18 +-
 .../server/namenode/NameNodeLayoutVersion.java  |   3 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   8 +-
 .../hadoop/hdfs/server/namenode/Quota.java      |   2 +-
 .../namenode/QuotaByStorageTypeEntry.java       |  85 +++
 .../hdfs/server/namenode/QuotaCounts.java       | 179 +++++++
 .../namenode/snapshot/AbstractINodeDiff.java    |  11 +-
 .../snapshot/AbstractINodeDiffList.java         |  12 +-
 .../snapshot/DirectorySnapshottableFeature.java |  11 +-
 .../snapshot/DirectoryWithSnapshotFeature.java  | 103 ++--
 .../snapshot/FSImageFormatPBSnapshot.java       |  37 +-
 .../hdfs/server/namenode/snapshot/FileDiff.java |  12 +-
 .../server/namenode/snapshot/FileDiffList.java  |   5 +-
 .../snapshot/FileWithSnapshotFeature.java       |  69 ++-
 .../namenode/snapshot/SnapshotManager.java      |   3 +-
 .../apache/hadoop/hdfs/util/EnumCounters.java   |  36 +-
 .../hadoop-hdfs/src/main/proto/fsimage.proto    |  10 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   2 +
 .../hdfs/TestDFSInotifyEventInputStream.java    |   2 +-
 .../hdfs/server/namenode/FSImageTestUtil.java   |   2 +-
 .../namenode/TestDiskspaceQuotaUpdate.java      |  34 +-
 .../namenode/TestFSImageWithSnapshot.java       |   2 +-
 .../server/namenode/TestQuotaByStorageType.java | 524 +++++++++++++++++++
 .../snapshot/TestRenameWithSnapshots.java       |  49 +-
 .../namenode/snapshot/TestSnapshotDeletion.java |  16 +-
 .../hadoop-hdfs/src/test/resources/editsStored  | Bin 5803 -> 5850 bytes
 .../src/test/resources/editsStored.xml          | 373 ++++++-------
 52 files changed, 2154 insertions(+), 717 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 758bcb1..625735e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -23,6 +23,9 @@ Trunk (Unreleased)
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
     changes. (Xiaoyu Yao via Arpit Agarwal)
 
+    HDFS-7723. Quota By Storage Type namenode implemenation. (Xiaoyu Yao via
+    Arpit Agarwal)
+
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 3fe47af..b05d6a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -421,6 +421,10 @@ public class BlockManager {
     }
   }
 
+  public BlockStoragePolicySuite getStoragePolicySuite() {
+    return storagePolicySuite;
+  }
+
   /** get the BlockTokenSecretManager */
   @VisibleForTesting
   public BlockTokenSecretManager getBlockTokenSecretManager() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
index dc44959..f0879ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
@@ -216,7 +216,9 @@ public class BackupImage extends FSImage {
       }
       lastAppliedTxId = logLoader.getLastAppliedTxId();
 
-      FSImage.updateCountForQuota(getNamesystem().dir.getRoot()); // inefficient!
+      FSImage.updateCountForQuota(
+          getNamesystem().dir.getBlockStoragePolicySuite(),
+          getNamesystem().dir.rootDir); // inefficient!
     } finally {
       backupInputStream.clear();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
index dab64ec..63fa8c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -116,4 +117,8 @@ public class ContentSummaryComputationContext {
   public Content.Counts getCounts() {
     return counts;
   }
+
+  public BlockStoragePolicySuite getBlockStoragePolicySuite() {
+      return fsn.getBlockManager().getStoragePolicySuite();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java
index 05742b2..e7eeba6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java
@@ -21,47 +21,97 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 
 /**
  * Quota feature for {@link INodeDirectory}. 
  */
 public final class DirectoryWithQuotaFeature implements INode.Feature {
   public static final long DEFAULT_NAMESPACE_QUOTA = Long.MAX_VALUE;
-  public static final long DEFAULT_DISKSPACE_QUOTA = HdfsConstants.QUOTA_RESET;
-
-  /** Name space quota */
-  private long nsQuota = DEFAULT_NAMESPACE_QUOTA;
-  /** Name space count */
-  private long namespace = 1L;
-  /** Disk space quota */
-  private long dsQuota = DEFAULT_DISKSPACE_QUOTA;
-  /** Disk space count */
-  private long diskspace = 0L;
-  
-  DirectoryWithQuotaFeature(long nsQuota, long dsQuota) {
-    this.nsQuota = nsQuota;
-    this.dsQuota = dsQuota;
+  public static final long DEFAULT_SPACE_QUOTA = HdfsConstants.QUOTA_RESET;
+
+  private QuotaCounts quota;
+  private QuotaCounts usage;
+
+  public static class Builder {
+    private QuotaCounts quota;
+    private QuotaCounts usage;
+
+    public Builder() {
+      this.quota = new QuotaCounts.Builder().nameCount(DEFAULT_NAMESPACE_QUOTA).
+          spaceCount(DEFAULT_SPACE_QUOTA).typeCounts(DEFAULT_SPACE_QUOTA).build();
+      this.usage = new QuotaCounts.Builder().nameCount(1).build();
+    }
+
+    public Builder nameSpaceQuota(long nameSpaceQuota) {
+      this.quota.setNameSpace(nameSpaceQuota);
+      return this;
+    }
+
+    public Builder spaceQuota(long spaceQuota) {
+      this.quota.setDiskSpace(spaceQuota);
+      return this;
+    }
+
+    public Builder typeQuotas(EnumCounters<StorageType> typeQuotas) {
+      this.quota.setTypeSpaces(typeQuotas);
+      return this;
+    }
+
+    public Builder typeQuota(StorageType type, long quota) {
+      this.quota.setTypeSpace(type, quota);
+      return this;
+    }
+
+    public DirectoryWithQuotaFeature build() {
+      return new DirectoryWithQuotaFeature(this);
+    }
+  }
+
+  private DirectoryWithQuotaFeature(Builder builder) {
+    this.quota = builder.quota;
+    this.usage = builder.usage;
   }
 
   /** @return the quota set or -1 if it is not set. */
-  Quota.Counts getQuota() {
-    return Quota.Counts.newInstance(nsQuota, dsQuota);
+  QuotaCounts getQuota() {
+    return new QuotaCounts.Builder().quotaCount(this.quota).build();
   }
-  
+
   /** Set this directory's quota
    * 
    * @param nsQuota Namespace quota to be set
    * @param dsQuota Diskspace quota to be set
+   * @param type Storage type quota to be set
+   * * To set traditional space/namespace quota, type must be null
    */
+  void setQuota(long nsQuota, long dsQuota, StorageType type) {
+    if (type != null) {
+      this.quota.setTypeSpace(type, dsQuota);
+    } else {
+      setQuota(nsQuota, dsQuota);
+    }
+  }
+
   void setQuota(long nsQuota, long dsQuota) {
-    this.nsQuota = nsQuota;
-    this.dsQuota = dsQuota;
+    this.quota.setNameSpace(nsQuota);
+    this.quota.setDiskSpace(dsQuota);
   }
-  
-  Quota.Counts addNamespaceDiskspace(Quota.Counts counts) {
-    counts.add(Quota.NAMESPACE, namespace);
-    counts.add(Quota.DISKSPACE, diskspace);
+
+  void setQuota(long dsQuota, StorageType type) {
+    this.quota.setTypeSpace(type, dsQuota);
+  }
+
+  // Set in a batch only during FSImage load
+  void setQuota(EnumCounters<StorageType> typeQuotas) {
+    this.quota.setTypeSpaces(typeQuotas);
+  }
+
+  QuotaCounts addNamespaceDiskspace(QuotaCounts counts) {
+    counts.add(this.usage);
     return counts;
   }
 
@@ -76,45 +126,42 @@ public final class DirectoryWithQuotaFeature implements INode.Feature {
     }
     return summary;
   }
-  
+
   private void checkDiskspace(final INodeDirectory dir, final long computed) {
-    if (-1 != getQuota().get(Quota.DISKSPACE) && diskspace != computed) {
+    if (-1 != quota.getDiskSpace() && usage.getDiskSpace() != computed) {
       NameNode.LOG.error("BUG: Inconsistent diskspace for directory "
-          + dir.getFullPathName() + ". Cached = " + diskspace
+          + dir.getFullPathName() + ". Cached = " + usage.getDiskSpace()
           + " != Computed = " + computed);
     }
   }
 
-  void addSpaceConsumed(final INodeDirectory dir, final long nsDelta,
-      final long dsDelta, boolean verify) throws QuotaExceededException {
-    if (dir.isQuotaSet()) { 
-      // The following steps are important: 
+  void addSpaceConsumed(final INodeDirectory dir, final QuotaCounts counts,
+      boolean verify) throws QuotaExceededException {
+    if (dir.isQuotaSet()) {
+      // The following steps are important:
       // check quotas in this inode and all ancestors before changing counts
       // so that no change is made if there is any quota violation.
-
       // (1) verify quota in this inode
       if (verify) {
-        verifyQuota(nsDelta, dsDelta);
+        verifyQuota(counts);
       }
-      // (2) verify quota and then add count in ancestors 
-      dir.addSpaceConsumed2Parent(nsDelta, dsDelta, verify);
+      // (2) verify quota and then add count in ancestors
+      dir.addSpaceConsumed2Parent(counts, verify);
       // (3) add count in this inode
-      addSpaceConsumed2Cache(nsDelta, dsDelta);
+      addSpaceConsumed2Cache(counts);
     } else {
-      dir.addSpaceConsumed2Parent(nsDelta, dsDelta, verify);
+      dir.addSpaceConsumed2Parent(counts, verify);
     }
   }
   
-  /** Update the size of the tree
+  /** Update the space/namespace/type usage of the tree
    * 
-   * @param nsDelta the change of the tree size
-   * @param dsDelta change to disk space occupied
+   * @param delta the change of the namespace/space/type usage
    */
-  public void addSpaceConsumed2Cache(long nsDelta, long dsDelta) {
-    namespace += nsDelta;
-    diskspace += dsDelta;
+  public void addSpaceConsumed2Cache(QuotaCounts delta) {
+    usage.add(delta);
   }
-  
+
   /** 
    * Sets namespace and diskspace take by the directory rooted 
    * at this INode. This should be used carefully. It does not check 
@@ -122,52 +169,103 @@ public final class DirectoryWithQuotaFeature implements INode.Feature {
    * 
    * @param namespace size of the directory to be set
    * @param diskspace disk space take by all the nodes under this directory
+   * @param typeUsed counters of storage type usage
    */
-  void setSpaceConsumed(long namespace, long diskspace) {
-    this.namespace = namespace;
-    this.diskspace = diskspace;
+  void setSpaceConsumed(long namespace, long diskspace,
+      EnumCounters<StorageType> typeUsed) {
+    usage.setNameSpace(namespace);
+    usage.setDiskSpace(diskspace);
+    usage.setTypeSpaces(typeUsed);
   }
-  
+
+  void setSpaceConsumed(QuotaCounts c) {
+    usage.setNameSpace(c.getNameSpace());
+    usage.setDiskSpace(c.getDiskSpace());
+    usage.setTypeSpaces(c.getTypeSpaces());
+  }
+
   /** @return the namespace and diskspace consumed. */
-  public Quota.Counts getSpaceConsumed() {
-    return Quota.Counts.newInstance(namespace, diskspace);
+  public QuotaCounts getSpaceConsumed() {
+    return new QuotaCounts.Builder().quotaCount(usage).build();
   }
 
   /** Verify if the namespace quota is violated after applying delta. */
   private void verifyNamespaceQuota(long delta) throws NSQuotaExceededException {
-    if (Quota.isViolated(nsQuota, namespace, delta)) {
-      throw new NSQuotaExceededException(nsQuota, namespace + delta);
+    if (Quota.isViolated(quota.getNameSpace(), usage.getNameSpace(), delta)) {
+      throw new NSQuotaExceededException(quota.getNameSpace(),
+          usage.getNameSpace() + delta);
     }
   }
   /** Verify if the diskspace quota is violated after applying delta. */
   private void verifyDiskspaceQuota(long delta) throws DSQuotaExceededException {
-    if (Quota.isViolated(dsQuota, diskspace, delta)) {
-      throw new DSQuotaExceededException(dsQuota, diskspace + delta);
+    if (Quota.isViolated(quota.getDiskSpace(), usage.getDiskSpace(), delta)) {
+      throw new DSQuotaExceededException(quota.getDiskSpace(),
+          usage.getDiskSpace() + delta);
+    }
+  }
+
+  private void verifyQuotaByStorageType(EnumCounters<StorageType> typeDelta)
+      throws QuotaByStorageTypeExceededException {
+    if (!isQuotaByStorageTypeSet()) {
+      return;
+    }
+    for (StorageType t: StorageType.getTypesSupportingQuota()) {
+      if (!isQuotaByStorageTypeSet(t)) {
+        continue;
+      }
+      if (Quota.isViolated(quota.getTypeSpace(t), usage.getTypeSpace(t),
+          typeDelta.get(t))) {
+        throw new QuotaByStorageTypeExceededException(
+          quota.getTypeSpace(t), usage.getTypeSpace(t) + typeDelta.get(t), t);
+      }
     }
   }
 
   /**
-   * @throws QuotaExceededException if namespace or diskspace quotas is
-   *         violated after applying the deltas.
+   * @throws QuotaExceededException if namespace, diskspace or storage type quotas
+   * is violated after applying the deltas.
    */
-  void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
-    verifyNamespaceQuota(nsDelta);
-    verifyDiskspaceQuota(dsDelta);
+  void verifyQuota(QuotaCounts counts) throws QuotaExceededException {
+    verifyNamespaceQuota(counts.getNameSpace());
+    verifyDiskspaceQuota(counts.getDiskSpace());
+    verifyQuotaByStorageType(counts.getTypeSpaces());
   }
-  
+
   boolean isQuotaSet() {
-    return nsQuota >= 0 || dsQuota >= 0;
+    return quota.anyNsSpCountGreaterOrEqual(0) ||
+        quota.anyTypeCountGreaterOrEqual(0);
+  }
+
+  boolean isQuotaByStorageTypeSet() {
+    return quota.anyTypeCountGreaterOrEqual(0);
+  }
+
+  boolean isQuotaByStorageTypeSet(StorageType t) {
+    return quota.getTypeSpace(t) >= 0;
   }
 
   private String namespaceString() {
-    return "namespace: " + (nsQuota < 0? "-": namespace + "/" + nsQuota);
+    return "namespace: " + (quota.getNameSpace() < 0? "-":
+        usage.getNameSpace() + "/" + quota.getNameSpace());
   }
   private String diskspaceString() {
-    return "diskspace: " + (dsQuota < 0? "-": diskspace + "/" + dsQuota);
+    return "diskspace: " + (quota.getDiskSpace() < 0? "-":
+        usage.getDiskSpace() + "/" + quota.getDiskSpace());
   }
-  
+
+  private String quotaByStorageTypeString() {
+    StringBuilder sb = new StringBuilder();
+    for (StorageType t : StorageType.getTypesSupportingQuota()) {
+      sb.append("StorageType: " + t +
+          (quota.getTypeSpace(t) < 0? "-":
+          usage.getTypeSpace(t) + "/" + usage.getTypeSpace(t)));
+    }
+    return sb.toString();
+  }
+
   @Override
   public String toString() {
-    return "Quota[" + namespaceString() + ", " + diskspaceString() + "]";
+    return "Quota[" + namespaceString() + ", " + diskspaceString() +
+        ", " + quotaByStorageTypeString() + "]";
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 6c1890e..5843b4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
 
 import java.io.FileNotFoundException;
@@ -41,6 +43,7 @@ import java.util.EnumSet;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 
 public class FSDirAttrOp {
@@ -216,11 +219,11 @@ public class FSDirAttrOp {
   }
 
   /**
-   * Set the namespace quota and diskspace quota for a directory.
+   * Set the namespace quota, diskspace and typeSpace quota for a directory.
    *
    * Note: This does not support ".inodes" relative path.
    */
-  static void setQuota(FSDirectory fsd, String src, long nsQuota, long dsQuota)
+  static void setQuota(FSDirectory fsd, String src, long nsQuota, long dsQuota, StorageType type)
       throws IOException {
     if (fsd.isPermissionEnabled()) {
       FSPermissionChecker pc = fsd.getPermissionChecker();
@@ -229,11 +232,15 @@ public class FSDirAttrOp {
 
     fsd.writeLock();
     try {
-      INodeDirectory changed = unprotectedSetQuota(fsd, src, nsQuota, dsQuota);
+      INodeDirectory changed = unprotectedSetQuota(fsd, src, nsQuota, dsQuota, type);
       if (changed != null) {
-        final Quota.Counts q = changed.getQuotaCounts();
-        fsd.getEditLog().logSetQuota(
-            src, q.get(Quota.NAMESPACE), q.get(Quota.DISKSPACE));
+        final QuotaCounts q = changed.getQuotaCounts();
+        if (type == null) {
+          fsd.getEditLog().logSetQuota(src, q.getNameSpace(), q.getDiskSpace());
+        } else {
+          fsd.getEditLog().logSetQuotaByStorageType(
+              src, q.getTypeSpaces().get(type), type);
+        }
       }
     } finally {
       fsd.writeUnlock();
@@ -294,7 +301,8 @@ public class FSDirAttrOp {
   }
 
   /**
-   * See {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long)}
+   * See {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String,
+   *     long, long, StorageType)}
    * for the contract.
    * Sets quota for for a directory.
    * @return INodeDirectory if any of the quotas have changed. null otherwise.
@@ -306,10 +314,10 @@ public class FSDirAttrOp {
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
   static INodeDirectory unprotectedSetQuota(
-      FSDirectory fsd, String src, long nsQuota, long dsQuota)
+      FSDirectory fsd, String src, long nsQuota, long dsQuota, StorageType type)
       throws FileNotFoundException, PathIsNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException,
-      SnapshotAccessControlException {
+      SnapshotAccessControlException, UnsupportedActionException {
     assert fsd.hasWriteLock();
     // sanity check
     if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET &&
@@ -320,6 +328,15 @@ public class FSDirAttrOp {
                                          "dsQuota : " + nsQuota + " and " +
                                          dsQuota);
     }
+    // sanity check for quota by storage type
+    if ((type != null) && (!fsd.isQuotaByStorageTypeEnabled() ||
+        nsQuota != HdfsConstants.QUOTA_DONT_SET)) {
+      throw new UnsupportedActionException(
+          "Failed to set quota by storage type because either" +
+          DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY + " is set to " +
+          fsd.isQuotaByStorageTypeEnabled() + " or nsQuota value is illegal " +
+          nsQuota);
+    }
 
     String srcs = FSDirectory.normalizePath(src);
     final INodesInPath iip = fsd.getINodesInPath4Write(srcs, true);
@@ -327,22 +344,33 @@ public class FSDirAttrOp {
     if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
       throw new IllegalArgumentException("Cannot clear namespace quota on root.");
     } else { // a directory inode
-      final Quota.Counts oldQuota = dirNode.getQuotaCounts();
-      final long oldNsQuota = oldQuota.get(Quota.NAMESPACE);
-      final long oldDsQuota = oldQuota.get(Quota.DISKSPACE);
+      final QuotaCounts oldQuota = dirNode.getQuotaCounts();
+      final long oldNsQuota = oldQuota.getNameSpace();
+      final long oldDsQuota = oldQuota.getDiskSpace();
+
       if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
         nsQuota = oldNsQuota;
       }
       if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
         dsQuota = oldDsQuota;
       }
-      if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
+
+      // unchanged space/namespace quota
+      if (type == null && oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
         return null;
       }
 
+      // unchanged type quota
+      if (type != null) {
+          EnumCounters<StorageType> oldTypeQuotas = oldQuota.getTypeSpaces();
+          if (oldTypeQuotas != null && oldTypeQuotas.get(type) == dsQuota) {
+              return null;
+          }
+      }
+
       final int latest = iip.getLatestSnapshotId();
       dirNode.recordModification(latest);
-      dirNode.setQuota(nsQuota, dsQuota);
+      dirNode.setQuota(fsd.getBlockStoragePolicySuite(), nsQuota, dsQuota, type);
       return dirNode;
     }
   }
@@ -365,8 +393,8 @@ public class FSDirAttrOp {
     // if replication > oldBR, then newBR == replication.
     // if replication < oldBR, we don't know newBR yet.
     if (replication > oldBR) {
-      long dsDelta = (replication - oldBR)*(file.diskspaceConsumed()/oldBR);
-      fsd.updateCount(iip, 0, dsDelta, true);
+      long dsDelta = file.diskspaceConsumed()/oldBR;
+      fsd.updateCount(iip, 0L, dsDelta, oldBR, replication, true);
     }
 
     file.setFileReplication(replication, iip.getLatestSnapshotId());
@@ -374,8 +402,8 @@ public class FSDirAttrOp {
     final short newBR = file.getBlockReplication();
     // check newBR < oldBR case.
     if (newBR < oldBR) {
-      long dsDelta = (newBR - oldBR)*(file.diskspaceConsumed()/newBR);
-      fsd.updateCount(iip, 0, dsDelta, true);
+      long dsDelta = file.diskspaceConsumed()/newBR;
+      fsd.updateCount(iip, 0L, dsDelta, oldBR, newBR, true);
     }
 
     if (blockRepls != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index ecfd2e1..d8cf42a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -20,14 +20,17 @@ package org.apache.hadoop.hdfs.server.namenode;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.StorageType;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.List;
 
 import static org.apache.hadoop.util.Time.now;
 
@@ -140,25 +143,42 @@ class FSDirConcatOp {
     return si.toArray(new INodeFile[si.size()]);
   }
 
-  private static long computeQuotaDelta(INodeFile target, INodeFile[] srcList) {
-    long delta = 0;
+  private static QuotaCounts computeQuotaDeltas(FSDirectory fsd, INodeFile target, INodeFile[] srcList) {
+    QuotaCounts deltas = new QuotaCounts.Builder().build();
     short targetRepl = target.getBlockReplication();
     for (INodeFile src : srcList) {
-      if (targetRepl != src.getBlockReplication()) {
-        delta += src.computeFileSize() *
-            (targetRepl - src.getBlockReplication());
+      short srcRepl = src.getBlockReplication();
+      long fileSize = src.computeFileSize();
+      if (targetRepl != srcRepl) {
+        deltas.addDiskSpace(fileSize * (targetRepl - srcRepl));
+        BlockStoragePolicy bsp =
+            fsd.getBlockStoragePolicySuite().getPolicy(src.getStoragePolicyID());
+        if (bsp != null) {
+          List<StorageType> srcTypeChosen = bsp.chooseStorageTypes(srcRepl);
+          for (StorageType t : srcTypeChosen) {
+            if (t.supportTypeQuota()) {
+              deltas.addTypeSpace(t, -fileSize);
+            }
+          }
+          List<StorageType> targetTypeChosen = bsp.chooseStorageTypes(targetRepl);
+          for (StorageType t : targetTypeChosen) {
+            if (t.supportTypeQuota()) {
+              deltas.addTypeSpace(t, fileSize);
+            }
+          }
+        }
       }
     }
-    return delta;
+    return deltas;
   }
 
   private static void verifyQuota(FSDirectory fsd, INodesInPath targetIIP,
-      long delta) throws QuotaExceededException {
+      QuotaCounts deltas) throws QuotaExceededException {
     if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
       // Do not check quota if editlog is still being processed
       return;
     }
-    FSDirectory.verifyQuota(targetIIP, targetIIP.length() - 1, 0, delta, null);
+    FSDirectory.verifyQuota(targetIIP, targetIIP.length() - 1, deltas, null);
   }
 
   /**
@@ -174,8 +194,8 @@ class FSDirConcatOp {
     }
 
     final INodeFile trgInode = targetIIP.getLastINode().asFile();
-    long delta = computeQuotaDelta(trgInode, srcList);
-    verifyQuota(fsd, targetIIP, delta);
+    QuotaCounts deltas = computeQuotaDeltas(fsd, trgInode, srcList);
+    verifyQuota(fsd, targetIIP, deltas);
 
     // the target file can be included in a snapshot
     trgInode.recordModification(targetIIP.getLatestSnapshotId());
@@ -195,8 +215,7 @@ class FSDirConcatOp {
 
     trgInode.setModificationTime(timestamp, targetIIP.getLatestSnapshotId());
     trgParent.updateModificationTime(timestamp, targetIIP.getLatestSnapshotId());
-    // update quota on the parent directory ('count' files removed, 0 space)
-    FSDirectory.unprotectedUpdateCount(targetIIP, targetIIP.length() - 1,
-        -count, delta);
+    // update quota on the parent directory with deltas
+    FSDirectory.unprotectedUpdateCount(targetIIP, targetIIP.length() - 1, deltas);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
index 47c641c..2fc4711 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirDeleteOp.java
@@ -230,13 +230,14 @@ class FSDirDeleteOp {
 
     // collect block and update quota
     if (!targetNode.isInLatestSnapshot(latestSnapshot)) {
-      targetNode.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+      targetNode.destroyAndCollectBlocks(fsd.getBlockStoragePolicySuite(),
+        collectedBlocks, removedINodes);
     } else {
-      Quota.Counts counts = targetNode.cleanSubtree(CURRENT_STATE_ID,
+      QuotaCounts counts = targetNode.cleanSubtree(
+        fsd.getBlockStoragePolicySuite(), CURRENT_STATE_ID,
           latestSnapshot, collectedBlocks, removedINodes);
-      removed = counts.get(Quota.NAMESPACE);
-      fsd.updateCountNoQuotaCheck(iip, iip.length() - 1,
-          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+      removed = counts.getNameSpace();
+      fsd.updateCountNoQuotaCheck(iip, iip.length() -1, counts.negation());
     }
 
     if (NameNode.stateChangeLog.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
index 6a58093..c57cae2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -90,16 +91,15 @@ class FSDirRenameOp {
     int i = 0;
     while(src.getINode(i) == dst.getINode(i)) { i++; }
     // src[i - 1] is the last common ancestor.
-
-    final Quota.Counts delta = src.getLastINode().computeQuotaUsage();
+    BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
+    final QuotaCounts delta = src.getLastINode().computeQuotaUsage(bsps);
 
     // Reduce the required quota by dst that is being removed
     final INode dstINode = dst.getLastINode();
     if (dstINode != null) {
-      delta.subtract(dstINode.computeQuotaUsage());
+      delta.subtract(dstINode.computeQuotaUsage(bsps));
     }
-    FSDirectory.verifyQuota(dst, dst.length() - 1, delta.get(Quota.NAMESPACE),
-        delta.get(Quota.DISKSPACE), src.getINode(i - 1));
+    FSDirectory.verifyQuota(dst, dst.length() - 1, delta, src.getINode(i - 1));
   }
 
   /**
@@ -207,7 +207,7 @@ class FSDirRenameOp {
         }
 
         tx.updateMtimeAndLease(timestamp);
-        tx.updateQuotasInSourceTree();
+        tx.updateQuotasInSourceTree(fsd.getBlockStoragePolicySuite());
 
         return true;
       }
@@ -356,6 +356,7 @@ class FSDirRenameOp {
       throw new IOException(error);
     }
 
+    BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
     fsd.ezManager.checkMoveValidity(srcIIP, dstIIP, src);
     final INode dstInode = dstIIP.getLastINode();
     List<INodeDirectory> snapshottableDirs = new ArrayList<>();
@@ -412,7 +413,7 @@ class FSDirRenameOp {
         if (undoRemoveDst) {
           undoRemoveDst = false;
           if (removedNum > 0) {
-            filesDeleted = tx.cleanDst(collectedBlocks);
+            filesDeleted = tx.cleanDst(bsps, collectedBlocks);
           }
         }
 
@@ -422,7 +423,7 @@ class FSDirRenameOp {
           fsd.getFSNamesystem().removeSnapshottableDirs(snapshottableDirs);
         }
 
-        tx.updateQuotasInSourceTree();
+        tx.updateQuotasInSourceTree(bsps);
         return filesDeleted;
       }
     } finally {
@@ -430,7 +431,7 @@ class FSDirRenameOp {
         tx.restoreSource();
       }
       if (undoRemoveDst) { // Rename failed - restore dst
-        tx.restoreDst();
+        tx.restoreDst(bsps);
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: " +
@@ -566,7 +567,7 @@ class FSDirRenameOp {
     private final byte[] srcChildName;
     private final boolean isSrcInSnapshot;
     private final boolean srcChildIsReference;
-    private final Quota.Counts oldSrcCounts;
+    private final QuotaCounts oldSrcCounts;
     private INode srcChild;
     private INode oldDstChild;
 
@@ -581,6 +582,7 @@ class FSDirRenameOp {
       this.srcParentIIP = srcIIP.getParentINodesInPath();
       this.dstParentIIP = dstIIP.getParentINodesInPath();
 
+      BlockStoragePolicySuite bsps = fsd.getBlockStoragePolicySuite();
       srcChild = this.srcIIP.getLastINode();
       srcChildName = srcChild.getLocalNameBytes();
       final int srcLatestSnapshotId = srcIIP.getLatestSnapshotId();
@@ -598,7 +600,7 @@ class FSDirRenameOp {
       // check srcChild for reference
       srcRefDstSnapshot = srcChildIsReference ?
           srcChild.asReference().getDstSnapshotId() : Snapshot.CURRENT_STATE_ID;
-      oldSrcCounts = Quota.Counts.newInstance();
+      oldSrcCounts = new QuotaCounts.Builder().build();
       if (isSrcInSnapshot) {
         final INodeReference.WithName withName = srcParent
             .replaceChild4ReferenceWithName(srcChild, srcLatestSnapshotId);
@@ -607,7 +609,7 @@ class FSDirRenameOp {
         this.srcIIP = INodesInPath.replace(srcIIP, srcIIP.length() - 1,
             srcChild);
         // get the counts before rename
-        withCount.getReferredINode().computeQuotaUsage(oldSrcCounts, true);
+        withCount.getReferredINode().computeQuotaUsage(bsps, oldSrcCounts, true);
       } else if (srcChildIsReference) {
         // srcChild is reference but srcChild is not in latest snapshot
         withCount = (INodeReference.WithCount) srcChild.asReference()
@@ -709,11 +711,11 @@ class FSDirRenameOp {
       }
     }
 
-    void restoreDst() throws QuotaExceededException {
+    void restoreDst(BlockStoragePolicySuite bsps) throws QuotaExceededException {
       Preconditions.checkState(oldDstChild != null);
       final INodeDirectory dstParent = dstParentIIP.getLastINode().asDirectory();
       if (dstParent.isWithSnapshot()) {
-        dstParent.undoRename4DstParent(oldDstChild, dstIIP.getLatestSnapshotId());
+        dstParent.undoRename4DstParent(bsps, oldDstChild, dstIIP.getLatestSnapshotId());
       } else {
         fsd.addLastINodeNoQuotaCheck(dstParentIIP, oldDstChild);
       }
@@ -725,32 +727,31 @@ class FSDirRenameOp {
       }
     }
 
-    boolean cleanDst(BlocksMapUpdateInfo collectedBlocks)
+    boolean cleanDst(BlockStoragePolicySuite bsps, BlocksMapUpdateInfo collectedBlocks)
         throws QuotaExceededException {
       Preconditions.checkState(oldDstChild != null);
       List<INode> removedINodes = new ChunkedArrayList<>();
       final boolean filesDeleted;
       if (!oldDstChild.isInLatestSnapshot(dstIIP.getLatestSnapshotId())) {
-        oldDstChild.destroyAndCollectBlocks(collectedBlocks, removedINodes);
+        oldDstChild.destroyAndCollectBlocks(bsps, collectedBlocks, removedINodes);
         filesDeleted = true;
       } else {
-        filesDeleted = oldDstChild.cleanSubtree(Snapshot.CURRENT_STATE_ID,
+        filesDeleted = oldDstChild.cleanSubtree(bsps, Snapshot.CURRENT_STATE_ID,
             dstIIP.getLatestSnapshotId(), collectedBlocks, removedINodes)
-            .get(Quota.NAMESPACE) >= 0;
+            .getNameSpace() >= 0;
       }
       fsd.getFSNamesystem().removeLeasesAndINodes(src, removedINodes, false);
       return filesDeleted;
     }
 
-    void updateQuotasInSourceTree() throws QuotaExceededException {
+    void updateQuotasInSourceTree(BlockStoragePolicySuite bsps) throws QuotaExceededException {
       // update the quota usage in src tree
       if (isSrcInSnapshot) {
         // get the counts after rename
-        Quota.Counts newSrcCounts = srcChild.computeQuotaUsage(
-            Quota.Counts.newInstance(), false);
+        QuotaCounts newSrcCounts = srcChild.computeQuotaUsage(bsps,
+            new QuotaCounts.Builder().build(), false);
         newSrcCounts.subtract(oldSrcCounts);
-        srcParent.addSpaceConsumed(newSrcCounts.get(Quota.NAMESPACE),
-            newSrcCounts.get(Quota.DISKSPACE), false);
+        srcParent.addSpaceConsumed(newSrcCounts, false);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 7450249..7f1437d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
 import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
@@ -56,9 +57,12 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.util.ByteArray;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -76,6 +80,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
@@ -100,8 +106,9 @@ public class FSDirectory implements Closeable {
         namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)),
         0L);
     r.addDirectoryWithQuotaFeature(
-        DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA,
-        DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA);
+        new DirectoryWithQuotaFeature.Builder().
+            nameSpaceQuota(DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA).
+            spaceQuota(DirectoryWithQuotaFeature.DEFAULT_SPACE_QUOTA).build());
     r.addSnapshottableFeature();
     r.setSnapshotQuota(0);
     return r;
@@ -149,6 +156,8 @@ public class FSDirectory implements Closeable {
   private final long accessTimePrecision;
   // whether setStoragePolicy is allowed.
   private final boolean storagePolicyEnabled;
+  // whether quota by storage type is allowed
+  private final boolean quotaByStorageTypeEnabled;
 
   private final String fsOwnerShortUserName;
   private final String supergroup;
@@ -236,6 +245,10 @@ public class FSDirectory implements Closeable {
         conf.getBoolean(DFS_STORAGE_POLICY_ENABLED_KEY,
                         DFS_STORAGE_POLICY_ENABLED_DEFAULT);
 
+    this.quotaByStorageTypeEnabled =
+        conf.getBoolean(DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY,
+                        DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT);
+
     int configuredLimit = conf.getInt(
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
@@ -291,6 +304,10 @@ public class FSDirectory implements Closeable {
     return rootDir;
   }
 
+  public BlockStoragePolicySuite getBlockStoragePolicySuite() {
+    return getBlockManager().getStoragePolicySuite();
+  }
+
   boolean isPermissionEnabled() {
     return isPermissionEnabled;
   }
@@ -307,6 +324,9 @@ public class FSDirectory implements Closeable {
   boolean isAccessTimeSupported() {
     return accessTimePrecision > 0;
   }
+  boolean isQuotaByStorageTypeEnabled() {
+    return quotaByStorageTypeEnabled;
+  }
 
 
   int getLsLimit() {
@@ -452,7 +472,8 @@ public class FSDirectory implements Closeable {
       Preconditions.checkState(fileINode.isUnderConstruction());
 
       // check quota limits and updated space consumed
-      updateCount(inodesInPath, 0, fileINode.getPreferredBlockDiskspace(), true);
+      updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+          fileINode.getBlockReplication(), true);
 
       // associate new last block for the file
       BlockInfoContiguousUnderConstruction blockInfo =
@@ -508,7 +529,8 @@ public class FSDirectory implements Closeable {
     }
 
     // update space consumed
-    updateCount(iip, 0, -fileNode.getPreferredBlockDiskspace(), true);
+    updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
+        fileNode.getBlockReplication(), true);
     return true;
   }
 
@@ -584,19 +606,20 @@ public class FSDirectory implements Closeable {
    * @param iip the INodesInPath instance containing all the INodes for
    *            updating quota usage
    * @param nsDelta the delta change of namespace
-   * @param dsDelta the delta change of diskspace
+   * @param dsDelta the delta change of space consumed without replication
+   * @param replication the replication factor of the block consumption change
    * @throws QuotaExceededException if the new count violates any quota limit
    * @throws FileNotFoundException if path does not exist.
    */
-  void updateSpaceConsumed(INodesInPath iip, long nsDelta, long dsDelta)
-      throws QuotaExceededException, FileNotFoundException,
-          UnresolvedLinkException, SnapshotAccessControlException {
+  void updateSpaceConsumed(INodesInPath iip, long nsDelta, long dsDelta, short replication)
+    throws QuotaExceededException, FileNotFoundException,
+    UnresolvedLinkException, SnapshotAccessControlException {
     writeLock();
     try {
       if (iip.getLastINode() == null) {
         throw new FileNotFoundException("Path not found: " + iip.getPath());
       }
-      updateCount(iip, nsDelta, dsDelta, true);
+      updateCount(iip, nsDelta, dsDelta, replication, true);
     } finally {
       writeUnlock();
     }
@@ -610,29 +633,52 @@ public class FSDirectory implements Closeable {
   void updateCountForDelete(final INode inode, final INodesInPath iip) {
     if (getFSNamesystem().isImageLoaded() &&
         !inode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
-      Quota.Counts counts = inode.computeQuotaUsage();
-      unprotectedUpdateCount(iip, iip.length() - 1,
-          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+      QuotaCounts counts = inode.computeQuotaUsage(getBlockStoragePolicySuite());
+      unprotectedUpdateCount(iip, iip.length() - 1, counts.negation());
     }
   }
 
-  void updateCount(INodesInPath iip, long nsDelta, long dsDelta,
+  /**
+   * Update usage count without replication factor change
+   */
+  void updateCount(INodesInPath iip, long nsDelta, long dsDelta, short replication,
       boolean checkQuota) throws QuotaExceededException {
-    updateCount(iip, iip.length() - 1, nsDelta, dsDelta, checkQuota);
+    final INodeFile fileINode = iip.getLastINode().asFile();
+    EnumCounters<StorageType> typeSpaceDeltas =
+      getStorageTypeDeltas(fileINode.getStoragePolicyID(), dsDelta,
+          replication, replication);;
+    updateCount(iip, iip.length() - 1,
+      new QuotaCounts.Builder().nameCount(nsDelta).spaceCount(dsDelta * replication).
+          typeCounts(typeSpaceDeltas).build(),
+        checkQuota);
+  }
+
+  /**
+   * Update usage count with replication factor change due to setReplication
+   */
+  void updateCount(INodesInPath iip, long nsDelta, long dsDelta, short oldRep,
+      short newRep, boolean checkQuota) throws QuotaExceededException {
+    final INodeFile fileINode = iip.getLastINode().asFile();
+    EnumCounters<StorageType> typeSpaceDeltas =
+        getStorageTypeDeltas(fileINode.getStoragePolicyID(), dsDelta, oldRep, newRep);
+    updateCount(iip, iip.length() - 1,
+        new QuotaCounts.Builder().nameCount(nsDelta).
+            spaceCount(dsDelta * (newRep - oldRep)).
+            typeCounts(typeSpaceDeltas).build(),
+        checkQuota);
   }
 
   /** update count of each inode with quota
    * 
    * @param iip inodes in a path
    * @param numOfINodes the number of inodes to update starting from index 0
-   * @param nsDelta the delta change of namespace
-   * @param dsDelta the delta change of diskspace
+   * @param counts the count of space/namespace/type usage to be update
    * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private void updateCount(INodesInPath iip, int numOfINodes, 
-                           long nsDelta, long dsDelta, boolean checkQuota)
-                           throws QuotaExceededException {
+   void updateCount(INodesInPath iip, int numOfINodes,
+                    QuotaCounts counts, boolean checkQuota)
+                    throws QuotaExceededException {
     assert hasWriteLock();
     if (!namesystem.isImageLoaded()) {
       //still initializing. do not check or update quotas.
@@ -642,20 +688,20 @@ public class FSDirectory implements Closeable {
       numOfINodes = iip.length();
     }
     if (checkQuota && !skipQuotaCheck) {
-      verifyQuota(iip, numOfINodes, nsDelta, dsDelta, null);
+      verifyQuota(iip, numOfINodes, counts, null);
     }
-    unprotectedUpdateCount(iip, numOfINodes, nsDelta, dsDelta);
+    unprotectedUpdateCount(iip, numOfINodes, counts);
   }
   
   /** 
    * update quota of each inode and check to see if quota is exceeded. 
-   * See {@link #updateCount(INodesInPath, long, long, boolean)}
+   * See {@link #updateCount(INodesInPath, int, QuotaCounts, boolean)}
    */ 
-  void updateCountNoQuotaCheck(INodesInPath inodesInPath, int numOfINodes,
-      long nsDelta, long dsDelta) {
+   void updateCountNoQuotaCheck(INodesInPath inodesInPath,
+      int numOfINodes, QuotaCounts counts) {
     assert hasWriteLock();
     try {
-      updateCount(inodesInPath, numOfINodes, nsDelta, dsDelta, false);
+      updateCount(inodesInPath, numOfINodes, counts, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.error("BUG: unexpected exception ", e);
     }
@@ -666,15 +712,49 @@ public class FSDirectory implements Closeable {
    * callers responsibility is to make sure quota is not exceeded
    */
   static void unprotectedUpdateCount(INodesInPath inodesInPath,
-      int numOfINodes, long nsDelta, long dsDelta) {
+      int numOfINodes, QuotaCounts counts) {
     for(int i=0; i < numOfINodes; i++) {
       if (inodesInPath.getINode(i).isQuotaSet()) { // a directory with quota
         inodesInPath.getINode(i).asDirectory().getDirectoryWithQuotaFeature()
-            .addSpaceConsumed2Cache(nsDelta, dsDelta);
+            .addSpaceConsumed2Cache(counts);
       }
     }
   }
-  
+
+  public EnumCounters<StorageType> getStorageTypeDeltas(byte storagePolicyID,
+      long dsDelta, short oldRep, short newRep) {
+    EnumCounters<StorageType> typeSpaceDeltas =
+        new EnumCounters<StorageType>(StorageType.class);
+    // Storage type and its quota are only available when storage policy is set
+    if (storagePolicyID != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+      BlockStoragePolicy storagePolicy = getBlockManager().getStoragePolicy(storagePolicyID);
+
+      if (oldRep != newRep) {
+        List<StorageType> oldChosenStorageTypes =
+            storagePolicy.chooseStorageTypes(oldRep);
+
+        for (StorageType t : oldChosenStorageTypes) {
+          if (!t.supportTypeQuota()) {
+            continue;
+          }
+          Preconditions.checkArgument(dsDelta > 0);
+          typeSpaceDeltas.add(t, -dsDelta);
+        }
+      }
+
+      List<StorageType> newChosenStorageTypes =
+          storagePolicy.chooseStorageTypes(newRep);
+
+      for (StorageType t : newChosenStorageTypes) {
+        if (!t.supportTypeQuota()) {
+          continue;
+        }
+        typeSpaceDeltas.add(t, dsDelta);
+      }
+    }
+    return typeSpaceDeltas;
+  }
+
   /** Return the name of the path represented by inodes at [0, pos] */
   static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();
@@ -751,16 +831,16 @@ public class FSDirectory implements Closeable {
    *  
    * @param iip INodes corresponding to a path
    * @param pos position where a new INode will be added
-   * @param nsDelta needed namespace
-   * @param dsDelta needed diskspace
+   * @param deltas needed namespace, diskspace and storage types
    * @param commonAncestor Last node in inodes array that is a common ancestor
    *          for a INode that is being moved from one location to the other.
    *          Pass null if a node is not being moved.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  static void verifyQuota(INodesInPath iip, int pos, long nsDelta,
-      long dsDelta, INode commonAncestor) throws QuotaExceededException {
-    if (nsDelta <= 0 && dsDelta <= 0) {
+  static void verifyQuota(INodesInPath iip, int pos, QuotaCounts deltas,
+                          INode commonAncestor) throws QuotaExceededException {
+    if (deltas.getNameSpace() <= 0 && deltas.getDiskSpace() <= 0
+        && deltas.getTypeSpaces().allLessOrEqual(0L)) {
       // if quota is being freed or not being consumed
       return;
     }
@@ -775,7 +855,7 @@ public class FSDirectory implements Closeable {
           = iip.getINode(i).asDirectory().getDirectoryWithQuotaFeature();
       if (q != null) { // a directory with quota
         try {
-          q.verifyQuota(nsDelta, dsDelta);
+          q.verifyQuota(deltas);
         } catch (QuotaExceededException e) {
           List<INode> inodes = iip.getReadOnlyINodes();
           final String path = getFullPathName(inodes.toArray(new INode[inodes.size()]), i);
@@ -845,7 +925,7 @@ public class FSDirectory implements Closeable {
       }
     }
   }
-  
+
   /**
    * Add a child to the end of the path specified by INodesInPath.
    * @return an INodesInPath instance containing the new INode
@@ -873,7 +953,7 @@ public class FSDirectory implements Closeable {
     // odd. It's because a rename operation deletes the src, tries to add
     // to the dest, if that fails, re-adds the src from whence it came.
     // The rename code disables the quota when it's restoring to the
-    // original location becase a quota violation would cause the the item
+    // original location because a quota violation would cause the the item
     // to go "poof".  The fs limits must be bypassed for the same reason.
     if (checkQuota) {
       final String parentPath = existing.getPath(pos - 1);
@@ -883,21 +963,19 @@ public class FSDirectory implements Closeable {
     // always verify inode name
     verifyINodeName(inode.getLocalNameBytes());
 
-    final Quota.Counts counts = inode.computeQuotaUsage();
-    updateCount(existing, pos,
-        counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
+    final QuotaCounts counts = inode.computeQuotaUsage(getBlockStoragePolicySuite());
+    updateCount(existing, pos, counts, checkQuota);
+
     boolean isRename = (inode.getParent() != null);
     boolean added;
     try {
       added = parent.addChild(inode, true, existing.getLatestSnapshotId());
     } catch (QuotaExceededException e) {
-      updateCountNoQuotaCheck(existing, pos,
-          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+      updateCountNoQuotaCheck(existing, pos, counts.negation());
       throw e;
     }
     if (!added) {
-      updateCountNoQuotaCheck(existing, pos,
-          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+      updateCountNoQuotaCheck(existing, pos, counts.negation());
       return null;
     } else {
       if (!isRename) {
@@ -1023,12 +1101,13 @@ public class FSDirectory implements Closeable {
     INodeFile file = iip.getLastINode().asFile();
     int latestSnapshot = iip.getLatestSnapshotId();
     file.recordModification(latestSnapshot, true);
-    long oldDiskspace = file.diskspaceConsumed();
+    long oldDiskspaceNoRep = file.diskspaceConsumedNoReplication();
     long remainingLength =
         file.collectBlocksBeyondMax(newLength, collectedBlocks);
     file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
     file.setModificationTime(mtime);
-    updateCount(iip, 0, file.diskspaceConsumed() - oldDiskspace, true);
+    updateCount(iip, 0, file.diskspaceConsumedNoReplication() - oldDiskspaceNoRep,
+      file.getBlockReplication(), true);
     // return whether on a block boundary
     return (remainingLength - newLength) == 0;
   }
@@ -1102,7 +1181,7 @@ public class FSDirectory implements Closeable {
     readLock();
     try {
       return rootDir.getDirectoryWithQuotaFeature().getSpaceConsumed()
-          .get(Quota.NAMESPACE);
+          .getNameSpace();
     } finally {
       readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 9ce3fa9..7675703 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaByStorageTypeOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
@@ -97,6 +98,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -872,6 +874,14 @@ public class FSEditLog implements LogsPurgeable {
     logEdit(op);
   }
 
+  /** Add set quota by storage type record to edit log */
+  void logSetQuotaByStorageType(String src, long dsQuota, StorageType type) {
+    SetQuotaByStorageTypeOp op = SetQuotaByStorageTypeOp.getInstance(cache.get())
+      .setSource(src)
+      .setQuotaByStorageType(dsQuota, type);
+    logEdit(op);
+  }
+
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
     SetPermissionsOp op = SetPermissionsOp.getInstance(cache.get())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 5fcad74..a09df82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -595,14 +595,14 @@ public class FSEditLogLoader {
       SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
       FSDirAttrOp.unprotectedSetQuota(
           fsDir, renameReservedPathsOnUpgrade(setNSQuotaOp.src, logVersion),
-          setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET);
+          setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET, null);
       break;
     }
     case OP_CLEAR_NS_QUOTA: {
       ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
       FSDirAttrOp.unprotectedSetQuota(
           fsDir, renameReservedPathsOnUpgrade(clearNSQuotaOp.src, logVersion),
-          HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET);
+          HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET, null);
       break;
     }
 
@@ -610,9 +610,18 @@ public class FSEditLogLoader {
       SetQuotaOp setQuotaOp = (SetQuotaOp) op;
       FSDirAttrOp.unprotectedSetQuota(fsDir,
           renameReservedPathsOnUpgrade(setQuotaOp.src, logVersion),
-          setQuotaOp.nsQuota, setQuotaOp.dsQuota);
+          setQuotaOp.nsQuota, setQuotaOp.dsQuota, null);
       break;
 
+    case OP_SET_QUOTA_BY_STORAGETYPE:
+        FSEditLogOp.SetQuotaByStorageTypeOp setQuotaByStorageTypeOp =
+          (FSEditLogOp.SetQuotaByStorageTypeOp) op;
+        FSDirAttrOp.unprotectedSetQuota(fsDir,
+          renameReservedPathsOnUpgrade(setQuotaByStorageTypeOp.src, logVersion),
+          HdfsConstants.QUOTA_DONT_SET, setQuotaByStorageTypeOp.dsQuota,
+          setQuotaByStorageTypeOp.type);
+        break;
+
     case OP_TIMES: {
       TimesOp timesOp = (TimesOp)op;
       FSDirAttrOp.unprotectedSetTimes(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 1629d80..dab10d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -64,6 +64,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TRUNCAT
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA_BY_STORAGETYPE;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -106,6 +107,7 @@ import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.XAttrEditLogProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.util.XMLUtils;
 import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
 import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
@@ -209,6 +211,7 @@ public abstract class FSEditLogOp {
       inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
       inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
       inst.put(OP_APPEND, new AppendOp());
+      inst.put(OP_SET_QUOTA_BY_STORAGETYPE, new SetQuotaByStorageTypeOp());
     }
     
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -2267,6 +2270,88 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
+  static class SetQuotaByStorageTypeOp extends FSEditLogOp {
+    String src;
+    long dsQuota;
+    StorageType type;
+
+    private SetQuotaByStorageTypeOp() {
+      super(OP_SET_QUOTA_BY_STORAGETYPE);
+    }
+
+    static SetQuotaByStorageTypeOp getInstance(OpInstanceCache cache) {
+      return (SetQuotaByStorageTypeOp)cache.get(OP_SET_QUOTA_BY_STORAGETYPE);
+    }
+
+    @Override
+    void resetSubFields() {
+      src = null;
+      dsQuota = -1L;
+      type = StorageType.DEFAULT;
+    }
+
+    SetQuotaByStorageTypeOp setSource(String src) {
+      this.src = src;
+      return this;
+    }
+
+    SetQuotaByStorageTypeOp setQuotaByStorageType(long dsQuota, StorageType type) {
+      this.type = type;
+      this.dsQuota = dsQuota;
+      return this;
+    }
+
+    @Override
+    public
+    void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(src, out);
+      FSImageSerialization.writeInt(type.ordinal(), out);
+      FSImageSerialization.writeLong(dsQuota, out);
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion)
+      throws IOException {
+      this.src = FSImageSerialization.readString(in);
+      this.type = StorageType.parseStorageType(FSImageSerialization.readInt(in));
+      this.dsQuota = FSImageSerialization.readLong(in);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SetTypeQuotaOp [src=");
+      builder.append(src);
+      builder.append(", storageType=");
+      builder.append(type);
+      builder.append(", dsQuota=");
+      builder.append(dsQuota);
+      builder.append(", opCode=");
+      builder.append(opCode);
+      builder.append(", txid=");
+      builder.append(txid);
+      builder.append("]");
+      return builder.toString();
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "SRC", src);
+      XMLUtils.addSaxString(contentHandler, "STORAGETYPE",
+        Integer.toString(type.ordinal()));
+      XMLUtils.addSaxString(contentHandler, "DSQUOTA",
+        Long.toString(dsQuota));
+    }
+
+    @Override void fromXml(Stanza st) throws InvalidXmlException {
+      this.src = st.getValue("SRC");
+      this.type = StorageType.parseStorageType(
+          Integer.parseInt(st.getValue("STORAGETYPE")));
+      this.dsQuota = Long.parseLong(st.getValue("DSQUOTA"));
+    }
+  }
+
   /** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */
   static class TimesOp extends FSEditLogOp {
     int length;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index 6cd1617..1a0a296 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -75,6 +75,7 @@ public enum FSEditLogOpCodes {
   OP_SET_STORAGE_POLICY         ((byte) 45),
   OP_TRUNCATE                   ((byte) 46),
   OP_APPEND                     ((byte) 47),
+  OP_SET_QUOTA_BY_STORAGETYPE   ((byte) 48),
 
   // Note that the current range of the valid OP code is 0~127
   OP_INVALID                    ((byte) -1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 3b5d2c3..c630917 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -65,12 +66,13 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.Time;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -841,7 +843,8 @@ public class FSImage implements Closeable {
     } finally {
       FSEditLog.closeAllStreams(editStreams);
       // update the counts
-      updateCountForQuota(target.dir.rootDir);
+      updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
+          target.dir.rootDir);
     }
     prog.endPhase(Phase.LOADING_EDITS);
     return lastAppliedTxId - prevLastAppliedTxId;
@@ -855,47 +858,64 @@ public class FSImage implements Closeable {
    * This is an update of existing state of the filesystem and does not
    * throw QuotaExceededException.
    */
-  static void updateCountForQuota(INodeDirectory root) {
-    updateCountForQuotaRecursively(root, Quota.Counts.newInstance());
-  }
-  
-  private static void updateCountForQuotaRecursively(INodeDirectory dir,
-      Quota.Counts counts) {
-    final long parentNamespace = counts.get(Quota.NAMESPACE);
-    final long parentDiskspace = counts.get(Quota.DISKSPACE);
-
-    dir.computeQuotaUsage4CurrentDirectory(counts);
+  static void updateCountForQuota(BlockStoragePolicySuite bsps,
+                                  INodeDirectory root) {
+    updateCountForQuotaRecursively(bsps, root, new QuotaCounts.Builder().build());
+ }
+
+  private static void updateCountForQuotaRecursively(BlockStoragePolicySuite bsps,
+      INodeDirectory dir, QuotaCounts counts) {
+    final long parentNamespace = counts.getNameSpace();
+    final long parentDiskspace = counts.getDiskSpace();
+    final EnumCounters<StorageType> parentTypeSpaces = counts.getTypeSpaces();
+
+    dir.computeQuotaUsage4CurrentDirectory(bsps, counts);
     
     for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
       if (child.isDirectory()) {
-        updateCountForQuotaRecursively(child.asDirectory(), counts);
+        updateCountForQuotaRecursively(bsps, child.asDirectory(), counts);
       } else {
         // file or symlink: count here to reduce recursive calls.
-        child.computeQuotaUsage(counts, false);
+        child.computeQuotaUsage(bsps, counts, false);
       }
     }
       
     if (dir.isQuotaSet()) {
       // check if quota is violated. It indicates a software bug.
-      final Quota.Counts q = dir.getQuotaCounts();
+      final QuotaCounts q = dir.getQuotaCounts();
 
-      final long namespace = counts.get(Quota.NAMESPACE) - parentNamespace;
-      final long nsQuota = q.get(Quota.NAMESPACE);
+      final long namespace = counts.getNameSpace() - parentNamespace;
+      final long nsQuota = q.getNameSpace();
       if (Quota.isViolated(nsQuota, namespace)) {
         LOG.error("BUG: Namespace quota violation in image for "
             + dir.getFullPathName()
             + " quota = " + nsQuota + " < consumed = " + namespace);
       }
 
-      final long diskspace = counts.get(Quota.DISKSPACE) - parentDiskspace;
-      final long dsQuota = q.get(Quota.DISKSPACE);
+      final long diskspace = counts.getDiskSpace() - parentDiskspace;
+      final long dsQuota = q.getDiskSpace();
       if (Quota.isViolated(dsQuota, diskspace)) {
         LOG.error("BUG: Diskspace quota violation in image for "
             + dir.getFullPathName()
             + " quota = " + dsQuota + " < consumed = " + diskspace);
       }
 
-      dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, diskspace);
+      final EnumCounters<StorageType> typeSpaces =
+          new EnumCounters<StorageType>(StorageType.class);
+      for (StorageType t : StorageType.getTypesSupportingQuota()) {
+        final long typeSpace = counts.getTypeSpaces().get(t) -
+            parentTypeSpaces.get(t);
+        final long typeQuota = q.getTypeSpaces().get(t);
+        if (Quota.isViolated(typeQuota, typeSpace)) {
+          LOG.error("BUG Disk quota by storage type violation in image for "
+              + dir.getFullPathName()
+              + " type = " + t.toString() + " quota = "
+              + typeQuota + " < consumed " + typeSpace);
+        }
+      }
+
+      dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, diskspace,
+          typeSpaces);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index ed05d40..e4b86ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -447,9 +447,9 @@ public class FSImageFormat {
 
   /** Update the root node's attributes */
   private void updateRootAttr(INodeWithAdditionalFields root) {                                                           
-    final Quota.Counts q = root.getQuotaCounts();
-    final long nsQuota = q.get(Quota.NAMESPACE);
-    final long dsQuota = q.get(Quota.DISKSPACE);
+    final QuotaCounts q = root.getQuotaCounts();
+    final long nsQuota = q.getNameSpace();
+    final long dsQuota = q.getDiskSpace();
     FSDirectory fsDir = namesystem.dir;
     if (nsQuota != -1 || dsQuota != -1) {
       fsDir.rootDir.getDirectoryWithQuotaFeature().setQuota(nsQuota, dsQuota);
@@ -823,7 +823,8 @@ public class FSImageFormat {
       final INodeDirectory dir = new INodeDirectory(inodeId, localName,
           permissions, modificationTime);
       if (nsQuota >= 0 || dsQuota >= 0) {
-        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+        dir.addDirectoryWithQuotaFeature(new DirectoryWithQuotaFeature.Builder().
+            nameSpaceQuota(nsQuota).spaceQuota(dsQuota).build());
       }
       if (withSnapshot) {
         dir.addSnapshotFeature(null);
@@ -907,11 +908,11 @@ public class FSImageFormat {
       //read quotas
       final long nsQuota = in.readLong();
       final long dsQuota = in.readLong();
-  
+
       return nsQuota == -1L && dsQuota == -1L ? new INodeDirectoryAttributes.SnapshotCopy(
           name, permissions, null, modificationTime, null)
         : new INodeDirectoryAttributes.CopyWithQuota(name, permissions,
-            null, modificationTime, nsQuota, dsQuota, null);
+            null, modificationTime, nsQuota, dsQuota, null, null);
     }
   
     private void loadFilesUnderConstruction(DataInput in,
@@ -1229,7 +1230,7 @@ public class FSImageFormat {
       final FSNamesystem sourceNamesystem = context.getSourceNamesystem();
       final INodeDirectory rootDir = sourceNamesystem.dir.rootDir;
       final long numINodes = rootDir.getDirectoryWithQuotaFeature()
-          .getSpaceConsumed().get(Quota.NAMESPACE);
+          .getSpaceConsumed().getNameSpace();
       String sdPath = newFile.getParentFile().getParentFile().getAbsolutePath();
       Step step = new Step(StepType.INODES, sdPath);
       StartupProgress prog = NameNode.getStartupProgress();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index f20add1..88b1fa2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -37,7 +37,9 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
@@ -52,7 +54,10 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.AclFeatureProto;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrCompactProto;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrFeatureProto;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.QuotaByStorageTypeEntryProto;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.QuotaByStorageTypeFeatureProto;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 import com.google.common.base.Preconditions;
@@ -141,6 +146,18 @@ public final class FSImageFormatPBINode {
       return b.build();
     }
 
+    public static ImmutableList<QuotaByStorageTypeEntry> loadQuotaByStorageTypeEntries(
+      QuotaByStorageTypeFeatureProto proto) {
+      ImmutableList.Builder<QuotaByStorageTypeEntry> b = ImmutableList.builder();
+      for (QuotaByStorageTypeEntryProto quotaEntry : proto.getQuotasList()) {
+        StorageType type = PBHelper.convertStorageType(quotaEntry.getStorageType());
+        long quota = quotaEntry.getQuota();
+        b.add(new QuotaByStorageTypeEntry.Builder().setStorageType(type)
+            .setQuota(quota).build());
+      }
+      return b.build();
+    }
+
     public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
         LoaderContext state) {
       assert n.getType() == INodeSection.INode.Type.DIRECTORY;
@@ -150,10 +167,33 @@ public final class FSImageFormatPBINode {
           state.getStringTable());
       final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
           .toByteArray(), permissions, d.getModificationTime());
-
       final long nsQuota = d.getNsQuota(), dsQuota = d.getDsQuota();
       if (nsQuota >= 0 || dsQuota >= 0) {
-        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+        dir.addDirectoryWithQuotaFeature(new DirectoryWithQuotaFeature.Builder().
+            nameSpaceQuota(nsQuota).spaceQuota(dsQuota).build());
+      }
+      EnumCounters<StorageType> typeQuotas = null;
+      if (d.hasTypeQuotas()) {
+        ImmutableList<QuotaByStorageTypeEntry> qes =
+            loadQuotaByStorageTypeEntries(d.getTypeQuotas());
+        typeQuotas = new EnumCounters<StorageType>(StorageType.class,
+            HdfsConstants.QUOTA_RESET);
+        for (QuotaByStorageTypeEntry qe : qes) {
+          if (qe.getQuota() >= 0 && qe.getStorageType() != null &&
+              qe.getStorageType().supportTypeQuota()) {
+            typeQuotas.set(qe.getStorageType(), qe.getQuota());
+          }
+        }
+
+        if (typeQuotas.anyGreaterOrEqual(0)) {
+          DirectoryWithQuotaFeature q = dir.getDirectoryWithQuotaFeature();
+          if (q == null) {
+            dir.addDirectoryWithQuotaFeature(new DirectoryWithQuotaFeature.
+                Builder().typeQuotas(typeQuotas).build());
+          } else {
+            q.setQuota(typeQuotas);
+          }
+        }
       }
 
       if (d.hasAcl()) {
@@ -335,12 +375,16 @@ public final class FSImageFormatPBINode {
 
     private void loadRootINode(INodeSection.INode p) {
       INodeDirectory root = loadINodeDirectory(p, parent.getLoaderContext());
-      final Quota.Counts q = root.getQuotaCounts();
-      final long nsQuota = q.get(Quota.NAMESPACE);
-      final long dsQuota = q.get(Quota.DISKSPACE);
+      final QuotaCounts q = root.getQuotaCounts();
+      final long nsQuota = q.getNameSpace();
+      final long dsQuota = q.getDiskSpace();
       if (nsQuota != -1 || dsQuota != -1) {
         dir.rootDir.getDirectoryWithQuotaFeature().setQuota(nsQuota, dsQuota);
       }
+      final EnumCounters<StorageType> typeQuotas = q.getTypeSpaces();
+      if (typeQuotas.anyGreaterOrEqual(0)) {
+        dir.rootDir.getDirectoryWithQuotaFeature().setQuota(typeQuotas);
+      }
       dir.rootDir.cloneModificationTime(root);
       dir.rootDir.clonePermissionStatus(root);
       // root dir supports having extended attributes according to POSIX
@@ -399,6 +443,22 @@ public final class FSImageFormatPBINode {
       return b;
     }
 
+    private static QuotaByStorageTypeFeatureProto.Builder
+        buildQuotaByStorageTypeEntries(QuotaCounts q) {
+      QuotaByStorageTypeFeatureProto.Builder b =
+          QuotaByStorageTypeFeatureProto.newBuilder();
+      for (StorageType t: StorageType.getTypesSupportingQuota()) {
+        if (q.getTypeSpace(t) >= 0) {
+          QuotaByStorageTypeEntryProto.Builder eb =
+              QuotaByStorageTypeEntryProto.newBuilder().
+              setStorageType(PBHelper.convertStorageType(t)).
+              setQuota(q.getTypeSpace(t));
+          b.addQuotas(eb);
+        }
+      }
+      return b;
+    }
+
     public static INodeSection.INodeFile.Builder buildINodeFile(
         INodeFileAttributes file, final SaverContext state) {
       INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
@@ -422,13 +482,17 @@ public final class FSImageFormatPBINode {
 
     public static INodeSection.INodeDirectory.Builder buildINodeDirectory(
         INodeDirectoryAttributes dir, final SaverContext state) {
-      Quota.Counts quota = dir.getQuotaCounts();
+      QuotaCounts quota = dir.getQuotaCounts();
       INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
           .newBuilder().setModificationTime(dir.getModificationTime())
-          .setNsQuota(quota.get(Quota.NAMESPACE))
-          .setDsQuota(quota.get(Quota.DISKSPACE))
+          .setNsQuota(quota.getNameSpace())
+          .setDsQuota(quota.getDiskSpace())
           .setPermission(buildPermissionStatus(dir, state.getStringMap()));
 
+      if (quota.getTypeSpaces().anyGreaterOrEqual(0)) {
+        b.setTypeQuotas(buildQuotaByStorageTypeEntries(quota));
+      }
+
       AclFeature f = dir.getAclFeature();
       if (f != null) {
         b.setAcl(buildAclEntries(f, state.getStringMap()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5dae97a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 33f644d..9721f66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -221,10 +221,10 @@ public class FSImageSerialization {
     out.writeLong(file.getPreferredBlockSize());
   }
 
-  private static void writeQuota(Quota.Counts quota, DataOutput out)
+  private static void writeQuota(QuotaCounts quota, DataOutput out)
       throws IOException {
-    out.writeLong(quota.get(Quota.NAMESPACE));
-    out.writeLong(quota.get(Quota.DISKSPACE));
+    out.writeLong(quota.getNameSpace());
+    out.writeLong(quota.getDiskSpace());
   }
 
   /**