You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2019/11/05 00:01:56 UTC

[hbase] branch branch-2 updated: HBASE-23082 Backport of low latency space quotas for hbase snapshots

This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 46a1883  HBASE-23082 Backport of low latency space quotas for hbase snapshots
46a1883 is described below

commit 46a18833a001f338048c5494631eac1fd6880599
Author: Josh Elser <el...@apache.org>
AuthorDate: Wed Feb 21 18:27:51 2018 -0500

    HBASE-23082 Backport of low latency space quotas for hbase snapshots
    
    Includes the following, incorporating HBASE-20439 and HBASE-20440, too.
    
    1)
    HBASE-18133 Decrease quota reaction latency by HBase
    
    Certain operations in HBase are known to directly affect
    the utilization of tables on HDFS. When these actions
    occur, we can circumvent the normal path and notify the
    Master directly. This results in a much faster response to
    changes in HDFS usage.
    
    This requires FS scanning by the RS to be decoupled from
    the reporting of sizes to the Master. An API inside each
    RS is made so that any operation can hook into this call
    in the face of other operations (e.g. compaction, flush,
    bulk load).
    
    2)
    HBASE-18135 Implement mechanism for RegionServers to report file archival for space quotas
    
    This de-couples the snapshot size calculation from the
    SpaceQuotaObserverChore into another API which both the periodically
    invoked Master chore and the Master service endpoint can invoke. This
    allows for multiple sources of snapshot size to be reported (from the
    multiple sources we have in HBase).
    
    When a file is archived, snapshot sizes can be more quickly realized and
    the Master can still perform periodical computations of the total
    snapshot size to account for any delayed/missing/lost file archival RPCs.
    
    3)
    HBASE-20531 RS may throw NPE when close meta regions in shutdown procedure.
---
 .../apache/hadoop/hbase/quotas/QuotaTableUtil.java |   8 +-
 .../MetricsRegionServerQuotaSource.java            |  24 +
 .../MetricsRegionServerQuotaSourceImpl.java        |  86 +++
 ...ase.regionserver.MetricsRegionServerQuotaSource |  18 +
 .../src/main/protobuf/RegionServerStatus.proto     |  16 +
 .../org/apache/hadoop/hbase/master/HMaster.java    |   4 +
 .../hadoop/hbase/master/MasterRpcServices.java     |  18 +
 .../hadoop/hbase/quotas/FileArchiverNotifier.java  |  53 ++
 .../hbase/quotas/FileArchiverNotifierFactory.java  |  35 ++
 .../quotas/FileArchiverNotifierFactoryImpl.java    | 114 ++++
 .../hbase/quotas/FileArchiverNotifierImpl.java     | 635 +++++++++++++++++++++
 .../hbase/quotas/FileSystemUtilizationChore.java   |  39 +-
 .../hadoop/hbase/quotas/MasterQuotaManager.java    |  30 +
 .../hadoop/hbase/quotas/NoOpRegionSizeStore.java   |  76 +++
 .../quotas/RegionServerSpaceQuotaManager.java      |  56 ++
 .../org/apache/hadoop/hbase/quotas/RegionSize.java |  50 ++
 .../apache/hadoop/hbase/quotas/RegionSizeImpl.java |  70 +++
 .../hbase/quotas/RegionSizeReportingChore.java     | 156 +++++
 .../hadoop/hbase/quotas/RegionSizeStore.java       |  82 +++
 .../hbase/quotas/RegionSizeStoreFactory.java       |  38 ++
 .../hadoop/hbase/quotas/RegionSizeStoreImpl.java   | 105 ++++
 .../hbase/quotas/SnapshotQuotaObserverChore.java   | 349 +----------
 .../quotas/SpaceViolationPolicyEnforcement.java    |   7 +-
 .../AbstractViolationPolicyEnforcement.java        |  32 +-
 .../DefaultViolationPolicyEnforcement.java         |  34 +-
 .../MissingSnapshotViolationPolicyEnforcement.java |   8 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  11 +
 .../hadoop/hbase/regionserver/HRegionServer.java   |  89 ++-
 .../apache/hadoop/hbase/regionserver/HStore.java   | 109 +++-
 .../hbase/regionserver/MetricsRegionServer.java    |  16 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  19 +-
 .../hbase/regionserver/RegionServerServices.java   |  25 +-
 .../hadoop/hbase/MockRegionServerServices.java     |  14 +
 .../hadoop/hbase/master/MockRegionServer.java      |  14 +
 .../hbase/quotas/SpaceQuotaHelperForTests.java     |  50 +-
 .../hbase/quotas/TestFileArchiverNotifierImpl.java | 312 ++++++++++
 .../quotas/TestFileSystemUtilizationChore.java     |  27 +-
 .../hbase/quotas/TestLowLatencySpaceQuotas.java    | 307 ++++++++++
 .../TestQuotaObserverChoreRegionReports.java       |  17 +-
 .../hadoop/hbase/quotas/TestRegionSizeImpl.java    |  49 ++
 .../hbase/quotas/TestRegionSizeReportingChore.java | 127 +++++
 .../hbase/quotas/TestRegionSizeStoreImpl.java      | 101 ++++
 .../quotas/TestSnapshotQuotaObserverChore.java     | 257 +++++----
 .../hadoop/hbase/quotas/TestSpaceQuotas.java       | 428 ++++++++++++++
 ...BulkLoadCheckingViolationPolicyEnforcement.java |   8 +-
 .../hadoop/hbase/regionserver/TestHStore.java      |  51 ++
 .../TestRegionServerRegionSpaceUseReport.java      |  53 +-
 47 files changed, 3649 insertions(+), 578 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
index b8047c8..4f14911 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java
@@ -72,13 +72,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
 /**
  * Helper class to interact with the quota table.
  * <table>
- *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th></tr>
+ *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th><th>DESC</th></tr>
  *   <tr><td>n.&lt;namespace&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
  *   <tr><td>n.&lt;namespace&gt;</td><td>u:p</td><td>&lt;namespace-quota policy&gt;</td></tr>
- *   <tr><td>n.&lt;namespace&gt;</td><td>u:s</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
+ *   <tr><td>n.&lt;namespace&gt;</td><td>u:s</td><td>&lt;SpaceQuotaSnapshot&gt;</td>
+ *      <td>The size of all snapshots against tables in the namespace</td></tr>
  *   <tr><td>t.&lt;table&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
  *   <tr><td>t.&lt;table&gt;</td><td>u:p</td><td>&lt;table-quota policy&gt;</td></tr>
- *   <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
+ *   <tr><td>t.&lt;table&gt;</td><td>u:ss.&lt;snapshot name&gt;</td>
+ *      <td>&lt;SpaceQuotaSnapshot&gt;</td><td>The size of a snapshot against a table</td></tr>
  *   <tr><td>u.&lt;user&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
  *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;table&gt;</td><td>&lt;table-quotas&gt;</td></tr>
  *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;ns&gt;</td><td>&lt;namespace-quotas&gt;</td></tr>
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerQuotaSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerQuotaSource.java
index 9795374..93990ef 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerQuotaSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerQuotaSource.java
@@ -30,6 +30,14 @@ public interface MetricsRegionServerQuotaSource extends BaseSource {
   String METRICS_DESCRIPTION = "Metrics about HBase RegionServer Quotas";
   String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
 
+  String NUM_TABLES_IN_VIOLATION_NAME = "numTablesInViolation";
+  String NUM_SPACE_SNAPSHOTS_RECEIVED_NAME = "numSpaceSnapshotsReceived";
+  String FILE_SYSTEM_UTILIZATION_CHORE_TIME = "fileSystemUtilizationChoreTime";
+  String SPACE_QUOTA_REFRESHER_CHORE_TIME = "spaceQuotaRefresherChoreTime";
+
+  String NUM_REGION_SIZE_REPORT_NAME = "numRegionSizeReports";
+  String REGION_SIZE_REPORTING_CHORE_TIME_NAME = "regionSizeReportingChoreTime";
+
   /**
    * Updates the metric tracking how many tables this RegionServer has marked as in violation
    * of their space quota.
@@ -59,4 +67,20 @@ public interface MetricsRegionServerQuotaSource extends BaseSource {
    * @param time The execution time of the chore in milliseconds.
    */
   void incrementSpaceQuotaRefresherChoreTime(long time);
+
+  /**
+   * Updates the metric tracking how many region size reports were sent from this RegionServer to
+   * the Master. These reports contain information on the size of each Region hosted locally.
+   *
+   * @param numReportsSent The number of region size reports sent
+   */
+  void incrementNumRegionSizeReportsSent(long numReportsSent);
+
+  /**
+   * Updates the metric tracking how much time was spent sending region size reports to the Master
+   * by the RegionSizeReportingChore.
+   *
+   * @param time The execution time in milliseconds.
+   */
+  void incrementRegionSizeReportingChoreTime(long time);
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerQuotaSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerQuotaSourceImpl.java
new file mode 100644
index 0000000..3a796dd
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerQuotaSourceImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.metrics.Meter;
+import org.apache.hadoop.hbase.metrics.Timer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Implementation of {@link MetricsRegionServerQuotaSource}.
+ */
+@InterfaceAudience.Private
+public class MetricsRegionServerQuotaSourceImpl extends BaseSourceImpl implements
+    MetricsRegionServerQuotaSource {
+
+  private final Meter tablesInViolationCounter;
+  private final Meter spaceQuotaSnapshotsReceived;
+  private final Timer fileSystemUtilizationChoreTimer;
+  private final Timer spaceQuotaRefresherChoreTimer;
+  private final Counter regionSizeReportCounter;
+  private final Timer regionSizeReportingChoreTimer;
+
+  public MetricsRegionServerQuotaSourceImpl() {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+  }
+
+  public MetricsRegionServerQuotaSourceImpl(String metricsName, String metricsDescription,
+      String metricsContext, String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+    tablesInViolationCounter = this.registry.meter(NUM_TABLES_IN_VIOLATION_NAME);
+    spaceQuotaSnapshotsReceived = this.registry.meter(NUM_SPACE_SNAPSHOTS_RECEIVED_NAME);
+    fileSystemUtilizationChoreTimer = this.registry.timer(FILE_SYSTEM_UTILIZATION_CHORE_TIME);
+    spaceQuotaRefresherChoreTimer = this.registry.timer(SPACE_QUOTA_REFRESHER_CHORE_TIME);
+    regionSizeReportCounter = this.registry.counter(NUM_REGION_SIZE_REPORT_NAME);
+    regionSizeReportingChoreTimer = registry.timer(REGION_SIZE_REPORTING_CHORE_TIME_NAME);
+  }
+
+  @Override
+  public void updateNumTablesInSpaceQuotaViolation(long tablesInViolation) {
+    this.tablesInViolationCounter.mark(tablesInViolation);
+  }
+
+  @Override
+  public void updateNumTableSpaceQuotaSnapshots(long numSnapshots) {
+    this.spaceQuotaSnapshotsReceived.mark(numSnapshots);
+  }
+
+  @Override
+  public void incrementSpaceQuotaFileSystemScannerChoreTime(long time) {
+    this.fileSystemUtilizationChoreTimer.updateMillis(time);
+  }
+
+  @Override
+  public void incrementSpaceQuotaRefresherChoreTime(long time) {
+    this.spaceQuotaRefresherChoreTimer.updateMillis(time);
+  }
+
+  @Override
+  public void incrementNumRegionSizeReportsSent(long numReportsSent) {
+    regionSizeReportCounter.increment(numReportsSent);
+  }
+
+  @Override
+  public void incrementRegionSizeReportingChoreTime(long time) {
+    regionSizeReportingChoreTimer.update(time, TimeUnit.MILLISECONDS);
+  }
+}
diff --git a/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.regionserver.MetricsRegionServerQuotaSource b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.regionserver.MetricsRegionServerQuotaSource
new file mode 100644
index 0000000..58fe4d9
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.regionserver.MetricsRegionServerQuotaSource
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.hadoop.hbase.regionserver.MetricsRegionServerQuotaSourceImpl
\ No newline at end of file
diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
index 5aab056..0137cb1 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
@@ -163,6 +163,18 @@ message ReportProcedureDoneRequest {
 message ReportProcedureDoneResponse {
 }
 
+message FileArchiveNotificationRequest {
+  message FileWithSize {
+    optional TableName table_name = 1;
+    optional string name = 2;
+    optional uint64 size = 3;
+  }
+  repeated FileWithSize archived_files = 1;
+}
+
+message FileArchiveNotificationResponse {
+}
+
 service RegionServerStatusService {
   /** Called when a region server first starts. */
   rpc RegionServerStartup(RegionServerStartupRequest)
@@ -201,4 +213,8 @@ service RegionServerStatusService {
 
   rpc ReportProcedureDone(ReportProcedureDoneRequest)
     returns(ReportProcedureDoneResponse);
+
+  /** Reports files that were moved to the archive directory for space quotas */
+  rpc ReportFileArchival(FileArchiveNotificationRequest)
+    returns(FileArchiveNotificationResponse);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ec9824b..1939703 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3799,4 +3799,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   public HbckChore getHbckChore() {
     return this.hbckChore;
   }
+
+  public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
+    return this.snapshotQuotaChore;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 833f715..4943d4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -306,6 +306,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -2618,6 +2620,22 @@ public class MasterRpcServices extends RSRpcServices
   }
 
   @Override
+  public FileArchiveNotificationResponse reportFileArchival(RpcController controller,
+      FileArchiveNotificationRequest request) throws ServiceException {
+    try {
+      master.checkInitialized();
+      if (!QuotaUtil.isQuotaEnabled(master.getConfiguration())) {
+        return FileArchiveNotificationResponse.newBuilder().build();
+      }
+      master.getMasterQuotaManager().processFileArchivals(request, master.getConnection(),
+          master.getConfiguration(), master.getFileSystem());
+      return FileArchiveNotificationResponse.newBuilder().build();
+    } catch (Exception e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public GrantResponse grant(RpcController controller, GrantRequest request)
       throws ServiceException {
     try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifier.java
new file mode 100644
index 0000000..7f1e47b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Interface allowing various implementations of tracking files that have recently been archived to
+ * allow for the Master to notice changes to snapshot sizes for space quotas.
+ *
+ * This object needs to ensure that {@link #addArchivedFiles(Set)} and
+ * {@link #computeAndStoreSnapshotSizes(Collection)} are mutually exclusive. If a "full" computation
+ * is in progress, new changes being archived should be held.
+ */
+@InterfaceAudience.Private
+public interface FileArchiverNotifier {
+
+  /**
+   * Records a file and its size in bytes being moved to the archive directory.
+   *
+   * @param fileSizes A collection of file name to size in bytes
+   * @throws IOException If there was an IO-related error persisting the file size(s)
+   */
+  void addArchivedFiles(Set<Entry<String, Long>> fileSizes) throws IOException;
+
+  /**
+   * Computes the size of a table and all of its snapshots, recording new "full" sizes for each.
+   *
+   * @param currentSnapshots the current list of snapshots against this table
+   * @return The total size of all snapshots against this table.
+   * @throws IOException If there was an IO-related error computing or persisting the sizes.
+   */
+  long computeAndStoreSnapshotSizes(Collection<String> currentSnapshots) throws IOException;
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactory.java
new file mode 100644
index 0000000..98f188f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory class to create {@link FileArchiverNotifier} instances.
+ */
+@InterfaceAudience.Private
+public interface FileArchiverNotifierFactory {
+
+  /**
+   * Creates or obtains a {@link FileArchiverNotifier} instance for the given args.
+   */
+  FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs, TableName tn);
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
new file mode 100644
index 0000000..36b5356
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A factory for getting instances of {@link FileArchiverNotifier}.
+ */
+@InterfaceAudience.Private
+public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifierFactory {
+  private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
+      new FileArchiverNotifierFactoryImpl();
+  private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
+  private final ConcurrentHashMap<TableName,FileArchiverNotifier> CACHE;
+
+  private FileArchiverNotifierFactoryImpl() {
+    CACHE = new ConcurrentHashMap<>();
+  }
+
+  public static FileArchiverNotifierFactory getInstance() {
+    return CURRENT_INSTANCE;
+  }
+
+  @VisibleForTesting
+  static void setInstance(FileArchiverNotifierFactory inst) {
+    CURRENT_INSTANCE = Objects.requireNonNull(inst);
+  }
+
+  @VisibleForTesting
+  static void reset() {
+    CURRENT_INSTANCE = DEFAULT_INSTANCE;
+  }
+
+  /**
+   * Returns the {@link FileArchiverNotifier} instance for the given {@link TableName}.
+   *
+   * @param tn The table to obtain a notifier for
+   * @return The notifier for the given {@code tablename}.
+   */
+  public FileArchiverNotifier get(
+      Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+    // Ensure that only one instance is exposed to callers
+    final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn);
+    final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping);
+    if (previousMapping == null) {
+      return newMapping;
+    }
+    return previousMapping;
+  }
+
+  public int getCacheSize() {
+    return CACHE.size();
+  }
+
+  static class CacheKey {
+    final Connection conn;
+    final Configuration conf;
+    final FileSystem fs;
+    final TableName tn;
+
+    CacheKey(Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+      this.conn = conn;
+      this.conf = conf;
+      this.fs = fs;
+      this.tn = tn;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof CacheKey)) {
+        return false;
+      }
+      CacheKey other = (CacheKey) o;
+      // TableName should be the only thing differing..
+      return tn.equals(other.tn) && conn.equals(other.conn) && conf.equals(other.conf)
+          && fs.equals(other.fs);
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().append(conn).append(conf).append(fs).append(tn).toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "CacheKey[TableName=" + tn + "]";
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierImpl.java
new file mode 100644
index 0000000..aa91696
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierImpl.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
+
+/**
+ * Tracks file archiving and updates the hbase quota table.
+ */
+@InterfaceAudience.Private
+public class FileArchiverNotifierImpl implements FileArchiverNotifier {
+  private static final Logger LOG = LoggerFactory.getLogger(FileArchiverNotifierImpl.class);
+  private final Connection conn;
+  private final Configuration conf;
+  private final FileSystem fs;
+  private final TableName tn;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  private volatile long lastFullCompute = Long.MIN_VALUE;
+  // Volatile: computeAndStoreSnapshotSizes() replaces this reference outside of any lock while
+  // addArchivedFiles() and toString() read it, potentially from other threads.
+  private volatile List<String> currentSnapshots = Collections.emptyList();
+  private static final Map<String,Object> NAMESPACE_LOCKS = new HashMap<>();
+
+  /**
+   * An Exception thrown when SnapshotSize updates to hbase:quota fail to be written.
+   */
+  @InterfaceAudience.Private
+  public static class QuotaSnapshotSizeSerializationException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param msg A description of the failure, passed through to {@link IOException}.
+     */
+    public QuotaSnapshotSizeSerializationException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Creates a notifier for a single table, along with the read/write lock pair used by
+   * {@link #addArchivedFiles(Set)} (read lock) and
+   * {@link #computeAndStoreSnapshotSizes(Collection)} (write lock).
+   *
+   * @param conn The connection used to reach the quota table
+   * @param conf The configuration used to resolve HBase directories
+   * @param fs The filesystem holding snapshot manifests and archived files
+   * @param tn The table this notifier tracks
+   */
+  public FileArchiverNotifierImpl(
+      Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+    final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    this.readLock = rwLock.readLock();
+    this.writeLock = rwLock.writeLock();
+    this.conn = conn;
+    this.conf = conf;
+    this.fs = fs;
+    this.tn = tn;
+  }
+
+  /**
+   * Returns the monitor object shared by every notifier for the given namespace, creating and
+   * remembering one on first use.
+   *
+   * @param namespace The namespace name to obtain a lock object for
+   * @return The lock object registered for {@code namespace}
+   */
+  static synchronized Object getLockForNamespace(String namespace) {
+    Object namespaceLock = NAMESPACE_LOCKS.get(namespace);
+    if (namespaceLock == null) {
+      namespaceLock = new Object();
+      NAMESPACE_LOCKS.put(namespace, namespaceLock);
+    }
+    return namespaceLock;
+  }
+
+  /**
+   * Returns the {@link System#nanoTime()} reading recorded when the most recent call to
+   * {@link #computeAndStoreSnapshotSizes(Collection)} finished persisting its results, or
+   * {@link Long#MIN_VALUE} if no full computation has completed yet. As with any
+   * {@code nanoTime} value, it is only meaningful when compared against another reading by
+   * subtraction.
+   */
+  long getLastFullCompute() {
+    return lastFullCompute;
+  }
+
+  /**
+   * Records the given archived files against the snapshots currently known for this table,
+   * unless a full recomputation finished after this request was received.
+   *
+   * @param fileSizes Entries of archived file name to file size
+   */
+  @Override
+  public void addArchivedFiles(Set<Entry<String, Long>> fileSizes) throws IOException {
+    final long requestTime = System.nanoTime();
+    readLock.lock();
+    try {
+      // An archival request may have been blocked behind a full re-computation. In that case the
+      // full computation most likely already accounts for these files, so the request is dropped.
+      //
+      // The subtraction is the overflow-safe form of "requestTime < lastFullCompute" recommended
+      // by the System.nanoTime() javadoc.
+      final boolean supersededByFullCompute =
+          lastFullCompute != Long.MIN_VALUE && requestTime - lastFullCompute < 0;
+      if (supersededByFullCompute) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("A full computation was performed after this request was received."
+              + " Ignoring requested updates: " + fileSizes);
+        }
+        return;
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("currentSnapshots: " + currentSnapshots + " fileSize: "+ fileSizes);
+      }
+
+      // Nothing to write to the quota table without both snapshots and archived files.
+      if (currentSnapshots.isEmpty() || fileSizes.isEmpty()) {
+        return;
+      }
+      // Attribute each archived file to a snapshot referencing it and persist the size changes.
+      groupArchivedFiledBySnapshotAndRecordSize(currentSnapshots, fileSizes);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Attributes each archived file to the first snapshot, in the order given by
+   * {@code snapshots}, that references it, and writes the resulting per-snapshot size changes to
+   * the quota table.
+   *
+   * @param snapshots A collection of HBase snapshots to group the files into
+   * @param fileSizes A map of file names to their sizes
+   */
+  void groupArchivedFiledBySnapshotAndRecordSize(
+      List<String> snapshots, Set<Entry<String, Long>> fileSizes) throws IOException {
+    // Work on a private, mutable copy: entries are removed as they are claimed by a snapshot.
+    final Map<String,Long> unclaimedFiles = new HashMap<>(fileSizes.size());
+    fileSizes.forEach((fileAndSize) ->
+        unclaimedFiles.put(fileAndSize.getKey(), fileAndSize.getValue()));
+
+    // Accumulates the size delta attributed to each snapshot.
+    final Map<String,Long> sizeDeltaBySnapshot = new HashMap<>();
+    final Iterator<String> snapshotIter = snapshots.iterator();
+    boolean filesRemain = true;
+    while (filesRemain && snapshotIter.hasNext()) {
+      // Moves every file referenced by this snapshot out of `unclaimedFiles` and into the
+      // snapshot's running total in `sizeDeltaBySnapshot`.
+      bucketFilesToSnapshot(snapshotIter.next(), unclaimedFiles, sizeDeltaBySnapshot);
+      // Once every archived file has been claimed, later snapshots need not be inspected.
+      filesRemain = !unclaimedFiles.isEmpty();
+    }
+
+    if (sizeDeltaBySnapshot.isEmpty()) {
+      // No snapshot referenced any of the archived files; nothing to record.
+      return;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing snapshot size changes for: " + sizeDeltaBySnapshot);
+    }
+    persistSnapshotSizeChanges(sizeDeltaBySnapshot);
+  }
+
+  /**
+   * Walks the manifest of {@code snapshotName} and, for every store file it references that is
+   * also present in {@code filesToUpdate}, removes the file from {@code filesToUpdate} and adds
+   * its size to the snapshot's entry in {@code snapshotSizeChanges}.
+   *
+   * @param snapshotName The snapshot to check
+   * @param filesToUpdate A mapping of archived files to their size
+   * @param snapshotSizeChanges A mapping of snapshots and their change in size
+   */
+  void bucketFilesToSnapshot(
+      String snapshotName, Map<String,Long> filesToUpdate, Map<String,Long> snapshotSizeChanges)
+          throws IOException {
+    // Avoid opening the snapshot manifest at all when there is nothing left to attribute.
+    if (filesToUpdate.isEmpty()) {
+      return;
+    }
+
+    final Path rootDir = FSUtils.getRootDir(conf);
+    final Path completedSnapshotDir =
+        SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    final SnapshotDescription description =
+        SnapshotDescriptionUtils.readSnapshotInfo(fs, completedSnapshotDir);
+    final SnapshotManifest manifest =
+        SnapshotManifest.open(conf, fs, completedSnapshotDir, description);
+    // Regions -> column families -> store files referenced by the snapshot
+    for (SnapshotRegionManifest regionManifest : manifest.getRegionManifests()) {
+      for (FamilyFiles familyFiles : regionManifest.getFamilyFilesList()) {
+        for (StoreFile storeFile : familyFiles.getStoreFilesList()) {
+          final Long archivedSize = filesToUpdate.remove(storeFile.getName());
+          if (archivedSize != null) {
+            // This store file was recently archived: credit its size to this snapshot.
+            final Long runningTotal = snapshotSizeChanges.get(snapshotName);
+            snapshotSizeChanges.put(snapshotName,
+                runningTotal == null ? archivedSize : Long.valueOf(runningTotal + archivedSize));
+          }
+          // Stop scanning the manifest as soon as every archived file has been claimed.
+          if (filesToUpdate.isEmpty()) {
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads the current size for each snapshot to update, generates a new update based on that value,
+   * and then writes the new update. The read-modify-write sequence is serialized per namespace
+   * using {@link #getLockForNamespace(String)}.
+   *
+   * @param snapshotSizeChanges A map of snapshot name to size change
+   * @throws QuotaSnapshotSizeSerializationException if any update in the batch did not come back
+   *     with a {@link Result}
+   * @throws java.io.InterruptedIOException if the thread is interrupted while talking to the quota
+   *     table; the thread's interrupt status is restored before throwing
+   * @throws IOException if reading from or writing to the quota table otherwise fails
+   */
+  void persistSnapshotSizeChanges(Map<String,Long> snapshotSizeChanges) throws IOException {
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      // Create a list (with a more typical ordering implied)
+      final List<Entry<String,Long>> snapshotSizeEntries = new ArrayList<>(
+          snapshotSizeChanges.entrySet());
+      // Create the Gets for each snapshot we need to update
+      final List<Get> snapshotSizeGets = snapshotSizeEntries.stream()
+          .map((e) -> QuotaTableUtil.makeGetForSnapshotSize(tn, e.getKey()))
+          .collect(Collectors.toList());
+      final Iterator<Entry<String,Long>> iterator = snapshotSizeEntries.iterator();
+      // A List to store each Put we'll create from the Get's we retrieve
+      final List<Put> updates = new ArrayList<>(snapshotSizeEntries.size());
+
+      // TODO Push this down to the RegionServer with a coprocessor:
+      //
+      // We would really like to piggy-back on the row-lock already being grabbed
+      // to handle the update of the row in the quota table. However, because the value
+      // is a serialized protobuf, the standard Increment API doesn't work for us. With a CP, we
+      // can just send the size deltas to the RS and atomically update the serialized PB object
+      // while relying on the row-lock for synchronization.
+      //
+      // Synchronizing on a per-namespace lock object is a "minor smell" but passable as this is
+      // only invoked via a single caller (the active Master). Using the namespace name lets us
+      // have some parallelism without worry of one caller seeing stale data from the quota table.
+      synchronized (getLockForNamespace(tn.getNamespaceAsString())) {
+        final Result[] existingSnapshotSizes = quotaTable.get(snapshotSizeGets);
+        long totalSizeChange = 0;
+        // Read the current size values (if they exist) to generate the new value. The Results
+        // come back in the same order as the Gets, which were built from snapshotSizeEntries.
+        for (Result result : existingSnapshotSizes) {
+          Entry<String,Long> entry = iterator.next();
+          String snapshot = entry.getKey();
+          long size = entry.getValue();
+          // Track the total size change for the namespace this table belongs in
+          totalSizeChange += size;
+          // Get the size of the previous value (or zero)
+          long previousSize = getSnapshotSizeFromResult(result);
+          // Create an update. A file was archived from the table, so the table's size goes
+          // down, but the snapshot's size goes up.
+          updates.add(QuotaTableUtil.createPutForSnapshotSize(tn, snapshot, previousSize + size));
+        }
+
+        // Create an update for the summation of all snapshots in the namespace
+        if (totalSizeChange != 0) {
+          long previousSize = getPreviousNamespaceSnapshotSize(
+              quotaTable, tn.getNamespaceAsString());
+          updates.add(QuotaTableUtil.createPutForNamespaceSnapshotSize(
+              tn.getNamespaceAsString(), previousSize + totalSizeChange));
+        }
+
+        // Send all of the quota table updates in one batch.
+        List<Object> failures = new ArrayList<>();
+        final Object[] results = new Object[updates.size()];
+        quotaTable.batch(updates, results);
+        for (Object result : results) {
+          // A null result is an error condition (all RPC attempts failed)
+          if (!(result instanceof Result)) {
+            failures.add(result);
+          }
+        }
+        // Propagate a failure if any updates failed
+        if (!failures.isEmpty()) {
+          throw new QuotaSnapshotSizeSerializationException(
+              "Failed to write some snapshot size updates: " + failures);
+        }
+      }
+    } catch (InterruptedException e) {
+      // Previously this returned normally, so callers believed the sizes had been written.
+      // Restore the interrupt status and report the failure through the declared IOException.
+      Thread.currentThread().interrupt();
+      throw (java.io.InterruptedIOException) new java.io.InterruptedIOException(
+          "Interrupted while writing snapshot size updates for " + tn).initCause(e);
+    }
+  }
+
+  /**
+   * Looks up the snapshot size currently recorded in the quota table for {@code namespace}.
+   *
+   * @param quotaTable The HBase quota table
+   * @param namespace Namespace to fetch the sum of snapshot sizes for
+   * @return The recorded size in bytes, or zero when the quota table holds no value.
+   */
+  long getPreviousNamespaceSnapshotSize(Table quotaTable, String namespace) throws IOException {
+    // Read (not update) the namespace-wide snapshot size row.
+    final Get namespaceSizeGet = QuotaTableUtil.createGetNamespaceSnapshotSize(namespace);
+    final Result namespaceSizeResult = quotaTable.get(namespaceSizeGet);
+    return getSnapshotSizeFromResult(namespaceSizeResult);
+  }
+
+  /**
+   * Parses the snapshot size out of the first cell of the given {@link Result}.
+   *
+   * @param r A Result whose first cell, if any, is handed to
+   *     {@code QuotaTableUtil.parseSnapshotSize}
+   * @return The parsed size in bytes, or zero when the Result has no cell to read.
+   */
+  long getSnapshotSizeFromResult(Result r) throws InvalidProtocolBufferException {
+    // Per javadoc, Result should only be null if an exception was thrown, so it is not
+    // null-checked here. Failing to advance to the first cell is treated as "no cell".
+    if (r.isEmpty() || !r.advance()) {
+      return 0L;
+    }
+    return QuotaTableUtil.parseSnapshotSize(r.current());
+  }
+
+  /**
+   * Recomputes the size of every given snapshot of this table from the filesystem, persists the
+   * per-snapshot sizes to the quota table and records the time of this full computation.
+   *
+   * @param currentSnapshots The names of the snapshots that currently exist for this table
+   * @return The sum of the computed sizes of all given snapshots, in bytes
+   */
+  @Override
+  public long computeAndStoreSnapshotSizes(
+      Collection<String> currentSnapshots) throws IOException {
+    // Record what the current snapshots are. The copy is sorted *before* it is assigned to the
+    // field so that addArchivedFiles(), which may run concurrently on another thread, is never
+    // handed a list that is still being reordered in place.
+    final List<String> sortedSnapshots = new ArrayList<>(currentSnapshots);
+    Collections.sort(sortedSnapshots);
+    this.currentSnapshots = sortedSnapshots;
+
+    // compute new size for table + snapshots for that table
+    List<SnapshotWithSize> snapshotSizes = computeSnapshotSizes(sortedSnapshots);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Computed snapshot sizes for " + tn + " of " + snapshotSizes);
+    }
+
+    // Compute the total size of all snapshots against our table
+    final long totalSnapshotSize = snapshotSizes.stream().mapToLong((sws) -> sws.getSize()).sum();
+
+    // The write lock keeps addArchivedFiles() (a read-lock holder) from interleaving its
+    // incremental updates with the absolute sizes written here.
+    writeLock.lock();
+    try {
+      // Persist the size of each snapshot
+      try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+        persistSnapshotSizes(quotaTable, snapshotSizes);
+      }
+
+      // Report the last time we did a recomputation
+      lastFullCompute = System.nanoTime();
+
+      return totalSnapshotSize;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Renders the table name, the current snapshot list and the last full-compute timestamp.
+   */
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "["
+        + "tableName=" + tn
+        + ", currentSnapshots=" + currentSnapshots
+        + ", lastFullCompute=" + lastFullCompute
+        + "]";
+  }
+
+  /**
+   * Computes the size of each snapshot against the table referenced by {@code this}. A store file
+   * is charged to a snapshot only when it is not in the set of store files obtained from
+   * {@code FSUtils.getTableStoreFilePathMap} and was not already charged to a snapshot earlier in
+   * {@code snapshots}.
+   *
+   * @param snapshots A sorted list of snapshots against {@code tn}.
+   * @return A list of the size for each snapshot against {@code tn}; never {@code null}.
+   * @throws java.io.InterruptedIOException if the thread is interrupted while the store files are
+   *     being listed; the thread's interrupt status is restored before throwing
+   * @throws IOException if the filesystem or a snapshot manifest cannot be read
+   */
+  List<SnapshotWithSize> computeSnapshotSizes(List<String> snapshots) throws IOException {
+    final List<SnapshotWithSize> snapshotSizes = new ArrayList<>(snapshots.size());
+    final Path rootDir = FSUtils.getRootDir(conf);
+
+    // Get the map of store file names to store file path for this table
+    final Set<String> tableReferencedStoreFiles;
+    try {
+      tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
+    } catch (InterruptedException e) {
+      // Returning null here made computeAndStoreSnapshotSizes() fail with a
+      // NullPointerException. Restore the interrupt status and fail with a descriptive,
+      // declared exception instead.
+      Thread.currentThread().interrupt();
+      throw (java.io.InterruptedIOException) new java.io.InterruptedIOException(
+          "Interrupted while listing store files to compute snapshot sizes for " + tn)
+              .initCause(e);
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
+    }
+
+    // For each snapshot on this table, get the files which the snapshot references which
+    // the table does not.
+    Set<String> snapshotReferencedFiles = new HashSet<>();
+    for (String snapshotName : snapshots) {
+      Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+      SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+      SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
+      }
+
+      // Get the set of files from the manifest that this snapshot references which are not also
+      // referenced by the originating table.
+      Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
+          manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
+              && !snapshotReferencedFiles.contains(sfn));
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Snapshot " + snapshotName + " solely references the files: "
+            + unreferencedStoreFileNames);
+      }
+
+      // Compute the size of the store files for this snapshot
+      long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Computed size of " + snapshotName + " to be " + size);
+      }
+
+      // Record this snapshot's size in the result list
+      snapshotSizes.add(new SnapshotWithSize(snapshotName, size));
+
+      // Make sure that we don't double-count the same file: anything charged to this snapshot
+      // is excluded from every later snapshot in the list.
+      for (StoreFileReference ref : unreferencedStoreFileNames) {
+        snapshotReferencedFiles.addAll(ref.getFamilyToFilesMapping().values());
+      }
+    }
+
+    return snapshotSizes;
+  }
+
+  /**
+   * Sums the sizes of all store files referenced by {@code storeFileNames}.
+   *
+   * @param tn The table the store files belong to
+   * @param storeFileNames Per-region references to store files
+   * @return The total size in bytes
+   */
+  long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
+    long totalBytes = 0L;
+    for (StoreFileReference regionFiles : storeFileNames) {
+      totalBytes += getSizeOfStoreFile(tn, regionFiles);
+    }
+    return totalBytes;
+  }
+
+  /**
+   * Sums the sizes of every (family, store file) pair held by a single region's reference.
+   *
+   * @param tn The table the region belongs to
+   * @param storeFileName The region's reference to its store files
+   * @return The total size in bytes
+   */
+  long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) {
+    final String region = storeFileName.getRegionName();
+    long regionBytes = 0L;
+    for (Entry<String,String> familyAndFile : storeFileName.getFamilyToFilesMapping().entries()) {
+      regionBytes += getSizeOfStoreFile(
+          tn, region, familyAndFile.getKey(), familyAndFile.getValue());
+    }
+    return regionBytes;
+  }
+
+  /**
+   * Computes the size of the store file given its name, region and family name in
+   * the archive directory.
+   *
+   * @param tn The table the store file belongs to
+   * @param regionName The name of the region the store file belongs to
+   * @param family The column family the store file belongs to
+   * @param storeFile The name of the store file
+   * @return The length of the archived file in bytes, or zero if the archive path cannot be
+   *     computed, does not exist, is not a regular file, or cannot be inspected. Every such case
+   *     is logged at WARN rather than thrown.
+   */
+  long getSizeOfStoreFile(
+      TableName tn, String regionName, String family, String storeFile) {
+    Path familyArchivePath;
+    try {
+      familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
+    } catch (IOException e) {
+      LOG.warn("Could not compute path for the archive directory for the region", e);
+      return 0L;
+    }
+    Path fileArchivePath = new Path(familyArchivePath, storeFile);
+    try {
+      if (fs.exists(fileArchivePath)) {
+        // Ask for the status of the path itself. Listing it and expecting exactly one entry
+        // would mistake a directory holding a single child for a file and report the child's
+        // length.
+        FileStatus status = fs.getFileStatus(fileArchivePath);
+        if (!status.isFile()) {
+          LOG.warn("Expected " + fileArchivePath +
+              " to be a file but was a directory, ignoring reference");
+          return 0L;
+        }
+        return status.getLen();
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not obtain the status of " + fileArchivePath, e);
+      return 0L;
+    }
+    LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference.");
+    return 0L;
+  }
+
+  /**
+   * Collects, per region of the snapshot, the store files whose names are accepted by
+   * {@code filter} (the predicate returns {@code true}). Regions with no accepted file are left
+   * out of the result.
+   *
+   * @param manifest The manifest of the snapshot to inspect
+   * @param filter Decides, by store file name, whether a file is retained
+   * @return One reference per region that retained at least one store file
+   */
+  Set<StoreFileReference> getStoreFilesFromSnapshot(
+      SnapshotManifest manifest, Predicate<String> filter) {
+    final Set<StoreFileReference> retained = new HashSet<>();
+    for (SnapshotRegionManifest regionManifest : manifest.getRegionManifests()) {
+      final String encodedRegionName =
+          ProtobufUtil.toRegionInfo(regionManifest.getRegionInfo()).getEncodedName();
+      final StoreFileReference perRegion = new StoreFileReference(encodedRegionName);
+
+      for (FamilyFiles familyFiles : regionManifest.getFamilyFilesList()) {
+        final String family = familyFiles.getFamilyName().toStringUtf8();
+        for (StoreFile storeFile : familyFiles.getStoreFilesList()) {
+          final String fileName = storeFile.getName();
+          // A snapshot only "inherits" a file's size if it uniquely refers to it (no table
+          // and no other snapshot references it); the caller encodes that rule in `filter`.
+          if (!filter.test(fileName)) {
+            continue;
+          }
+          perRegion.addFamilyStoreFile(family, fileName);
+        }
+      }
+
+      // Skip regions for which the filter rejected every store file.
+      if (perRegion.getFamilyToFilesMapping().isEmpty()) {
+        continue;
+      }
+      retained.add(perRegion);
+    }
+    return retained;
+  }
+
+  /**
+   * Writes one Put per snapshot, carrying that snapshot's size, to the provided {@code table}.
+   *
+   * @param table The table to write the sizes to
+   * @param snapshotSizes The snapshots and their sizes to persist
+   */
+  void persistSnapshotSizes(
+      Table table, List<SnapshotWithSize> snapshotSizes) throws IOException {
+    final List<Put> sizePuts = new ArrayList<>(snapshotSizes.size());
+    for (SnapshotWithSize snapshot : snapshotSizes) {
+      sizePuts.add(QuotaTableUtil.createPutForSnapshotSize(
+          tn, snapshot.getName(), snapshot.getSize()));
+    }
+    table.put(sizePuts);
+  }
+
+  /**
+   * An immutable pair of a snapshot's name and its "size" on the filesystem. This size is
+   * defined as the amount of filesystem space taken by the files the snapshot refers to which
+   * the originating table no longer refers to.
+   */
+  static class SnapshotWithSize {
+    private final String name;
+    private final long size;
+
+    /**
+     * @param name The snapshot name; must not be null
+     * @param size The size attributed to the snapshot, in bytes
+     */
+    SnapshotWithSize(String name, long size) {
+      this.name = Objects.requireNonNull(name);
+      this.size = size;
+    }
+
+    String getName() {
+      return name;
+    }
+
+    long getSize() {
+      return size;
+    }
+
+    @Override
+    public int hashCode() {
+      final HashCodeBuilder builder = new HashCodeBuilder();
+      builder.append(name);
+      builder.append(size);
+      return builder.toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o instanceof SnapshotWithSize) {
+        final SnapshotWithSize that = (SnapshotWithSize) o;
+        return size == that.size && name.equals(that.name);
+      }
+      return false;
+    }
+
+    @Override
+    public String toString() {
+      return "SnapshotWithSize:[" + name + " " + StringUtils.byteDesc(size) + "]";
+    }
+  }
+
+  /**
+   * The store files of a single region, grouped by column family name.
+   */
+  static class StoreFileReference {
+    private final String regionName;
+    private final Multimap<String,String> familyToFiles;
+
+    /**
+     * @param regionName The name of the region the files belong to; must not be null
+     */
+    StoreFileReference(String regionName) {
+      this.familyToFiles = HashMultimap.create();
+      this.regionName = Objects.requireNonNull(regionName);
+    }
+
+    String getRegionName() {
+      return regionName;
+    }
+
+    /**
+     * Returns the live (not copied) mapping of family name to store file names.
+     */
+    Multimap<String,String> getFamilyToFilesMapping() {
+      return familyToFiles;
+    }
+
+    void addFamilyStoreFile(String family, String storeFileName) {
+      familyToFiles.put(family, storeFileName);
+    }
+
+    @Override
+    public int hashCode() {
+      final HashCodeBuilder builder = new HashCodeBuilder();
+      builder.append(regionName);
+      builder.append(familyToFiles);
+      return builder.toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o instanceof StoreFileReference) {
+        final StoreFileReference that = (StoreFileReference) o;
+        return regionName.equals(that.regionName) && familyToFiles.equals(that.familyToFiles);
+      }
+      return false;
+    }
+
+    @Override
+    public String toString() {
+      return "StoreFileReference[region=" + regionName + ", files=" + familyToFiles + "]";
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java
index eded076..edda4df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileSystemUtilizationChore.java
@@ -16,10 +16,8 @@
  */
 package org.apache.hadoop.hbase.quotas;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -36,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given {@link HRegionServer}.
+ * A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given
+ * {@link HRegionServer}. The results of this computation are stored in the
+ * {@link RegionServerSpaceQuotaManager}'s {@link RegionSizeStore} object.
  */
 @InterfaceAudience.Private
 public class FileSystemUtilizationChore extends ScheduledChore {
@@ -53,9 +53,6 @@ public class FileSystemUtilizationChore extends ScheduledChore {
   static final String FS_UTILIZATION_MAX_ITERATION_DURATION_KEY = "hbase.regionserver.quotas.fs.utilization.chore.max.iteration.millis";
   static final long FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT = 5000L;
 
-  private int numberOfCyclesToSkip = 0, prevNumberOfCyclesToSkip = 0;
-  private static final int CYCLE_UPPER_BOUND = 32;
-
   private final HRegionServer rs;
   private final long maxIterationMillis;
   private Iterator<Region> leftoverRegions;
@@ -70,11 +67,7 @@ public class FileSystemUtilizationChore extends ScheduledChore {
 
   @Override
   protected void chore() {
-    if (numberOfCyclesToSkip > 0) {
-      numberOfCyclesToSkip--;
-      return;
-    }
-    final Map<RegionInfo, Long> onlineRegionSizes = new HashMap<>();
+    final RegionSizeStore regionSizeStore = getRegionSizeStore();
     final Set<Region> onlineRegions = new HashSet<>(rs.getRegions());
     // Process the regions from the last run if we have any. If we are somehow having difficulty
     // processing the Regions, we want to avoid creating a backlog in memory of Region objs.
@@ -100,7 +93,7 @@ public class FileSystemUtilizationChore extends ScheduledChore {
       long timeRunning = EnvironmentEdgeManager.currentTime() - start;
       if (timeRunning > maxIterationMillis) {
         LOG.debug("Preempting execution of FileSystemUtilizationChore because it exceeds the"
-            + " maximum iteration configuration value. Will process remaining iterators"
+            + " maximum iteration configuration value. Will process remaining Regions"
             + " on a subsequent invocation.");
         setLeftoverRegions(iterator);
         break;
@@ -124,7 +117,7 @@ public class FileSystemUtilizationChore extends ScheduledChore {
         continue;
       }
       final long sizeInBytes = computeSize(region);
-      onlineRegionSizes.put(region.getRegionInfo(), sizeInBytes);
+      regionSizeStore.put(region.getRegionInfo(), sizeInBytes);
       regionSizesCalculated++;
     }
     if (LOG.isTraceEnabled()) {
@@ -133,14 +126,6 @@ public class FileSystemUtilizationChore extends ScheduledChore {
           + skippedSplitParents + " regions due to being the parent of a split, and"
           + skippedRegionReplicas + " regions due to being region replicas.");
     }
-    if (!reportRegionSizesToMaster(onlineRegionSizes)) {
-      // backoff reporting
-      numberOfCyclesToSkip = prevNumberOfCyclesToSkip > 0 ? 2 * prevNumberOfCyclesToSkip : 1;
-      if (numberOfCyclesToSkip > CYCLE_UPPER_BOUND) {
-        numberOfCyclesToSkip = CYCLE_UPPER_BOUND;
-      }
-      prevNumberOfCyclesToSkip = numberOfCyclesToSkip;
-    }
   }
 
   /**
@@ -176,15 +161,9 @@ public class FileSystemUtilizationChore extends ScheduledChore {
     return regionSize;
   }
 
-  /**
-   * Reports the computed region sizes to the currently active Master.
-   *
-   * @param onlineRegionSizes The computed region sizes to report.
-   * @return {@code false} if FileSystemUtilizationChore should pause reporting to master,
-   *    {@code true} otherwise.
-   */
-  boolean reportRegionSizesToMaster(Map<RegionInfo,Long> onlineRegionSizes) {
-    return this.rs.reportRegionSizesForQuotas(onlineRegionSizes);
+  // VisibleForTesting
+  RegionSizeStore getRegionSizeStore() {
+    return rs.getRegionServerSpaceQuotaManager().getRegionSizeStore();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index 1489a2b..28d5053 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -30,11 +30,14 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.RegionStateListener;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -46,8 +49,12 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
@@ -57,6 +64,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExce
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
 
 /**
  * Master Quota Manager.
@@ -763,5 +772,26 @@ public class MasterQuotaManager implements RegionStateListener {
   public void removeRegionSizesForTable(TableName tableName) {
     regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName));
   }
+
+  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
+      Configuration conf, FileSystem fs) throws IOException {
+    final HashMultimap<TableName,Entry<String,Long>> archivedFilesByTable = HashMultimap.create();
+    // Group the archived files by table
+    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
+      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
+      archivedFilesByTable.put(
+          tn, Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
+    }
+    // Report each set of files to the appropriate object
+    for (TableName tn : archivedFilesByTable.keySet()) {
+      final Set<Entry<String,Long>> filesWithSize = archivedFilesByTable.get(tn);
+      final FileArchiverNotifier notifier = FileArchiverNotifierFactoryImpl.getInstance().get(
+          conn, conf, fs, tn);
+      notifier.addArchivedFiles(filesWithSize);
+    }
+  }
 }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoOpRegionSizeStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoOpRegionSizeStore.java
new file mode 100644
index 0000000..df62d0a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoOpRegionSizeStore.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A {@link RegionSizeStore} implementation that stores nothing.
+ */
+@InterfaceAudience.Private
+public final class NoOpRegionSizeStore implements RegionSizeStore {
+  private static final NoOpRegionSizeStore INSTANCE = new NoOpRegionSizeStore();
+
+  private NoOpRegionSizeStore() {}
+
+  public static NoOpRegionSizeStore getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public Iterator<Entry<RegionInfo,RegionSize>> iterator() {
+    return java.util.Collections.emptyIterator(); // Iterable contract: never hand back null
+  }
+
+  @Override
+  public long heapSize() {
+    return 0;
+  }
+
+  @Override
+  public RegionSize getRegionSize(RegionInfo regionInfo) {
+    return null;
+  }
+
+  @Override
+  public void put(RegionInfo regionInfo, long size) {}
+
+  @Override
+  public void incrementRegionSize(RegionInfo regionInfo, long delta) {}
+
+  @Override
+  public RegionSize remove(RegionInfo regionInfo) {
+    return null;
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return true;
+  }
+
+  @Override
+  public void clear() {}
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
index b0bdedeb..3972700 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.quotas;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -33,6 +34,11 @@ import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 
 /**
  * A manager for filesystem space quotas in the RegionServer.
@@ -55,6 +61,8 @@ public class RegionServerSpaceQuotaManager {
   private boolean started = false;
   private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
   private SpaceViolationPolicyEnforcementFactory factory;
+  private RegionSizeStore regionSizeStore;
+  private RegionSizeReportingChore regionSizeReporter;
 
   public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
     this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
@@ -67,6 +75,8 @@ public class RegionServerSpaceQuotaManager {
     this.factory = factory;
     this.enforcedPolicies = new ConcurrentHashMap<>();
     this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
+    // Initialize the size store to not track anything -- create the real one if we're start()'ed
+    this.regionSizeStore = NoOpRegionSizeStore.getInstance();
   }
 
   public synchronized void start() throws IOException {
@@ -79,8 +89,13 @@ public class RegionServerSpaceQuotaManager {
       LOG.warn("RegionServerSpaceQuotaManager has already been started!");
       return;
     }
+    // Start the chores
     this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getClusterConnection());
     rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
+    this.regionSizeReporter = new RegionSizeReportingChore(rsServices);
+    rsServices.getChoreService().scheduleChore(regionSizeReporter);
+    // Instantiate the real RegionSizeStore
+    this.regionSizeStore = RegionSizeStoreFactory.getInstance().createStore();
     started = true;
   }
 
@@ -89,6 +104,10 @@ public class RegionServerSpaceQuotaManager {
       spaceQuotaRefresher.cancel();
       spaceQuotaRefresher = null;
     }
+    if (regionSizeReporter != null) {
+      regionSizeReporter.cancel();
+      regionSizeReporter = null;
+    }
     started = false;
   }
 
@@ -212,6 +231,43 @@ public class RegionServerSpaceQuotaManager {
   }
 
   /**
+   * Returns the {@link RegionSizeStore} tracking filesystem utilization by each region.
+   *
+   * @return A {@link RegionSizeStore} implementation.
+   */
+  public RegionSizeStore getRegionSizeStore() {
+    return regionSizeStore;
+  }
+
+  /**
+   * Builds the protobuf message to inform the Master of files being archived.
+   *
+   * @param tn The table the files previously belonged to.
+   * @param archivedFiles The files and their size in bytes that were archived.
+   * @return The protobuf representation
+   */
+  public RegionServerStatusProtos.FileArchiveNotificationRequest buildFileArchiveRequest(
+      TableName tn, Collection<Entry<String,Long>> archivedFiles) {
+    RegionServerStatusProtos.FileArchiveNotificationRequest.Builder builder =
+        RegionServerStatusProtos.FileArchiveNotificationRequest.newBuilder();
+    HBaseProtos.TableName protoTn = ProtobufUtil.toProtoTableName(tn);
+    for (Entry<String,Long> archivedFile : archivedFiles) {
+      RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize fws =
+          RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize.newBuilder()
+              .setName(archivedFile.getKey())
+              .setSize(archivedFile.getValue())
+              .setTableName(protoTn)
+              .build();
+      builder.addArchivedFiles(fws);
+    }
+    final RegionServerStatusProtos.FileArchiveNotificationRequest request = builder.build();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reporting file archival to Master: " + TextFormat.shortDebugString(request));
+    }
+    return request;
+  }
+
+  /**
    * Returns the collection of tables which have quota violation policies enforced on
    * this RegionServer.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSize.java
new file mode 100644
index 0000000..c1d94d6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSize.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Interface that encapsulates the size of a Region and the operations to read and update it.
+ */
+@InterfaceAudience.Private
+public interface RegionSize extends HeapSize {
+
+  /**
+   * Updates the size of the Region.
+   *
+   * @param newSize the new size of the Region
+   * @return {@code this}
+   */
+  RegionSize setSize(long newSize);
+
+  /**
+   * Atomically adds the provided {@code delta} to the region size.
+   *
+   * @param delta The change in size in bytes of the region.
+   * @return {@code this}
+   */
+  RegionSize incrementSize(long delta);
+
+  /**
+   * Returns the size of the region.
+   *
+   * @return The size in bytes.
+   */
+  long getSize();
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeImpl.java
new file mode 100644
index 0000000..2a433b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An object encapsulating a Region's size, backed by an {@link AtomicLong} so that reads and
+ * updates are atomic.
+ */
+@InterfaceAudience.Private
+public class RegionSizeImpl implements RegionSize {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionSizeImpl.class);
+  private static final long HEAP_SIZE = ClassSize.OBJECT + ClassSize.ATOMIC_LONG +
+    ClassSize.REFERENCE;
+  private final AtomicLong size;
+
+  public RegionSizeImpl(long initialSize) {
+    // A region can never be negative in size. We can prevent this from being a larger problem, but
+    // we will need to leave ourselves a note to figure out how we got here.
+    if (initialSize < 0L && LOG.isTraceEnabled()) {
+      LOG.trace("Nonsensical negative Region size being constructed, this is likely an error",
+          new Exception());
+    }
+    this.size = new AtomicLong(initialSize < 0L ? 0L : initialSize);
+  }
+
+  @Override
+  public long heapSize() {
+    return HEAP_SIZE;
+  }
+
+  @Override
+  public RegionSizeImpl setSize(long newSize) {
+    // Overwrites the stored value as-is: unlike the constructor, a negative newSize is not
+    // clamped to zero. NOTE(review): confirm that callers never pass a negative size.
+    size.set(newSize);
+    return this;
+  }
+
+  @Override
+  public RegionSizeImpl incrementSize(long delta) {
+    size.addAndGet(delta);
+    return this;
+  }
+
+  @Override
+  public long getSize() {
+    return size.get();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeReportingChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeReportingChore.java
new file mode 100644
index 0000000..0f4055c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeReportingChore.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Chore which sends the region size reports on this RegionServer to the Master.
+ */
+@InterfaceAudience.Private
+public class RegionSizeReportingChore extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionSizeReportingChore.class);
+
+  static final String REGION_SIZE_REPORTING_CHORE_PERIOD_KEY =
+      "hbase.regionserver.quotas.region.size.reporting.chore.period";
+  static final int REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT = 1000 * 60;
+
+  static final String REGION_SIZE_REPORTING_CHORE_DELAY_KEY =
+      "hbase.regionserver.quotas.region.size.reporting.chore.delay";
+  static final long REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT = 1000 * 30;
+
+  static final String REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY =
+      "hbase.regionserver.quotas.region.size.reporting.chore.timeunit";
+  static final String REGION_SIZE_REPORTING_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+  private final RegionServerServices rsServices;
+  private final MetricsRegionServer metrics;
+
+  public RegionSizeReportingChore(RegionServerServices rsServices) {
+    super(
+        RegionSizeReportingChore.class.getSimpleName(), rsServices,
+        getPeriod(rsServices.getConfiguration()), getInitialDelay(rsServices.getConfiguration()),
+        getTimeUnit(rsServices.getConfiguration()));
+    this.rsServices = rsServices;
+    this.metrics = rsServices.getMetrics();
+  }
+
+  @Override
+  protected void chore() {
+    final long start = System.nanoTime();
+    try {
+      _chore();
+    } finally {
+      if (metrics != null) {
+        metrics.incrementRegionSizeReportingChoreTime(
+            TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+      }
+    }
+  }
+
+  void _chore() {
+    final RegionServerSpaceQuotaManager quotaManager =
+        rsServices.getRegionServerSpaceQuotaManager();
+    // Collect the RegionInfo of every region currently reported as online by rsServices
+    HashSet<RegionInfo> onlineRegionInfos = getOnlineRegionInfos(rsServices.getRegions());
+    RegionSizeStore store = quotaManager.getRegionSizeStore();
+    // Drop stored sizes for regions that are no longer online so they are not reported below
+    removeNonOnlineRegions(store, onlineRegionInfos);
+    rsServices.reportRegionSizesForQuotas(store);
+  }
+
+  HashSet<RegionInfo> getOnlineRegionInfos(List<? extends Region> onlineRegions) {
+    HashSet<RegionInfo> regionInfos = new HashSet<>();
+    onlineRegions.forEach((region) -> regionInfos.add(region.getRegionInfo()));
+    return regionInfos;
+  }
+
+  void removeNonOnlineRegions(RegionSizeStore store, Set<RegionInfo> onlineRegions) {
+    // We have to remove regions which are no longer online from the store, otherwise they will
+    // continue to be sent to the Master which will prevent size report expiration.
+    if (onlineRegions.isEmpty()) {
+      // Easy-case, no online regions means no size reports
+      store.clear();
+      return;
+    }
+
+    Iterator<Entry<RegionInfo,RegionSize>> iter = store.iterator();
+    int numEntriesRemoved = 0;
+    while (iter.hasNext()) {
+      Entry<RegionInfo,RegionSize> entry = iter.next();
+      RegionInfo regionInfo = entry.getKey();
+      if (!onlineRegions.contains(regionInfo)) {
+        numEntriesRemoved++;
+        iter.remove();
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Removed " + numEntriesRemoved + " region sizes before reporting to Master "
+          + "because they are for non-online regions.");
+    }
+  }
+
+  /**
+   * Extracts the period for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore period or the default value.
+   */
+  static int getPeriod(Configuration conf) {
+    return conf.getInt(
+        REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT);
+  }
+
+  /**
+   * Extracts the initial delay for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore initial delay or the default value.
+   */
+  static long getInitialDelay(Configuration conf) {
+    return conf.getLong(
+        REGION_SIZE_REPORTING_CHORE_DELAY_KEY, REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT);
+  }
+
+  /**
+   * Extracts the time unit for the chore period and initial delay from the configuration. The
+   * configuration value for {@link #REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY} must correspond to a
+   * {@link TimeUnit} value.
+   *
+   * @param conf The configuration object.
+   * @return The configured time unit for the chore period and initial delay or the default value.
+   */
+  static TimeUnit getTimeUnit(Configuration conf) {
+    return TimeUnit.valueOf(conf.get(REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY,
+        REGION_SIZE_REPORTING_CHORE_TIMEUNIT_DEFAULT));
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStore.java
new file mode 100644
index 0000000..bd5c5bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * An interface for concurrently storing and updating the size of a Region.
+ */
+@InterfaceAudience.Private
+public interface RegionSizeStore extends Iterable<Entry<RegionInfo,RegionSize>>, HeapSize {
+
+  /**
+   * Returns the size for the given region if one exists. If no size exists, {@code null} is
+   * returned.
+   *
+   * @param regionInfo The region whose size is being fetched.
+   * @return The size in bytes of the region or null if no size is stored.
+   */
+  RegionSize getRegionSize(RegionInfo regionInfo);
+
+  /**
+   * Atomically sets the given {@code size} for a region.
+   *
+   * @param regionInfo An identifier for a region.
+   * @param size The size in bytes of the region.
+   */
+  void put(RegionInfo regionInfo, long size);
+
+  /**
+   * Atomically alter the size of a region.
+   *
+   * @param regionInfo The region to update.
+   * @param delta The change in size for the region, positive or negative.
+   */
+  void incrementRegionSize(RegionInfo regionInfo, long delta);
+
+  /**
+   * Removes the mapping for the given key, returning the value if one exists in the store.
+   *
+   * @param regionInfo The key to remove from the store
+   * @return The value removed from the store if one exists, otherwise null.
+   */
+  RegionSize remove(RegionInfo regionInfo);
+
+  /**
+   * Returns the number of entries in the store.
+   *
+   * @return The number of entries in the store.
+   */
+  int size();
+
+  /**
+   * Returns if the store is empty.
+   *
+   * @return true if there are no entries in the store, otherwise false.
+   */
+  boolean isEmpty();
+
+  /**
+   * Removes all entries from the store.
+   */
+  void clear();
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStoreFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStoreFactory.java
new file mode 100644
index 0000000..2564ecb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStoreFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A factory class for creating implementations of {@link RegionSizeStore}.
+ */
+@InterfaceAudience.Private
+public final class RegionSizeStoreFactory {
+  private static final RegionSizeStoreFactory INSTANCE = new RegionSizeStoreFactory();
+
+  private RegionSizeStoreFactory() {}
+
+  public static RegionSizeStoreFactory getInstance() {
+    return INSTANCE;
+  }
+
+  public RegionSizeStore createStore() {
+    // Presently, there is only one implementation.
+    return new RegionSizeStoreImpl();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStoreImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStoreImpl.java
new file mode 100644
index 0000000..556fd12
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionSizeStoreImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RegionSizeStore} implementation backed by a ConcurrentHashMap. We expect similar
+ * amounts of reads and writes to the "store", so using a RWLock is not going to provide any
+ * exceptional gains.
+ */
+@InterfaceAudience.Private
+public class RegionSizeStoreImpl implements RegionSizeStore {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionSizeStoreImpl.class);
+  private static final long sizeOfEntry = ClassSize.align(
+      ClassSize.CONCURRENT_HASHMAP_ENTRY
+      + ClassSize.OBJECT + Bytes.SIZEOF_LONG
+      // TODO Have RegionInfo implement HeapSize. 100B is an approximation based on a heapdump.
+      + ClassSize.OBJECT + 100);
+  private final ConcurrentHashMap<RegionInfo,RegionSize> store;
+
+  public RegionSizeStoreImpl() {
+    store = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public Iterator<Entry<RegionInfo,RegionSize>> iterator() {
+    return store.entrySet().iterator();
+  }
+
+  @Override
+  public RegionSize getRegionSize(RegionInfo regionInfo) {
+    return store.get(regionInfo);
+  }
+
+  @Override
+  public void put(RegionInfo regionInfo, long size) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Setting space quota size for " + regionInfo + " to " + size);
+    }
+    // Atomic. Either sets the new size for the first time, or replaces the existing value.
+    store.compute(regionInfo,
+      (key,value) -> value == null ? new RegionSizeImpl(size) : value.setSize(size));
+  }
+
+  @Override
+  public void incrementRegionSize(RegionInfo regionInfo, long delta) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Updating space quota size for " + regionInfo + " with a delta of " + delta);
+    }
+    // Atomic. Adds delta to the existing value, or seeds a new entry (negative seeds become 0).
+    store.compute(regionInfo,
+      (key,value) -> value == null ? new RegionSizeImpl(delta) : value.incrementSize(delta));
+  }
+
+  @Override
+  public RegionSize remove(RegionInfo regionInfo) {
+    return store.remove(regionInfo);
+  }
+
+  @Override
+  public long heapSize() {
+    // Will have to iterate over each element if RegionInfo implements HeapSize, for now it's just
+    // a simple calculation.
+    return sizeOfEntry * store.size();
+  }
+
+  @Override
+  public int size() {
+    return store.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return store.isEmpty();
+  }
+
+  @Override
+  public void clear() {
+    store.clear();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
index 78f6c3e..9111b8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
@@ -17,26 +17,19 @@
 package org.apache.hadoop.hbase.quotas;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
@@ -49,15 +42,6 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MetricsMaster;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
@@ -135,11 +119,11 @@ public class SnapshotQuotaObserverChore extends ScheduledChore {
     pruneNamespaceSnapshots(snapshotsToComputeSize);
 
     // For each table, compute the size of each snapshot
-    Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
-        snapshotsToComputeSize);
+    Map<String,Long> namespaceSnapshotSizes = computeSnapshotSizes(snapshotsToComputeSize);
 
-    // Write the size data to the quota table.
-    persistSnapshotSizes(snapshotsWithSize);
+    // Write the size data by namespaces to the quota table.
+    // We need to do this "globally" since each FileArchiverNotifier is limited to its own Table.
+    persistSnapshotSizesForNamespaces(namespaceSnapshotSizes);
   }
 
   /**
@@ -234,321 +218,50 @@ public class SnapshotQuotaObserverChore extends ScheduledChore {
    * @param snapshotsToComputeSize The snapshots to compute the size of
    * @return A mapping of table to snapshot created from that table and the snapshot's size.
    */
-  Multimap<TableName,SnapshotWithSize> computeSnapshotSizes(
+  Map<String,Long> computeSnapshotSizes(
       Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
-    Multimap<TableName,SnapshotWithSize> snapshotSizes = HashMultimap.create();
+    final Map<String,Long> snapshotSizesByNamespace = new HashMap<>();
+    final long start = System.nanoTime();
     for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
       final TableName tn = entry.getKey();
-      final List<String> snapshotNames = new ArrayList<>(entry.getValue());
-      // Sort the snapshots so we process them in lexicographic order. This ensures that multiple
-      // invocations of this Chore do not more the size ownership of some files between snapshots
-      // that reference the file (prevents size ownership from moving between snapshots).
-      Collections.sort(snapshotNames);
-      final Path rootDir = FSUtils.getRootDir(conf);
-      // Get the map of store file names to store file path for this table
-      // TODO is the store-file name unique enough? Does this need to be region+family+storefile?
-      final Set<String> tableReferencedStoreFiles;
-      try {
-        tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir).keySet();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return null;
-      }
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
-      }
-
-      // For each snapshot on this table, get the files which the snapshot references which
-      // the table does not.
-      Set<String> snapshotReferencedFiles = new HashSet<>();
-      for (String snapshotName : snapshotNames) {
-        final long start = System.nanoTime();
-        Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
-        SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
-        SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
-        }
+      final Collection<String> snapshotNames = entry.getValue();
 
-        // Get the set of files from the manifest that this snapshot references which are not also
-        // referenced by the originating table.
-        Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
-            manifest, (sfn) -> !tableReferencedStoreFiles.contains(sfn)
-                && !snapshotReferencedFiles.contains(sfn));
+      // Get our notifier instance; it tracks archivals that happen out-of-band of this chore
+      FileArchiverNotifier notifier = getNotifierForTable(tn);
 
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Snapshot " + snapshotName + " solely references the files: "
-              + unreferencedStoreFileNames);
-        }
-
-        // Compute the size of the store files for this snapshot
-        long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Computed size of " + snapshotName + " to be " + size);
-        }
-
-        // Persist this snapshot's size into the map
-        snapshotSizes.put(tn, new SnapshotWithSize(snapshotName, size));
-
-        // Make sure that we don't double-count the same file
-        for (StoreFileReference ref : unreferencedStoreFileNames) {
-          for (String fileName : ref.getFamilyToFilesMapping().values()) {
-            snapshotReferencedFiles.add(fileName);
-          }
-        }
-        // Update the amount of time it took to compute the snapshot's size
-        if (null != metrics) {
-          metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
-        }
-      }
+      // The total size consumed by all snapshots against this table
+      long totalSnapshotSize = notifier.computeAndStoreSnapshotSizes(snapshotNames);
+      // Bucket that size into the appropriate namespace
+      snapshotSizesByNamespace.merge(tn.getNamespaceAsString(), totalSnapshotSize, Long::sum);
     }
-    return snapshotSizes;
-  }
 
-  /**
-   * Extracts the names of the store files referenced by this snapshot which satisfy the given
-   * predicate (the predicate returns {@code true}).
-   */
-  Set<StoreFileReference> getStoreFilesFromSnapshot(
-      SnapshotManifest manifest, Predicate<String> filter) {
-    Set<StoreFileReference> references = new HashSet<>();
-    // For each region referenced by the snapshot
-    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
-      StoreFileReference regionReference = new StoreFileReference(
-          HRegionInfo.convert(rm.getRegionInfo()).getEncodedName());
-
-      // For each column family in this region
-      for (FamilyFiles ff : rm.getFamilyFilesList()) {
-        final String familyName = ff.getFamilyName().toStringUtf8();
-        // And each store file in that family
-        for (StoreFile sf : ff.getStoreFilesList()) {
-          String storeFileName = sf.getName();
-          // A snapshot only "inherits" a files size if it uniquely refers to it (no table
-          // and no other snapshot references it).
-          if (filter.test(storeFileName)) {
-            regionReference.addFamilyStoreFile(familyName, storeFileName);
-          }
-        }
-      }
-      // Only add this Region reference if we retained any files.
-      if (!regionReference.getFamilyToFilesMapping().isEmpty()) {
-        references.add(regionReference);
-      }
+    // Update the amount of time it took to compute the size of the snapshots for all tables
+    if (metrics != null) {
+      metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
     }
-    return references;
-  }
 
-  /**
-   * Calculates the directory in HDFS for a table based on the configuration.
-   */
-  Path getTableDir(TableName tn) throws IOException {
-    Path rootDir = FSUtils.getRootDir(conf);
-    return FSUtils.getTableDir(rootDir, tn);
-  }
-
-  /**
-   * Computes the size of each store file in {@code storeFileNames}
-   */
-  long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
-    return storeFileNames.stream()
-        .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
-  }
-
-  /**
-   * Computes the size of the store files for a single region.
-   */
-  long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) {
-    String regionName = storeFileName.getRegionName();
-    return storeFileName.getFamilyToFilesMapping()
-        .entries().stream()
-        .collect(Collectors.summingLong((e) ->
-            getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue())));
+    return snapshotSizesByNamespace;
   }
 
   /**
-   * Computes the size of the store file given its name, region and family name in
-   * the archive directory.
-   */
-  long getSizeOfStoreFile(
-      TableName tn, String regionName, String family, String storeFile) {
-    Path familyArchivePath;
-    try {
-      familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family);
-    } catch (IOException e) {
-      LOG.warn("Could not compute path for the archive directory for the region", e);
-      return 0L;
-    }
-    Path fileArchivePath = new Path(familyArchivePath, storeFile);
-    try {
-      if (fs.exists(fileArchivePath)) {
-        FileStatus[] status = fs.listStatus(fileArchivePath);
-        if (1 != status.length) {
-          LOG.warn("Expected " + fileArchivePath +
-              " to be a file but was a directory, ignoring reference");
-          return 0L;
-        }
-        return status[0].getLen();
-      }
-    } catch (IOException e) {
-      LOG.warn("Could not obtain the status of " + fileArchivePath, e);
-      return 0L;
-    }
-    LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference.");
-    return 0L;
-  }
-
-  /**
-   * Writes the snapshot sizes to the {@code hbase:quota} table.
+   * Returns the correct instance of {@link FileArchiverNotifier} for the given table name.
    *
-   * @param snapshotsWithSize The snapshot sizes to write.
-   */
-  void persistSnapshotSizes(
-      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
-    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
-      // Write each snapshot size for the table
-      persistSnapshotSizes(quotaTable, snapshotsWithSize);
-      // Write a size entry for all snapshots in a namespace
-      persistSnapshotSizesByNS(quotaTable, snapshotsWithSize);
-    }
-  }
-
-  /**
-   * Writes the snapshot sizes to the provided {@code table}.
-   */
-  void persistSnapshotSizes(
-      Table table, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
-    // Convert each entry in the map to a Put and write them to the quota table
-    table.put(snapshotsWithSize.entries()
-        .stream()
-        .map(e -> QuotaTableUtil.createPutForSnapshotSize(
-            e.getKey(), e.getValue().getName(), e.getValue().getSize()))
-        .collect(Collectors.toList()));
-  }
-
-  /**
-   * Rolls up the snapshot sizes by namespace and writes a single record for each namespace
-   * which is the size of all snapshots in that namespace.
-   */
-  void persistSnapshotSizesByNS(
-      Table quotaTable, Multimap<TableName,SnapshotWithSize> snapshotsWithSize) throws IOException {
-    Map<String,Long> namespaceSnapshotSizes = groupSnapshotSizesByNamespace(snapshotsWithSize);
-    quotaTable.put(namespaceSnapshotSizes.entrySet().stream()
-        .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(
-            e.getKey(), e.getValue()))
-        .collect(Collectors.toList()));
-  }
-
-  /**
-   * Sums the snapshot sizes for each namespace.
+   * @param tn The table name
+   * @return A {@link FileArchiverNotifier} instance
    */
-  Map<String,Long> groupSnapshotSizesByNamespace(
-      Multimap<TableName,SnapshotWithSize> snapshotsWithSize) {
-    return snapshotsWithSize.entries().stream()
-        .collect(Collectors.groupingBy(
-            // Convert TableName into the namespace string
-            (e) -> e.getKey().getNamespaceAsString(),
-            // Sum the values for namespace
-            Collectors.mapping(
-                Map.Entry::getValue, Collectors.summingLong((sws) -> sws.getSize()))));
+  FileArchiverNotifier getNotifierForTable(TableName tn) {
+    return FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
   }
 
   /**
-   * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is
-   * defined as the amount of filesystem space taken by the files the snapshot refers to which
-   * the originating table no longer refers to.
+   * Writes the size used by snapshots for each namespace to the quota table.
    */
-  static class SnapshotWithSize {
-    private final String name;
-    private final long size;
-
-    SnapshotWithSize(String name, long size) {
-      this.name = Objects.requireNonNull(name);
-      this.size = size;
-    }
-
-    String getName() {
-      return name;
-    }
-
-    long getSize() {
-      return size;
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().append(name).append(size).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-
-      if (!(o instanceof SnapshotWithSize)) {
-        return false;
-      }
-
-      SnapshotWithSize other = (SnapshotWithSize) o;
-      return name.equals(other.name) && size == other.size;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder(32);
-      return sb.append("SnapshotWithSize:[").append(name).append(" ")
-          .append(StringUtils.byteDesc(size)).append("]").toString();
-    }
-  }
-
-  /**
-   * A reference to a collection of files in the archive directory for a single region.
-   */
-  static class StoreFileReference {
-    private final String regionName;
-    private final Multimap<String,String> familyToFiles;
-
-    StoreFileReference(String regionName) {
-      this.regionName = Objects.requireNonNull(regionName);
-      familyToFiles = HashMultimap.create();
-    }
-
-    String getRegionName() {
-      return regionName;
-    }
-
-    Multimap<String,String> getFamilyToFilesMapping() {
-      return familyToFiles;
-    }
-
-    void addFamilyStoreFile(String family, String storeFileName) {
-      familyToFiles.put(family, storeFileName);
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (!(o instanceof StoreFileReference)) {
-        return false;
-      }
-      StoreFileReference other = (StoreFileReference) o;
-      return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles);
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      return sb.append("StoreFileReference[region=").append(regionName).append(", files=")
-          .append(familyToFiles).append("]").toString();
+  void persistSnapshotSizesForNamespaces(
+      Map<String,Long> snapshotSizesByNamespace) throws IOException {
+    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.put(snapshotSizesByNamespace.entrySet().stream()
+          .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(e.getKey(), e.getValue()))
+          .collect(Collectors.toList()));
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
index b1f3cd0..d9730a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceViolationPolicyEnforcement.java
@@ -80,12 +80,13 @@ public interface SpaceViolationPolicyEnforcement {
   boolean shouldCheckBulkLoads();
 
   /**
-   * Checks the file at the given path against <code>this</code> policy and the current
-   * {@link SpaceQuotaSnapshot}. If the file would violate the policy, a
+   * Computes the size of the file(s) at the given paths and checks it against <code>this</code>
+   * policy and the current {@link SpaceQuotaSnapshot}. If the file would violate the policy, a
    * {@link SpaceLimitingException} will be thrown.
    *
    * @param paths The paths in HDFS to files to be bulk loaded.
+   * @return The size, in bytes, of the files that would be loaded.
    */
-  void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException;
+  long computeBulkLoadSize(FileSystem fs, List<String> paths) throws SpaceLimitingException;
 
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
index c919d7e..79c78bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/AbstractViolationPolicyEnforcement.java
@@ -16,14 +16,19 @@
  */
 package org.apache.hadoop.hbase.quotas.policies;
 
+import java.io.IOException;
 import java.util.Objects;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
 import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 
 /**
  * Abstract implementation for {@link SpaceViolationPolicyEnforcement}.
@@ -74,4 +79,27 @@ public abstract class AbstractViolationPolicyEnforcement
   public boolean areCompactionsDisabled() {
     return false;
   }
+
+  /**
+   * Computes the size of a single file on the filesystem. If the size cannot be computed for some
+   * reason, a {@link SpaceLimitingException} is thrown, as the file may violate a quota. If the
+   * provided path does not reference a file, an {@link IllegalArgumentException} is thrown.
+   *
+   * @param fs The FileSystem which the path refers to a file upon
+   * @param path The path on the {@code fs} to a file whose size is being checked
+   * @return The size in bytes of the file
+   */
+  long getFileSize(FileSystem fs, String path) throws SpaceLimitingException {
+    final FileStatus status;
+    try {
+      status = fs.getFileStatus(new Path(Objects.requireNonNull(path)));
+    } catch (IOException e) {
+      throw new SpaceLimitingException(
+          getPolicyName(), "Could not verify length of file to bulk load: " + path, e);
+    }
+    if (!status.isFile()) {
+      throw new IllegalArgumentException(path + " is not a file.");
+    }
+    return status.getLen();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DefaultViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DefaultViolationPolicyEnforcement.java
index 28e7fd2..01217b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DefaultViolationPolicyEnforcement.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/DefaultViolationPolicyEnforcement.java
@@ -18,11 +18,8 @@ package org.apache.hadoop.hbase.quotas.policies;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
@@ -58,33 +55,22 @@ public class DefaultViolationPolicyEnforcement extends AbstractViolationPolicyEn
   }
 
   @Override
-  public void checkBulkLoad(FileSystem fs, List<String> paths) throws SpaceLimitingException {
+  public long computeBulkLoadSize(FileSystem fs, List<String> paths) throws SpaceLimitingException {
     // Compute the amount of space that could be used to save some arithmetic in the for-loop
     final long sizeAvailableForBulkLoads = quotaSnapshot.getLimit() - quotaSnapshot.getUsage();
     long size = 0L;
     for (String path : paths) {
-      size += addSingleFile(fs, path);
+      try {
+        size += getFileSize(fs, path);
+      } catch (IOException e) {
+        throw new SpaceLimitingException(
+            getPolicyName(), "Could not verify length of file to bulk load: " + path, e);
+      }
       if (size > sizeAvailableForBulkLoads) {
-        break;
+        throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths
+            + " is disallowed because the file(s) exceed the limits of a space quota.");
       }
     }
-    if (size > sizeAvailableForBulkLoads) {
-      throw new SpaceLimitingException(getPolicyName(), "Bulk load of " + paths
-          + " is disallowed because the file(s) exceed the limits of a space quota.");
-    }
-  }
-
-  private long addSingleFile(FileSystem fs, String path) throws SpaceLimitingException {
-    final FileStatus status;
-    try {
-      status = fs.getFileStatus(new Path(Objects.requireNonNull(path)));
-    } catch (IOException e) {
-      throw new SpaceLimitingException(
-          getPolicyName(), "Could not verify length of file to bulk load", e);
-    }
-    if (!status.isFile()) {
-      throw new IllegalArgumentException(path + " is not a file.");
-    }
-    return status.getLen();
+    return size;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/MissingSnapshotViolationPolicyEnforcement.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/MissingSnapshotViolationPolicyEnforcement.java
index cbc70a0..0760df8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/MissingSnapshotViolationPolicyEnforcement.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/policies/MissingSnapshotViolationPolicyEnforcement.java
@@ -47,7 +47,13 @@ public final class MissingSnapshotViolationPolicyEnforcement
   }
 
   @Override
-  public void checkBulkLoad(FileSystem fs, List<String> paths) {}
+  public long computeBulkLoadSize(FileSystem fs, List<String> paths) throws SpaceLimitingException {
+    long size = 0;
+    for (String path : paths) {
+      size += getFileSize(fs, path);
+    }
+    return size;
+  }
 
   @Override
   public void enable() throws IOException {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index fbeecc6..9521cb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -140,6 +140,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
@@ -2817,6 +2818,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
       this.decrMemStoreSize(mss);
 
+      // Increase the size of this Region for the purposes of quota. Noop if quotas are disabled.
+      // During startup, quota manager may not be initialized yet.
+      if (rsServices != null) {
+        RegionServerSpaceQuotaManager quotaManager = rsServices.getRegionServerSpaceQuotaManager();
+        if (quotaManager != null) {
+          quotaManager.getRegionSizeStore().incrementRegionSize(
+              this.getRegionInfo(), flushedOutputFileSize);
+        }
+      }
+
       if (wal != null) {
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 109cc1a..f22137d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -129,6 +129,8 @@ import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
 import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSize;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
@@ -215,6 +217,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -1100,15 +1103,6 @@ public class HRegionServer extends HasThread implements
     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
     sendShutdownInterrupt();
 
-    // Stop the quota manager
-    if (rsQuotaManager != null) {
-      rsQuotaManager.stop();
-    }
-    if (rsSpaceQuotaManager != null) {
-      rsSpaceQuotaManager.stop();
-      rsSpaceQuotaManager = null;
-    }
-
     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
     if (rspmHost != null) {
       rspmHost.stop(this.abortRequested || this.killed);
@@ -1152,6 +1146,15 @@ public class HRegionServer extends HasThread implements
       LOG.info("stopping server " + this.serverName + "; all regions closed.");
     }
 
+    // Stop the quota manager
+    if (rsQuotaManager != null) {
+      rsQuotaManager.stop();
+    }
+    if (rsSpaceQuotaManager != null) {
+      rsSpaceQuotaManager.stop();
+      rsSpaceQuotaManager = null;
+    }
+
     //fsOk flag may be changed when closing regions throws exception.
     if (this.fsOk) {
       shutdownWAL(!abortRequested);
@@ -1258,10 +1261,10 @@ public class HRegionServer extends HasThread implements
   /**
    * Reports the given map of Regions and their size on the filesystem to the active Master.
    *
-   * @param onlineRegionSizes A map of region info to size in bytes
+   * @param regionSizeStore The store containing region sizes
    * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
    */
-  public boolean reportRegionSizesForQuotas(final Map<RegionInfo, Long> onlineRegionSizes) {
+  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
     RegionServerStatusService.BlockingInterface rss = rssStub;
     if (rss == null) {
       // the current server could be stopping.
@@ -1269,9 +1272,7 @@ public class HRegionServer extends HasThread implements
       return true;
     }
     try {
-      RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(
-          Objects.requireNonNull(onlineRegionSizes));
-      rss.reportRegionSpaceUse(null, request);
+      buildReportAndSend(rss, regionSizeStore);
     } catch (ServiceException se) {
       IOException ioe = ProtobufUtil.getRemoteException(se);
       if (ioe instanceof PleaseHoldException) {
@@ -1300,15 +1301,33 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
+   * Builds the region size report and sends it to the master. Upon successful sending of the
+   * report, the number of region sizes sent is recorded in the RegionServer metrics (if any).
+   *
+   * @param rss The stub to send to the Master
+   * @param regionSizeStore The store containing region sizes
+   */
+  void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
+      RegionSizeStore regionSizeStore) throws ServiceException {
+    RegionSpaceUseReportRequest request =
+        buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
+    rss.reportRegionSpaceUse(null, request);
+    // Record the number of size reports sent
+    if (metricsRegionServer != null) {
+      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
+    }
+  }
+
+  /**
    * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
    *
-   * @param regionSizes Map of region info to size in bytes.
+   * @param regionSizes The store containing the size in bytes of regions
    * @return The corresponding protocol buffer message.
    */
-  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(Map<RegionInfo,Long> regionSizes) {
+  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
     RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
-    for (Entry<RegionInfo, Long> entry : Objects.requireNonNull(regionSizes).entrySet()) {
-      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue()));
+    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
+      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
     }
     return request.build();
   }
@@ -3798,6 +3817,40 @@ public class HRegionServer extends HasThread implements
     return this.rsSpaceQuotaManager;
   }
 
+  @Override
+  public boolean reportFileArchivalForQuotas(TableName tableName,
+      Collection<Entry<String, Long>> archivedFiles) {
+    RegionServerStatusService.BlockingInterface rss = rssStub;
+    if (rss == null || rsSpaceQuotaManager == null) {
+      // the current server could be stopping.
+      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
+      return false;
+    }
+    try {
+      RegionServerStatusProtos.FileArchiveNotificationRequest request =
+          rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
+      rss.reportFileArchival(null, request);
+    } catch (ServiceException se) {
+      IOException ioe = ProtobufUtil.getRemoteException(se);
+      if (ioe instanceof PleaseHoldException) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
+              + " This will be retried.", ioe);
+        }
+        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
+        return false;
+      }
+      if (rssStub == rss) {
+        rssStub = null;
+      }
+      // re-create the stub if we failed to report the archival
+      createRegionServerStatusStub(true);
+      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
+      return false;
+    }
+    return true;
+  }
+
   public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
     return eventLoopGroupConfig;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 3a37b12..f2ffc4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.OptionalDouble;
@@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
@@ -109,6 +111,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
@@ -1524,12 +1527,51 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
       synchronized (filesCompacting) {
         filesCompacting.removeAll(compactedFiles);
       }
+
+      // These may be null when the RS is shutting down. The space quota Chores will fix the Region
+      // sizes later so it's not super-critical if we miss these.
+      RegionServerServices rsServices = region.getRegionServerServices();
+      if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
+        updateSpaceQuotaAfterFileReplacement(
+            rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
+            compactedFiles, result);
+      }
     } finally {
       this.lock.writeLock().unlock();
     }
   }
 
   /**
+   * Updates the space quota usage for this region, removing the size for files compacted away
+   * and adding in the size for new files.
+   *
+   * @param sizeStore The object tracking changes in region size for space quotas.
+   * @param regionInfo The identifier for the region whose size is being updated.
+   * @param oldFiles Files removed from this store's region.
+   * @param newFiles Files added to this store's region.
+   */
+  void updateSpaceQuotaAfterFileReplacement(
+      RegionSizeStore sizeStore, RegionInfo regionInfo, Collection<HStoreFile> oldFiles,
+      Collection<HStoreFile> newFiles) {
+    long delta = 0;
+    if (oldFiles != null) {
+      for (HStoreFile compactedFile : oldFiles) {
+        if (compactedFile.isHFile()) {
+          delta -= compactedFile.getReader().length();
+        }
+      }
+    }
+    if (newFiles != null) {
+      for (HStoreFile newFile : newFiles) {
+        if (newFile.isHFile()) {
+          delta += newFile.getReader().length();
+        }
+      }
+    }
+    sizeStore.incrementRegionSize(regionInfo, delta);
+  }
+
+  /**
    * Log a very elaborate compaction completion message.
    * @param cr Request.
    * @param sfs Resulting files.
@@ -2577,18 +2619,23 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   /**
    * Archives and removes the compacted files
    * @param compactedfiles The compacted files in this store that are not active in reads
-   * @throws IOException
    */
   private void removeCompactedfiles(Collection<HStoreFile> compactedfiles)
       throws IOException {
     final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());
+    final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());
     for (final HStoreFile file : compactedfiles) {
       synchronized (file) {
         try {
           StoreFileReader r = file.getReader();
           if (r == null) {
             LOG.debug("The file {} was closed but still not archived", file);
+            // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,
+            // we should know the size of an HStoreFile without having to ask the HStoreFileReader
+            // for that.
+            long length = getStoreFileSize(file);
             filesToRemove.add(file);
+            storeFileSizes.add(length);
             continue;
           }
 
@@ -2596,9 +2643,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
             // Even if deleting fails we need not bother as any new scanners won't be
             // able to use the compacted file as the status is already compactedAway
             LOG.trace("Closing and archiving the file {}", file);
+            // Copy the file size before closing the reader
+            final long length = r.length();
             r.close(true);
             // Just close and return
             filesToRemove.add(file);
+            // Only add the length if we successfully added the file to `filesToRemove`
+            storeFileSizes.add(length);
           } else {
             LOG.info("Can't archive compacted file " + file.getPath()
                 + " because of either isCompactedAway=" + file.isCompactedAway()
@@ -2626,9 +2677,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
           // FileNotFoundException when we attempt to re-archive them in the next go around.
           Collection<Path> failedFiles = fae.getFailedFiles();
           Iterator<HStoreFile> iter = filesToRemove.iterator();
+          Iterator<Long> sizeIter = storeFileSizes.iterator();
           while (iter.hasNext()) {
+            sizeIter.next();
             if (failedFiles.contains(iter.next().getPath())) {
               iter.remove();
+              sizeIter.remove();
             }
           }
           if (!filesToRemove.isEmpty()) {
@@ -2641,9 +2695,36 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     if (!filesToRemove.isEmpty()) {
       // Clear the compactedfiles from the store file manager
       clearCompactedfiles(filesToRemove);
+      // Try to send report of this archival to the Master for updating quota usage faster
+      reportArchivedFilesForQuota(filesToRemove, storeFileSizes);
     }
   }
 
+  /**
+   * Computes the length of a store file without succumbing to any errors along the way. If an
+   * error is encountered, the implementation returns {@code 0} instead of the actual size.
+   *
+   * @param file The file to compute the size of.
+   * @return The size in bytes of the provided {@code file}.
+   */
+  long getStoreFileSize(HStoreFile file) {
+    long length = 0;
+    try {
+      file.initReader();
+      length = file.getReader().length();
+    } catch (IOException e) {
+      LOG.trace("Failed to open reader when trying to compute store file size, ignoring", e);
+    } finally {
+      try {
+        file.closeStoreFile(
+            file.getCacheConf() != null ? file.getCacheConf().shouldEvictOnClose() : true);
+      } catch (IOException e) {
+        LOG.trace("Failed to close reader after computing store file size, ignoring", e);
+      }
+    }
+    return length;
+  }
+
   public Long preFlushSeqIDEstimation() {
     return memstore.preFlushSeqIDEstimation();
   }
@@ -2689,4 +2770,39 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     return maxStoreFileRefCount.isPresent() ? maxStoreFileRefCount.getAsInt() : 0;
   }
 
+  /**
+   * Reports the given archived files and their sizes via
+   * {@link RegionServerServices#reportFileArchivalForQuotas}. Only HFiles with a non-zero size
+   * are included; nothing is reported when the RegionServerServices are unavailable.
+   *
+   * @param archivedFiles The files which were archived.
+   * @param fileSizes The sizes of {@code archivedFiles}, in the same iteration order.
+   * @throws IllegalArgumentException if the two lists differ in length.
+   */
+  void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {
+    // Sanity check from the caller
+    if (archivedFiles.size() != fileSizes.size()) {
+      throw new IllegalArgumentException("Coding error: should never see lists of varying size ("
+          + archivedFiles.size() + " files, " + fileSizes.size() + " sizes)");
+    }
+    RegionServerServices rss = this.region.getRegionServerServices();
+    if (rss == null) {
+      return;
+    }
+    List<Entry<String,Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());
+    Iterator<Long> fileSizeIter = fileSizes.iterator();
+    for (StoreFile storeFile : archivedFiles) {
+      final long fileSize = fileSizeIter.next();
+      // Sizes of zero (e.g. when the size could not be computed) are not reported
+      if (storeFile.isHFile() && fileSize != 0) {
+        filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));
+      }
+    }
+    LOG.trace("Files archived: {}, reporting the following to the Master: {}", archivedFiles,
+        filesWithSizes);
+    boolean success = rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes);
+    if (!success) {
+      LOG.warn("Failed to report archival of files: {}", filesWithSizes);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 9bf2b32..808fc58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -49,6 +49,7 @@ public class MetricsRegionServer {
   private RegionServerTableMetrics tableMetrics;
   private final MetricsTable metricsTable;
   private final MetricsUserAggregate userAggregate;
+  private MetricsRegionServerQuotaSource quotaSource;
 
   private MetricRegistry metricRegistry;
   private Timer bulkLoadTimer;
@@ -71,6 +72,7 @@ public class MetricsRegionServer {
 
     serverReadQueryMeter = metricRegistry.meter("ServerReadQueryPerSecond");
     serverWriteQueryMeter = metricRegistry.meter("ServerWriteQueryPerSecond");
+    quotaSource = CompatibilitySingletonFactory.getInstance(MetricsRegionServerQuotaSource.class);
   }
 
   MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
@@ -277,4 +279,18 @@ public class MetricsRegionServer {
     }
     this.serverWriteQueryMeter.mark();
   }
+
+  /**
+   * @see MetricsRegionServerQuotaSource#incrementNumRegionSizeReportsSent(long)
+   */
+  public void incrementNumRegionSizeReportsSent(long numReportsSent) {
+    quotaSource.incrementNumRegionSizeReportsSent(numReportsSent);
+  }
+
+  /**
+   * @see MetricsRegionServerQuotaSource#incrementRegionSizeReportingChoreTime(long)
+   */
+  public void incrementRegionSizeReportingChoreTime(long time) {
+    quotaSource.incrementRegionSizeReportingChoreTime(time);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 32d75f4..8730c33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2375,9 +2375,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       requestCount.increment();
       HRegion region = getRegion(request.getRegion());
       Map<byte[], List<Path>> map = null;
+      final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
+      long sizeToBeLoaded = -1;
 
       // Check to see if this bulk load would exceed the space quota for this table
-      if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
+      if (spaceQuotaEnabled) {
         ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
         SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
             region);
@@ -2388,7 +2390,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             filePaths.add(familyPath.getPath());
           }
           // Check if the batch of files exceeds the current quota
-          enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
+          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
         }
       }
 
@@ -2414,6 +2416,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(map != null);
+      if (map != null) {
+        // Treat any negative size as a flag to "ignore" updating the region size as that is
+        // not possible to occur in real life (cannot bulk load a file with negative size)
+        if (spaceQuotaEnabled && sizeToBeLoaded > 0) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Incrementing space use of " + region.getRegionInfo() + " by "
+                + sizeToBeLoaded + " bytes");
+          }
+          // Inform space quotas of the new files for this region
+          getSpaceQuotaManager().getRegionSizeStore().incrementRegionSize(
+              region.getRegionInfo(), sizeToBeLoaded);
+        }
+      }
       return builder.build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index f78a906..9d6fefe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.regionserver;
 
 import com.google.protobuf.Service;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -288,4 +292,23 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
    * @return {@link ZKPermissionWatcher}
    */
   ZKPermissionWatcher getZKPermissionWatcher();
-}
\ No newline at end of file
+
+  /**
+   * Reports the provided Region sizes hosted by this RegionServer to the active Master.
+   *
+   * @param sizeStore The sizes for Regions locally hosted.
+   * @return {@code false} if reporting should be temporarily paused, {@code true} otherwise.
+   */
+  boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore);
+
+  /**
+   * Reports that a collection of files, which belonged to the given {@code tableName}, were just
+   * moved to the archive directory, along with their sizes.
+   *
+   * @param tableName The name of the table that files previously belonged to
+   * @param archivedFiles Files and their sizes that were moved to archive
+   * @return {@code true} if the files were successfully reported, {@code false} otherwise.
+   */
+  boolean reportFileArchivalForQuotas(
+      TableName tableName, Collection<Entry<String,Long>> archivedFiles);
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index d2bbf22..708e8bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hbase;
 import com.google.protobuf.Service;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@@ -360,4 +363,15 @@ public class MockRegionServerServices implements RegionServerServices {
   public ZKPermissionWatcher getZKPermissionWatcher() {
     return null;
   }
+
+  @Override
+  public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
+    return true;
+  }
+
+  @Override
+  public boolean reportFileArchivalForQuotas(
+      TableName tableName, Collection<Entry<String,Long>> archivedFiles) {
+    return true;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 3604ba5..7f85488 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hbase.master;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Random;
 import java.util.TreeMap;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@@ -715,4 +718,15 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public ZKPermissionWatcher getZKPermissionWatcher() {
     return null;
   }
+
+  @Override
+  public boolean reportRegionSizesForQuotas(RegionSizeStore sizeStore) {
+    return true;
+  }
+
+  @Override
+  public boolean reportFileArchivalForQuotas(
+      TableName tableName, Collection<Entry<String, Long>> archivedFiles) {
+    return false;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
index 0bf383d..93367b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
@@ -34,9 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -110,6 +108,8 @@ public class SpaceQuotaHelperForTests {
     conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000);
     conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000);
     conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000);
+    conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, 1000);
+    conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_KEY, 1000);
     // The period at which we check for compacted files that should be deleted from HDFS
     conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);
     conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
@@ -326,6 +326,44 @@ public class SpaceQuotaHelperForTests {
   }
 
   /**
+   * Bulk-loads a number of files with a number of rows to the given table.
+   */
+//  ClientServiceCallable<Boolean> generateFileToLoad(
+//      TableName tn, int numFiles, int numRowsPerFile) throws Exception {
+//    Connection conn = testUtil.getConnection();
+//    FileSystem fs = testUtil.getTestFileSystem();
+//    Configuration conf = testUtil.getConfiguration();
+//    Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files");
+//    fs.mkdirs(baseDir);
+//    final List<Pair<byte[], String>> famPaths = new ArrayList<>();
+//    for (int i = 1; i <= numFiles; i++) {
+//      Path hfile = new Path(baseDir, "file" + i);
+//      TestHRegionServerBulkLoad.createHFile(
+//          fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("my"),
+//          Bytes.toBytes("file"), numRowsPerFile);
+//      famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString()));
+//    }
+//
+//    // bulk load HFiles
+//    Table table = conn.getTable(tn);
+//    final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
+//    return new ClientServiceCallable<Boolean>(
+//        conn, tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(),
+//        HConstants.PRIORITY_UNSET) {
+//      @Override
+//     public Boolean rpcCall() throws Exception {
+//        SecureBulkLoadClient secureClient = null;
+//        byte[] regionName = getLocation().getRegion().getRegionName();
+//        try (Table table = conn.getTable(getTableName())) {
+//          secureClient = new SecureBulkLoadClient(conf, table);
+//          return secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+//                true, null, bulkToken);
+//        }
+//      }
+//    };
+//  }
+
+  /**
    * Removes all quotas defined in the HBase quota table.
    */
   void removeAllQuotas() throws Exception {
@@ -383,14 +421,14 @@ public class SpaceQuotaHelperForTests {
   /**
    * Waits 30seconds for the HBase quota table to exist.
    */
-  void waitForQuotaTable(Connection conn) throws IOException {
+  public void waitForQuotaTable(Connection conn) throws IOException {
     waitForQuotaTable(conn, 30_000);
   }
 
   /**
    * Waits {@code timeout} milliseconds for the HBase quota table to exist.
    */
-  void waitForQuotaTable(Connection conn, long timeout) throws IOException {
+  public void waitForQuotaTable(Connection conn, long timeout) throws IOException {
     testUtil.waitFor(timeout, 1000, new Predicate<IOException>() {
       @Override
       public boolean evaluate() throws IOException {
@@ -569,8 +607,8 @@ public class SpaceQuotaHelperForTests {
     }
 
     // Create the table
-    HTableDescriptor tableDesc = new HTableDescriptor(tn);
-    tableDesc.addFamily(new HColumnDescriptor(F1));
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn)
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.of(F1)).build();
 
     admin.createTable(tableDesc);
     return tn;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileArchiverNotifierImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileArchiverNotifierImpl.java
new file mode 100644
index 0000000..e139e4f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileArchiverNotifierImpl.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.client.SnapshotType;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.quotas.FileArchiverNotifierImpl.SnapshotWithSize;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
+
+/**
+ * Test class for {@link FileArchiverNotifierImpl}.
+ */
+@Category(MediumTests.class)
+public class TestFileArchiverNotifierImpl {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestFileArchiverNotifierImpl.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final AtomicLong COUNTER = new AtomicLong();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private Connection conn;
+  private Admin admin;
+  private SpaceQuotaHelperForTests helper;
+  private FileSystem fs;
+  private Configuration conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
+    // Clean up the compacted files faster than normal (15s instead of 2mins)
+    conf.setInt("hbase.hfile.compaction.discharger.interval", 15 * 1000);
+    // Prevent the SnapshotQuotaObserverChore from running
+    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 60 * 60 * 1000);
+    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 60 * 60 * 1000);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conn = TEST_UTIL.getConnection();
+    admin = TEST_UTIL.getAdmin();
+    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
+    helper.removeAllQuotas(conn);
+    fs = TEST_UTIL.getTestFileSystem();
+    conf = TEST_UTIL.getConfiguration();
+  }
+
+  @Test
+  public void testSnapshotSizePersistence() throws IOException {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final TableName tn = TableName.valueOf(testName.getMethodName());
+    if (admin.tableExists(tn)) {
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+    }
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tn).addColumnFamily(
+        ColumnFamilyDescriptorBuilder.of(QuotaTableUtil.QUOTA_FAMILY_USAGE)).build();
+    admin.createTable(desc);
+
+    FileArchiverNotifierImpl notifier = new FileArchiverNotifierImpl(conn, conf, fs, tn);
+    List<SnapshotWithSize> snapshotsWithSizes = new ArrayList<>();
+    try (Table table = conn.getTable(tn)) {
+      // Writing no values will result in no records written.
+      verify(table, () -> {
+        notifier.persistSnapshotSizes(table, snapshotsWithSizes);
+        assertEquals(0, count(table));
+      });
+
+      verify(table, () -> {
+        snapshotsWithSizes.add(new SnapshotWithSize("ss1", 1024L));
+        snapshotsWithSizes.add(new SnapshotWithSize("ss2", 4096L));
+        notifier.persistSnapshotSizes(table, snapshotsWithSizes);
+        assertEquals(2, count(table));
+        assertEquals(1024L, extractSnapshotSize(table, tn, "ss1"));
+        assertEquals(4096L, extractSnapshotSize(table, tn, "ss2"));
+      });
+    }
+  }
+
+  @Test
+  public void testIncrementalFileArchiving() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final TableName tn = TableName.valueOf(testName.getMethodName());
+    if (admin.tableExists(tn)) {
+      admin.disableTable(tn);
+      admin.deleteTable(tn);
+    }
+    // try-with-resources so the quota table handle is closed even when an assertion fails
+    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      final TableName tn1 = helper.createTableWithRegions(1);
+      admin.setQuota(QuotaSettingsFactory.limitTableSpace(
+          tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS));
+
+      // Write some data and flush it
+      helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE);
+      admin.flush(tn1);
+
+      // Create a snapshot on the table
+      final String snapshotName1 = tn1 + "snapshot1";
+      admin.snapshot(new SnapshotDescription(snapshotName1, tn1, SnapshotType.SKIPFLUSH));
+
+      FileArchiverNotifierImpl notifier = new FileArchiverNotifierImpl(conn, conf, fs, tn);
+      long t1 = notifier.getLastFullCompute();
+      long snapshotSize = notifier.computeAndStoreSnapshotSizes(Arrays.asList(snapshotName1));
+      assertEquals("The size of the snapshots should be zero", 0, snapshotSize);
+      assertTrue("Last compute time was not less than current compute time",
+          t1 < notifier.getLastFullCompute());
+
+      // No recently archived files and the snapshot should have no size
+      assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));
+
+      // Invoking addArchivedFiles with no files should not change the size
+      notifier.addArchivedFiles(Collections.emptySet());
+      assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));
+
+      // Nor should reporting the archival of files "a" and "b"
+      notifier.addArchivedFiles(ImmutableSet.of(entry("a", 1024L), entry("b", 1024L)));
+      assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));
+
+      // Pull one file referenced by the snapshot out of the manifest
+      Set<String> referencedFiles = getFilesReferencedBySnapshot(snapshotName1);
+      assertTrue(
+          "Found snapshot referenced files: " + referencedFiles, referencedFiles.size() >= 1);
+      String referencedFile = Iterables.getFirst(referencedFiles, null);
+      assertNotNull(referencedFile);
+
+      // Report that a file this snapshot referenced was moved to the archive. This is a sign
+      // that the snapshot should now "own" the size of this file
+      final long fakeFileSize = 2048L;
+      notifier.addArchivedFiles(ImmutableSet.of(entry(referencedFile, fakeFileSize)));
+
+      // Verify that the snapshot owns this file.
+      assertEquals(fakeFileSize, extractSnapshotSize(quotaTable, tn, snapshotName1));
+
+      // In reality, we did not actually move the file, so a "full" computation should re-set the
+      // size of the snapshot back to 0.
+      long t2 = notifier.getLastFullCompute();
+      snapshotSize = notifier.computeAndStoreSnapshotSizes(Arrays.asList(snapshotName1));
+      assertEquals(0, snapshotSize);
+      assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));
+      // The re-computation should also have advanced the last full compute time
+      assertTrue("Last compute time was not less than current compute time",
+          t2 < notifier.getLastFullCompute());
+    }
+  }
+
+  @Test
+  public void testParseOldNamespaceSnapshotSize() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final TableName fakeQuotaTableName = TableName.valueOf(testName.getMethodName());
+    final TableName tn = TableName.valueOf(testName.getMethodName() + "1");
+    if (admin.tableExists(fakeQuotaTableName)) {
+      admin.disableTable(fakeQuotaTableName);
+      admin.deleteTable(fakeQuotaTableName);
+    }
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(fakeQuotaTableName).addColumnFamily(
+        ColumnFamilyDescriptorBuilder.of(QuotaTableUtil.QUOTA_FAMILY_USAGE))
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.of(QuotaUtil.QUOTA_FAMILY_INFO)).build();
+    admin.createTable(desc);
+
+    final String ns = "";
+    try (Table fakeQuotaTable = conn.getTable(fakeQuotaTableName)) {
+      FileArchiverNotifierImpl notifier = new FileArchiverNotifierImpl(conn, conf, fs, tn);
+      // Verify no record is treated as zero
+      assertEquals(0, notifier.getPreviousNamespaceSnapshotSize(fakeQuotaTable, ns));
+
+      // Set an explicit value of zero
+      fakeQuotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns, 0L));
+      assertEquals(0, notifier.getPreviousNamespaceSnapshotSize(fakeQuotaTable, ns));
+
+      // Set a non-zero value
+      fakeQuotaTable.put(QuotaTableUtil.createPutForNamespaceSnapshotSize(ns, 1024L));
+      assertEquals(1024L, notifier.getPreviousNamespaceSnapshotSize(fakeQuotaTable, ns));
+    }
+  }
+
+  private long count(Table t) throws IOException {
+    try (ResultScanner rs = t.getScanner(new Scan())) {
+      long sum = 0;
+      for (Result r : rs) {
+        while (r.advance()) {
+          sum++;
+        }
+      }
+      return sum;
+    }
+  }
+
+  private long extractSnapshotSize(
+      Table quotaTable, TableName tn, String snapshot) throws IOException {
+    Get g = QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot);
+    Result r = quotaTable.get(g);
+    assertNotNull(r);
+    CellScanner cs = r.cellScanner();
+    assertTrue(cs.advance());
+    Cell c = cs.current();
+    assertNotNull(c);
+    return QuotaTableUtil.extractSnapshotSize(
+        c.getValueArray(), c.getValueOffset(), c.getValueLength());
+  }
+
+  private void verify(Table t, IOThrowingRunnable test) throws IOException {
+    admin.disableTable(t.getName());
+    admin.truncateTable(t.getName(), false);
+    test.run();
+  }
+
+  @FunctionalInterface
+  private interface IOThrowingRunnable {
+    void run() throws IOException;
+  }
+
+  private Set<String> getFilesReferencedBySnapshot(String snapshotName) throws IOException {
+    HashSet<String> files = new HashSet<>();
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(
+        snapshotName, FSUtils.getRootDir(conf));
+    SnapshotProtos.SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(
+        fs, snapshotDir);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
+    // For each region referenced by the snapshot
+    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
+      // For each column family in this region
+      for (FamilyFiles ff : rm.getFamilyFilesList()) {
+        // And each store file in that family
+        for (StoreFile sf : ff.getStoreFilesList()) {
+          files.add(sf.getName());
+        }
+      }
+    }
+    return files;
+  }
+
+  private <K,V> Entry<K,V> entry(K k, V v) {
+    return Maps.immutableEntry(k, v);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
index bc2ac78..38d98e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFileSystemUtilizationChore.java
@@ -57,7 +57,6 @@ public class TestFileSystemUtilizationChore {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestFileSystemUtilizationChore.class);
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testNoOnlineRegions() {
     // One region with a store size of one.
@@ -67,14 +66,13 @@ public class TestFileSystemUtilizationChore {
     final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
     doAnswer(new ExpectedRegionSizeSummationAnswer(sum(regionSizes)))
         .when(rs)
-        .reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     final Region region = mockRegionWithSize(regionSizes);
     Mockito.doReturn(Arrays.asList(region)).when(rs).getRegions();
     chore.chore();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testRegionSizes() {
     // One region with a store size of one.
@@ -84,14 +82,13 @@ public class TestFileSystemUtilizationChore {
     final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
     doAnswer(new ExpectedRegionSizeSummationAnswer(sum(regionSizes)))
         .when(rs)
-        .reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     final Region region = mockRegionWithSize(regionSizes);
     Mockito.doReturn(Arrays.asList(region)).when(rs).getRegions();
     chore.chore();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testMultipleRegionSizes() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -108,7 +105,7 @@ public class TestFileSystemUtilizationChore {
     final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
     doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum, r2Sum, r3Sum))))
         .when(rs)
-        .reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     final Region r1 = mockRegionWithSize(r1Sizes);
     final Region r2 = mockRegionWithSize(r2Sizes);
@@ -151,7 +148,6 @@ public class TestFileSystemUtilizationChore {
     assertEquals(timeUnit, chore.getTimeUnit());
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testProcessingLeftoverRegions() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -173,7 +169,7 @@ public class TestFileSystemUtilizationChore {
     };
     doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(leftover1Sum, leftover2Sum))))
         .when(rs)
-        .reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     // We shouldn't compute all of these region sizes, just the leftovers
     final Region r1 = mockRegionWithSize(Arrays.asList(1024L, 2048L));
@@ -184,7 +180,6 @@ public class TestFileSystemUtilizationChore {
     chore.chore();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testProcessingNowOfflineLeftoversAreIgnored() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -205,7 +200,7 @@ public class TestFileSystemUtilizationChore {
     };
     doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(leftover1Sum))))
         .when(rs)
-        .reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     // We shouldn't compute all of these region sizes, just the leftovers
     final Region r1 = mockRegionWithSize(Arrays.asList(1024L, 2048L));
@@ -217,7 +212,6 @@ public class TestFileSystemUtilizationChore {
     chore.chore();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testIgnoreSplitParents() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -231,7 +225,7 @@ public class TestFileSystemUtilizationChore {
     final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
     doAnswer(new ExpectedRegionSizeSummationAnswer(sum(Arrays.asList(r1Sum))))
         .when(rs)
-        .reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     final Region r1 = mockRegionWithSize(r1Sizes);
     final Region r2 = mockSplitParentRegionWithSize(r2Sizes);
@@ -239,7 +233,6 @@ public class TestFileSystemUtilizationChore {
     chore.chore();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testIgnoreRegionReplicas() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -253,7 +246,7 @@ public class TestFileSystemUtilizationChore {
     final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
     doAnswer(new ExpectedRegionSizeSummationAnswer(r1Sum))
         .when(rs)
-        .reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     final Region r1 = mockRegionWithSize(r1Sizes);
     final Region r2 = mockRegionReplicaWithSize(r2Sizes);
@@ -261,7 +254,6 @@ public class TestFileSystemUtilizationChore {
     chore.chore();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testNonHFilesAreIgnored() {
     final Configuration conf = getDefaultHBaseConfiguration();
@@ -280,7 +272,7 @@ public class TestFileSystemUtilizationChore {
     final FileSystemUtilizationChore chore = new FileSystemUtilizationChore(rs);
     doAnswer(new ExpectedRegionSizeSummationAnswer(
         sum(Arrays.asList(r1HFileSizeSum, r2HFileSizeSum))))
-        .when(rs).reportRegionSizesForQuotas((Map<RegionInfo,Long>) any());
+        .when(rs).reportRegionSizesForQuotas(any(RegionSizeStore.class));
 
     final Region r1 = mockRegionWithHFileLinks(r1StoreFileSizes, r1HFileSizes);
     final Region r2 = mockRegionWithHFileLinks(r2StoreFileSizes, r2HFileSizes);
@@ -302,7 +294,10 @@ public class TestFileSystemUtilizationChore {
    */
   private HRegionServer mockRegionServer(Configuration conf) {
     final HRegionServer rs = mock(HRegionServer.class);
+    final RegionServerSpaceQuotaManager quotaManager = mock(RegionServerSpaceQuotaManager.class);
     when(rs.getConfiguration()).thenReturn(conf);
+    when(rs.getRegionServerSpaceQuotaManager()).thenReturn(quotaManager);
+    when(quotaManager.getRegionSizeStore()).thenReturn(new RegionSizeStoreImpl());
     return rs;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
new file mode 100644
index 0000000..7fad94a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SnapshotType;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({MediumTests.class})
+public class TestLowLatencySpaceQuotas {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Global for all tests in the class
+  private static final AtomicLong COUNTER = new AtomicLong(0);
+
+  @Rule
+  public TestName testName = new TestName();
+  private SpaceQuotaHelperForTests helper;
+  private Connection conn;
+  private Admin admin;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // The default 1s period for QuotaObserverChore is good.
+    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
+    // Set the period/delay to read region size from HDFS to be very long
+    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
+    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
+    // Set the same long period/delay to compute snapshot sizes
+    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
+    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
+    // Clean up the compacted files faster than normal (5s instead of 2mins)
+    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);
+
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void removeAllQuotas() throws Exception {
+    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
+    conn = TEST_UTIL.getConnection();
+    admin = TEST_UTIL.getAdmin();
+    helper.waitForQuotaTable(conn);
+  }
+
+  @Test
+  public void testFlushes() throws Exception {
+    TableName tn = helper.createTableWithRegions(1);
+    // Set a quota
+    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
+        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
+    admin.setQuota(settings);
+
+    // Write some data
+    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    helper.writeData(tn, initialSize);
+
+    // Make sure a flush happened
+    admin.flush(tn);
+
+    // We should be able to observe the system recording an increase in size (even
+    // though we know the filesystem scanning did not happen).
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() >= initialSize;
+      }
+    });
+  }
+
+  @Test
+  public void testMajorCompaction() throws Exception {
+    TableName tn = helper.createTableWithRegions(1);
+    // Set a quota
+    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
+        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
+    admin.setQuota(settings);
+
+    // Write some data and flush it to disk.
+    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    helper.writeData(tn, sizePerBatch);
+    admin.flush(tn);
+
+    // Write the same data again, flushing it to a second file
+    helper.writeData(tn, sizePerBatch);
+    admin.flush(tn);
+
+    // After two flushes, both hfiles would contain similar data. We should see 2x the data.
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() >= 2L * sizePerBatch;
+      }
+    });
+
+    // Rewrite the two files into one.
+    admin.majorCompact(tn);
+
+    // After we major compact the table, we should notice quickly that the amount of data in the
+    // table is much closer to reality (the duplicate entries across the two files are removed).
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() >= sizePerBatch && snapshot.getUsage() <= 2L * sizePerBatch;
+      }
+    });
+  }
+
+  @Test
+  public void testMinorCompaction() throws Exception {
+    TableName tn = helper.createTableWithRegions(1);
+    // Set a quota
+    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
+        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
+    admin.setQuota(settings);
+
+    // Write numBatches batches of data, flushing after each one.
+    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    final long numBatches = 6;
+    for (long i = 0; i < numBatches; i++) {
+      helper.writeData(tn, sizePerBatch);
+      admin.flush(tn);
+    }
+
+    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
+    long numFiles = getNumHFilesForRegion(region);
+    assertEquals(numBatches, numFiles);
+
+    // One hfile per flush: wait until the reported usage covers numFiles batches of data.
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() >= numFiles * sizePerBatch;
+      }
+    });
+
+    // Rewrite some files into fewer
+    TEST_UTIL.compact(tn, false);
+    long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);
+
+    // After the minor compaction, we should notice quickly that the reported usage is between
+    // numFilesAfterMinorCompaction and (numFilesAfterMinorCompaction + 1) batches of data.
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() >= numFilesAfterMinorCompaction * sizePerBatch &&
+            snapshot.getUsage() <= (numFilesAfterMinorCompaction + 1) * sizePerBatch;
+      }
+    });
+  }
+
+  private long getNumHFilesForRegion(HRegion region) {
+    return region.getStores().stream().mapToLong((s) -> s.getNumHFiles()).sum();
+  }
+
+  @Test
+  public void testBulkLoading() throws Exception {
+    TableName tn = helper.createTableWithRegions(1);
+    // Set a quota
+    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
+        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
+    admin.setQuota(settings);
+
+    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 3, 550);
+    // Make sure the files are about as long as we expect
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    FileStatus[] files = fs.listStatus(
+        new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
+    long totalSize = 0;
+    for (FileStatus file : files) {
+      assertTrue(
+          "Expected the file, " + file.getPath() + ",  length to be larger than 25KB, but was "
+              + file.getLen(),
+          file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
+      totalSize += file.getLen();
+    }
+
+    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+    RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+    caller.callWithRetries(callable, Integer.MAX_VALUE);
+
+    final long finalTotalSize = totalSize;
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() >= finalTotalSize;
+      }
+    });
+  }
+
+  @Test
+  public void testSnapshotSizes() throws Exception {
+    TableName tn = helper.createTableWithRegions(1);
+    // Set a quota
+    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
+        tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
+    admin.setQuota(settings);
+
+    // Write some data and flush it to disk.
+    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    helper.writeData(tn, sizePerBatch);
+    admin.flush(tn);
+
+    final String snapshot1 = "snapshot1";
+    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);
+
+    // Compute the size of the file for the Region we'll send to archive
+    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
+    List<? extends Store> stores = region.getStores();
+    long summer = 0;
+    for (Store store : stores) {
+      summer += store.getStorefilesSize();
+    }
+    final long storeFileSize = summer;
+
+    // Wait for the table to show the usage
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() == storeFileSize;
+      }
+    });
+
+    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
+    // to test in the absence of this chore.
+    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
+        .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
+    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));
+
+    // Force a major compaction to create a new file and push the old file to the archive
+    TEST_UTIL.compact(tn, true);
+
+    // After moving the old file to archive/, the space of this table should double
+    // We have a new file created by the majc referenced by the table and the snapshot still
+    // referencing the old file.
+    TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
+      @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        return snapshot.getUsage() >= 2 * storeFileSize;
+      }
+    });
+
+    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
+      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
+      assertTrue(r.advance());
+      assertEquals("The snapshot's size should be the same as the origin store file",
+          storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));
+
+      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
+      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
+      assertTrue(r.advance());
+      assertEquals("The namespace's snapshot size should be the same as the origin store file",
+          storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java
index debd54c..7391fa1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreRegionReports.java
@@ -29,17 +29,18 @@ import java.util.Map.Entry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -86,8 +87,8 @@ public class TestQuotaObserverChoreRegionReports {
   @Test
   public void testReportExpiration() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
-    // Send reports every 30 seconds
-    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 25000);
+    // Send reports every 25 seconds
+    conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, 25000);
     // Expire the reports after 5 seconds
     conf.setInt(QuotaObserverChore.REGION_REPORT_RETENTION_DURATION_KEY, 5000);
     TEST_UTIL.startMiniCluster(1);
@@ -103,8 +104,8 @@ public class TestQuotaObserverChoreRegionReports {
 
     // Create a table
     final TableName tn = TableName.valueOf("reportExpiration");
-    HTableDescriptor tableDesc = new HTableDescriptor(tn);
-    tableDesc.addFamily(new HColumnDescriptor(FAM1));
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn).addColumnFamily(
+        ColumnFamilyDescriptorBuilder.of(FAM1)).build();
     TEST_UTIL.getAdmin().createTable(tableDesc);
 
     // No reports right after we created this table.
@@ -148,8 +149,8 @@ public class TestQuotaObserverChoreRegionReports {
 
     // Create a table
     final TableName tn = TableName.valueOf("quotaAcceptanceWithoutReports");
-    HTableDescriptor tableDesc = new HTableDescriptor(tn);
-    tableDesc.addFamily(new HColumnDescriptor(FAM1));
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tn).addColumnFamily(
+        ColumnFamilyDescriptorBuilder.of(FAM1)).build();
     TEST_UTIL.getAdmin().createTable(tableDesc);
 
     // Set a quota
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeImpl.java
new file mode 100644
index 0000000..9217762
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class})
+public class TestRegionSizeImpl {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionSizeImpl.class);
+
+  @Test
+  public void testReportingWithSizeChanges() {
+    final long initialSize = 1024L;
+    final RegionSizeImpl regionSize = new RegionSizeImpl(initialSize);
+    assertEquals(initialSize, regionSize.getSize());
+
+    // After setSize, getSize is expected to report the newly set value.
+    final long doubledSize = initialSize * 2L;
+    regionSize.setSize(doubledSize);
+    assertEquals(doubledSize, regionSize.getSize());
+
+    // After incrementSize, getSize is expected to have grown by the delta.
+    final long delta = 512L;
+    regionSize.incrementSize(delta);
+    assertEquals(doubledSize + delta, regionSize.getSize());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeReportingChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeReportingChore.java
new file mode 100644
index 0000000..6541cdc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeReportingChore.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class})
+public class TestRegionSizeReportingChore {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionSizeReportingChore.class);
+
+  /** A chore built from the default configuration must report the default schedule. */
+  @Test
+  public void testDefaultConfigurationProperties() {
+    final Configuration conf = getDefaultHBaseConfiguration();
+    final HRegionServer rs = mockRegionServer(conf);
+    RegionSizeReportingChore chore = new RegionSizeReportingChore(rs);
+    assertEquals(
+        RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT,
+        chore.getInitialDelay());
+    assertEquals(
+        RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT, chore.getPeriod());
+    assertEquals(
+        TimeUnit.valueOf(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_TIMEUNIT_DEFAULT),
+        chore.getTimeUnit());
+  }
+
+  /** Delay, period and time unit set in the configuration must be picked up by the chore. */
+  @Test
+  public void testNonDefaultConfigurationProperties() {
+    final Configuration conf = getDefaultHBaseConfiguration();
+    final HRegionServer rs = mockRegionServer(conf);
+    final int period = RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_DEFAULT + 1;
+    final long delay = RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_DEFAULT + 1L;
+    final String timeUnit = TimeUnit.SECONDS.name();
+    conf.setInt(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_PERIOD_KEY, period);
+    conf.setLong(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_DELAY_KEY, delay);
+    conf.set(RegionSizeReportingChore.REGION_SIZE_REPORTING_CHORE_TIMEUNIT_KEY, timeUnit);
+    RegionSizeReportingChore chore = new RegionSizeReportingChore(rs);
+    assertEquals(delay, chore.getInitialDelay());
+    assertEquals(period, chore.getPeriod());
+    assertEquals(TimeUnit.valueOf(timeUnit), chore.getTimeUnit());
+  }
+
+  /** Sizes of regions missing from the online set are dropped; sizes of online ones are kept. */
+  @Test
+  public void testRemovableOfNonOnlineRegions() {
+    final Configuration conf = getDefaultHBaseConfiguration();
+    final HRegionServer rs = mockRegionServer(conf);
+    RegionSizeReportingChore chore = new RegionSizeReportingChore(rs);
+    // Three adjacent, non-overlapping regions of one table: [a,b), [b,c) and [c,d).
+    RegionInfo infoA = RegionInfoBuilder.newBuilder(TableName.valueOf("T1"))
+        .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
+    RegionInfo infoB = RegionInfoBuilder.newBuilder(TableName.valueOf("T1"))
+        .setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
+    RegionInfo infoC = RegionInfoBuilder.newBuilder(TableName.valueOf("T1"))
+        .setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
+
+    RegionSizeStore store = new RegionSizeStoreImpl();
+    store.put(infoA, 1024L);
+    store.put(infoB, 1024L);
+    store.put(infoC, 1024L);
+    // If there are no online regions, all entries should be removed.
+    chore.removeNonOnlineRegions(store, Collections.<RegionInfo> emptySet());
+    assertTrue(store.isEmpty());
+
+    store.put(infoA, 1024L);
+    store.put(infoB, 1024L);
+    store.put(infoC, 1024L);
+    // Only infoB is missing from the online set, so only its entry may be removed.
+    chore.removeNonOnlineRegions(store, new HashSet<>(Arrays.asList(infoA, infoC)));
+    assertEquals(2, store.size());
+    assertNotNull(store.getRegionSize(infoA));
+    assertNotNull(store.getRegionSize(infoC));
+  }
+
+  /** Creates an HBase Configuration object for the default values. */
+  private Configuration getDefaultHBaseConfiguration() {
+    final Configuration conf = HBaseConfiguration.create();
+    conf.addResource("hbase-default.xml");
+    return conf;
+  }
+
+  /** Returns a mocked HRegionServer whose getConfiguration() hands back {@code conf}. */
+  private HRegionServer mockRegionServer(Configuration conf) {
+    HRegionServer rs = mock(HRegionServer.class);
+    when(rs.getConfiguration()).thenReturn(conf);
+    return rs;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeStoreImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeStoreImpl.java
new file mode 100644
index 0000000..688fde0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionSizeStoreImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Unit tests for {@link RegionSizeStoreImpl}. */
+@Category({SmallTests.class})
+public class TestRegionSizeStoreImpl {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionSizeStoreImpl.class);
+
+  private static final RegionInfo INFOA = RegionInfoBuilder.newBuilder(TableName.valueOf("TEST"))
+      .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
+  private static final RegionInfo INFOB = RegionInfoBuilder.newBuilder(TableName.valueOf("TEST"))
+      .setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
+
+  /** Walks one store through put, overwrite, increment, remove and iteration. */
+  @Test
+  public void testSizeUpdates() {
+    final RegionSizeStore sizes = new RegionSizeStoreImpl();
+    assertTrue(sizes.isEmpty());
+    assertEquals(0, sizes.size());
+
+    sizes.put(INFOA, 1024L);
+    assertFalse(sizes.isEmpty());
+    assertEquals(1, sizes.size());
+    assertEquals(1024L, sizes.getRegionSize(INFOA).getSize());
+
+    // A second put for the same region replaces the value rather than adding an entry.
+    sizes.put(INFOA, 2048L);
+    assertEquals(1, sizes.size());
+    assertEquals(2048L, sizes.getRegionSize(INFOA).getSize());
+
+    sizes.incrementRegionSize(INFOA, 512L);
+    assertEquals(1, sizes.size());
+    assertEquals(2560L, sizes.getRegionSize(INFOA).getSize());
+
+    sizes.remove(INFOA);
+    assertTrue(sizes.isEmpty());
+    assertEquals(0, sizes.size());
+
+    // Iterating the store must surface exactly the entries that were put.
+    sizes.put(INFOA, 64L);
+    sizes.put(INFOB, 128L);
+    assertEquals(2, sizes.size());
+    final Map<RegionInfo,RegionSize> seen = new HashMap<>();
+    for (Entry<RegionInfo,RegionSize> e : sizes) {
+      seen.put(e.getKey(), e.getValue());
+    }
+    assertEquals(64L, seen.remove(INFOA).getSize());
+    assertEquals(128L, seen.remove(INFOB).getSize());
+    assertTrue(seen.isEmpty());
+  }
+
+  /** A negative delta for a region with no entry must yield a stored size of zero. */
+  @Test
+  public void testNegativeDeltaForMissingRegion() {
+    final RegionSizeStore sizes = new RegionSizeStoreImpl();
+    assertNull(sizes.getRegionSize(INFOA));
+
+    // A negative size should not be possible in the first place, but refusing to store one
+    // keeps the bad state from propagating and getting worse.
+    sizes.incrementRegionSize(INFOA, -5);
+    assertNotNull(sizes.getRegionSize(INFOA));
+    assertEquals(0, sizes.getRegionSize(INFOA).getSize());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java
index f9dc1ea..d843fc9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java
@@ -18,40 +18,45 @@
 package org.apache.hadoop.hbase.quotas;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.SnapshotType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore.SnapshotWithSize;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.NoFilesToDischarge;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil.SnapshotVisitor;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -65,9 +70,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
+
 /**
  * Test class for the {@link SnapshotQuotaObserverChore}.
  */
@@ -118,50 +125,6 @@ public class TestSnapshotQuotaObserverChore {
   }
 
   @Test
-  public void testSnapshotSizePersistence() throws IOException {
-    final Admin admin = TEST_UTIL.getAdmin();
-    final TableName tn = TableName.valueOf("quota_snapshotSizePersistence");
-    if (admin.tableExists(tn)) {
-      admin.disableTable(tn);
-      admin.deleteTable(tn);
-    }
-    HTableDescriptor desc = new HTableDescriptor(tn);
-    desc.addFamily(new HColumnDescriptor(QuotaTableUtil.QUOTA_FAMILY_USAGE));
-    admin.createTable(desc);
-
-    Multimap<TableName,SnapshotWithSize> snapshotsWithSizes = HashMultimap.create();
-    try (Table table = conn.getTable(tn)) {
-      // Writing no values will result in no records written.
-      verify(table, () -> {
-        testChore.persistSnapshotSizes(table, snapshotsWithSizes);
-        assertEquals(0, count(table));
-      });
-
-      verify(table, () -> {
-        TableName originatingTable = TableName.valueOf("t1");
-        snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss1", 1024L));
-        snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss2", 4096L));
-        testChore.persistSnapshotSizes(table, snapshotsWithSizes);
-        assertEquals(2, count(table));
-        assertEquals(1024L, extractSnapshotSize(table, originatingTable, "ss1"));
-        assertEquals(4096L, extractSnapshotSize(table, originatingTable, "ss2"));
-      });
-
-      snapshotsWithSizes.clear();
-      verify(table, () -> {
-        snapshotsWithSizes.put(TableName.valueOf("t1"), new SnapshotWithSize("ss1", 1024L));
-        snapshotsWithSizes.put(TableName.valueOf("t2"), new SnapshotWithSize("ss2", 4096L));
-        snapshotsWithSizes.put(TableName.valueOf("t3"), new SnapshotWithSize("ss3", 8192L));
-        testChore.persistSnapshotSizes(table, snapshotsWithSizes);
-        assertEquals(3, count(table));
-        assertEquals(1024L, extractSnapshotSize(table, TableName.valueOf("t1"), "ss1"));
-        assertEquals(4096L, extractSnapshotSize(table, TableName.valueOf("t2"), "ss2"));
-        assertEquals(8192L, extractSnapshotSize(table, TableName.valueOf("t3"), "ss3"));
-      });
-    }
-  }
-
-  @Test
   public void testSnapshotsFromTables() throws Exception {
     TableName tn1 = helper.createTableWithRegions(1);
     TableName tn2 = helper.createTableWithRegions(1);
@@ -279,13 +242,13 @@ public class TestSnapshotQuotaObserverChore {
         "Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size());
 
     // Get the size of our snapshot
-    Multimap<TableName,SnapshotWithSize> snapshotsWithSize = testChore.computeSnapshotSizes(
+    Map<String,Long> namespaceSnapshotSizes = testChore.computeSnapshotSizes(
         snapshotsToCompute);
-    assertEquals(1, snapshotsWithSize.size());
-    SnapshotWithSize sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1));
-    assertEquals(snapshotName, sws.getName());
+    assertEquals(1, namespaceSnapshotSizes.size());
+    Long size = namespaceSnapshotSizes.get(tn1.getNamespaceAsString());
+    assertNotNull(size);
     // The snapshot should take up no space since the table refers to it completely
-    assertEquals(0, sws.getSize());
+    assertEquals(0, size.longValue());
 
     // Write some more data, flush it, and then major_compact the table
     helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE);
@@ -313,35 +276,58 @@ public class TestSnapshotQuotaObserverChore {
     snapshotsToCompute = testChore.getSnapshotsToComputeSize();
     assertEquals(
         "Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size());
-    snapshotsWithSize = testChore.computeSnapshotSizes(
+    namespaceSnapshotSizes = testChore.computeSnapshotSizes(
             snapshotsToCompute);
-    assertEquals(1, snapshotsWithSize.size());
-    sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1));
-    assertEquals(snapshotName, sws.getName());
+    assertEquals(1, namespaceSnapshotSizes.size());
+    size = namespaceSnapshotSizes.get(tn1.getNamespaceAsString());
+    assertNotNull(size);
     // The snapshot should take up the size the table originally took up
-    assertEquals(snapshotSize, sws.getSize());
+    assertEquals(snapshotSize, size.longValue());
   }
 
   @Test
   public void testPersistingSnapshotsForNamespaces() throws Exception {
-    Multimap<TableName,SnapshotWithSize> snapshotsWithSizes = HashMultimap.create();
     TableName tn1 = TableName.valueOf("ns1:tn1");
     TableName tn2 = TableName.valueOf("ns1:tn2");
     TableName tn3 = TableName.valueOf("ns2:tn1");
     TableName tn4 = TableName.valueOf("ns2:tn2");
     TableName tn5 = TableName.valueOf("tn1");
-
-    snapshotsWithSizes.put(tn1, new SnapshotWithSize("", 1024L));
-    snapshotsWithSizes.put(tn2, new SnapshotWithSize("", 1024L));
-    snapshotsWithSizes.put(tn3, new SnapshotWithSize("", 512L));
-    snapshotsWithSizes.put(tn4, new SnapshotWithSize("", 1024L));
-    snapshotsWithSizes.put(tn5, new SnapshotWithSize("", 3072L));
-
-    Map<String,Long> nsSizes = testChore.groupSnapshotSizesByNamespace(snapshotsWithSizes);
-    assertEquals(3, nsSizes.size());
-    assertEquals(2048L, (long) nsSizes.get("ns1"));
-    assertEquals(1536L, (long) nsSizes.get("ns2"));
-    assertEquals(3072L, (long) nsSizes.get(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR));
+    // Fixed per-table sizes returned by the stub notifier, so no real sizes are computed.
+    final Map<TableName,Long> sizesByTable = ImmutableMap.of(
+        tn1, 1024L, tn2, 1024L, tn3, 512L, tn4, 1024L, tn5, 3072L);
+    final FileArchiverNotifierFactory stubFactory = new FileArchiverNotifierFactory() {
+      @Override
+      public FileArchiverNotifier get(
+          Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+        return new FileArchiverNotifier() {
+          @Override
+          public void addArchivedFiles(Set<Entry<String,Long>> fileSizes) throws IOException {}
+
+          @Override
+          public long computeAndStoreSnapshotSizes(Collection<String> currentSnapshots)
+              throws IOException {
+            return sizesByTable.get(tn);
+          }
+        };
+      }
+    };
+    try {
+      FileArchiverNotifierFactoryImpl.setInstance(stubFactory);
+
+      // Register one snapshot, with an empty name, for every table in the map.
+      final Multimap<TableName,String> snapshotsToCompute = HashMultimap.create();
+      for (TableName tableName : sizesByTable.keySet()) {
+        snapshotsToCompute.put(tableName, "");
+      }
+      final Map<String,Long> perNamespace = testChore.computeSnapshotSizes(snapshotsToCompute);
+      assertEquals(3, perNamespace.size());
+      assertEquals(2048L, perNamespace.get("ns1").longValue());
+      assertEquals(1536L, perNamespace.get("ns2").longValue());
+      final String defaultNs = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
+      assertEquals(3072L, perNamespace.get(defaultNs).longValue());
+    } finally {
+      FileArchiverNotifierFactoryImpl.reset();
+    }
   }
 
   @Test
@@ -430,44 +416,120 @@ public class TestSnapshotQuotaObserverChore {
     });
   }
 
-  private long count(Table t) throws IOException {
-    try (ResultScanner rs = t.getScanner(new Scan())) {
-      long sum = 0;
-      for (Result r : rs) {
-        while (r.advance()) {
-          sum++;
-        }
-      }
-      return sum;
-    }
-  }
-
-  private long extractSnapshotSize(
-      Table quotaTable, TableName tn, String snapshot) throws IOException {
-    Get g = QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot);
-    Result r = quotaTable.get(g);
-    assertNotNull(r);
-    CellScanner cs = r.cellScanner();
-    cs.advance();
-    Cell c = cs.current();
-    assertNotNull(c);
-    return QuotaTableUtil.extractSnapshotSize(
-        c.getValueArray(), c.getValueOffset(), c.getValueLength());
-  }
-
-  private void verify(Table t, IOThrowingRunnable test) throws IOException {
-    admin.disableTable(t.getName());
-    admin.truncateTable(t.getName(), false);
-    test.run();
+  /**
+   * Takes two snapshots of a table, major compacting after each one, and checks the sizes
+   * recorded in the quota table for each snapshot and for the table's namespace.
+   */
+  @Test
+  public void testBucketingFilesToSnapshots() throws Exception {
+    // Create a table and set a quota
+    TableName tn1 = helper.createTableWithRegions(1);
+    admin.setQuota(QuotaSettingsFactory.limitTableSpace(
+        tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS));
+
+    // Write some data and flush it
+    helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE);
+    admin.flush(tn1);
+
+    final AtomicReference<Long> lastSeenSize = new AtomicReference<>();
+    // Wait for the Master chore to run to see the usage (with a fudge factor)
+    TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) {
+      @Override
+      boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+        lastSeenSize.set(snapshot.getUsage());
+        return snapshot.getUsage() > 230L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
+      }
+    });
+
+    // Create a snapshot on the table
+    final String snapshotName1 = tn1 + "snapshot1";
+    admin.snapshot(new SnapshotDescription(snapshotName1, tn1, SnapshotType.SKIPFLUSH));
+    // Major compact the table to force a rewrite
+    TEST_UTIL.compact(tn1, true);
+
+    // The quota table handle is closed when the block exits, whether the test passes or fails.
+    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      // Make sure that the snapshot owns the size
+      TEST_UTIL.waitFor(30_000, new Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Waiting to see quota snapshot1 size");
+          debugFilesForSnapshot(tn1, snapshotName1);
+          Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName1);
+          Result r = quotaTable.get(g);
+          if (r == null || r.isEmpty()) {
+            return false;
+          }
+          r.advance();
+          Cell c = r.current();
+          // The compaction result file has an additional compaction event tracker
+          return lastSeenSize.get() == QuotaTableUtil.parseSnapshotSize(c);
+        }
+      });
+
+      LOG.info("Snapshotting table again");
+      // Create another snapshot on the table
+      final String snapshotName2 = tn1 + "snapshot2";
+      admin.snapshot(new SnapshotDescription(snapshotName2, tn1, SnapshotType.SKIPFLUSH));
+      LOG.info("Compacting table");
+      // Major compact the table to force a rewrite
+      TEST_UTIL.compact(tn1, true);
+
+      // Make sure that the snapshot owns the size
+      TEST_UTIL.waitFor(30_000, new Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Waiting to see quota snapshot2 size");
+          debugFilesForSnapshot(tn1, snapshotName2);
+          Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName2);
+          Result r = quotaTable.get(g);
+          if (r == null || r.isEmpty()) {
+            return false;
+          }
+          r.advance();
+          Cell c = r.current();
+          return closeInSize(lastSeenSize.get(),
+              QuotaTableUtil.parseSnapshotSize(c), SpaceQuotaHelperForTests.ONE_KILOBYTE);
+        }
+      });
+
+      Get g = QuotaTableUtil.createGetNamespaceSnapshotSize(tn1.getNamespaceAsString());
+      Result r = quotaTable.get(g);
+      assertNotNull(r);
+      assertFalse(r.isEmpty());
+      r.advance();
+      long size = QuotaTableUtil.parseSnapshotSize(r.current());
+      // Two snapshots of equal size.
+      assertTrue(
+          closeInSize(lastSeenSize.get() * 2, size, SpaceQuotaHelperForTests.ONE_KILOBYTE));
+    }
   }
 
-  @FunctionalInterface
-  private interface IOThrowingRunnable {
-    void run() throws IOException;
+  /**
+   * Logs the name and size of each store file referenced by the snapshot with the given name.
+   * The {@code table} argument is not used.
+   */
+  void debugFilesForSnapshot(TableName table, String snapshot) throws IOException {
+    final Configuration conf = TEST_UTIL.getConfiguration();
+    final FileSystem fs = TEST_UTIL.getTestFileSystem();
+    final Path snapshotsRoot = new Path(conf.get("hbase.rootdir"), HConstants.SNAPSHOT_DIR_NAME);
+    final Path thisSnapshot = new Path(snapshotsRoot, snapshot);
+    final SnapshotVisitor logEachFile = new SnapshotVisitor() {
+      @Override
+      public void storeFile(RegionInfo region, String family, StoreFile file) throws IOException {
+        LOG.info("Snapshot={} references file={}, size={}", snapshot, file.getName(),
+            file.getFileSize());
+      }
+    };
+    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, thisSnapshot, logEachFile);
   }
 
   /**
    * Computes if {@code size2} is within {@code delta} of {@code size1}, inclusive.
+   *
+   * The size of our store files will change after the first major compaction as the last
+   * compaction gets serialized into the store file (see the fields referenced by
+   * COMPACTION_EVENT_KEY in HFilePrettyPrinter).
    */
   boolean closeInSize(long size1, long size2, long delta) {
     long lower = size1 - delta;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
new file mode 100644
index 0000000..0aa888e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.quotas.policies.DefaultViolationPolicyEnforcement;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end test class for filesystem space quotas.
+ */
+@Category(LargeTests.class)
+public class TestSpaceQuotas {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSpaceQuotas.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSpaceQuotas.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  // Global for all tests in the class
+  private static final AtomicLong COUNTER = new AtomicLong(0);
+  private static final int NUM_RETRIES = 10;
+
+  @Rule
+  public TestName testName = new TestName();
+  private SpaceQuotaHelperForTests helper;
+
+  /** Applies the quota settings from the test helper, then starts the mini cluster. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    SpaceQuotaHelperForTests.updateConfigForQuotas(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Waits for the quota table if it is absent; otherwise clears quotas left by earlier tests. */
+  @Before
+  public void removeAllQuotas() throws Exception {
+    final Connection conn = TEST_UTIL.getConnection();
+    if (helper == null) {
+      helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
+    }
+    // Use the utility's shared Admin; conn.getAdmin() would create one that is never closed.
+    if (!TEST_UTIL.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
+      helper.waitForQuotaTable(conn);
+    } else {
+      helper.removeAllQuotas(conn);
+      assertEquals(0, helper.listNumDefinedQuotas(conn));
+    }
+  }
+
+  /** Runs the violation check for NO_INSERTS with a Put as the operation under test. */
+  @Test
+  public void testNoInsertsWithPut() throws Exception {
+    final Put rejected = new Put(Bytes.toBytes("to_reject")).addColumn(
+        Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, rejected);
+  }
+
+  /** Runs the violation check for NO_INSERTS with an Append as the operation under test. */
+  @Test
+  public void testNoInsertsWithAppend() throws Exception {
+    final Append rejected = new Append(Bytes.toBytes("to_reject")).addColumn(
+        Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, rejected);
+  }
+
+  @Test
+  public void testNoInsertsWithIncrement() throws Exception {
+    final Increment rejected = new Increment(Bytes.toBytes("to_reject"))
+        .addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0);
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, rejected);
+  }
+
+  /** Deletes issued after writeUntilViolation(NO_INSERTS) must complete without an exception. */
+  @Test
+  public void testDeletesAfterNoInserts() throws Exception {
+    final TableName violatingTable = writeUntilViolation(SpaceViolationPolicy.NO_INSERTS);
+    final Delete allowed = new Delete(Bytes.toBytes("should_not_be_rejected"));
+    // Repeat the delete several times, as is done when trying to catch a rejection, to
+    // verify that the quota never gets enforced on it.
+    for (int attempt = 1; attempt <= NUM_RETRIES; attempt++) {
+      try (Table table = TEST_UTIL.getConnection().getTable(violatingTable)) {
+        table.delete(allowed);
+      }
+    }
+  }
+
+  /** Expects a {@code Put} to be rejected after the table exceeds a NO_WRITES quota. */
+  @Test
+  public void testNoWritesWithPut() throws Exception {
+    final Put rejectedPut = new Put(Bytes.toBytes("to_reject"));
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final byte[] qualifier = Bytes.toBytes("to");
+    final byte[] value = Bytes.toBytes("reject");
+    rejectedPut.addColumn(family, qualifier, value);
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedPut);
+  }
+
+  /** Expects an {@code Append} to be rejected after the table exceeds a NO_WRITES quota. */
+  @Test
+  public void testNoWritesWithAppend() throws Exception {
+    final Append rejectedAppend = new Append(Bytes.toBytes("to_reject"));
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final byte[] qualifier = Bytes.toBytes("to");
+    final byte[] value = Bytes.toBytes("reject");
+    rejectedAppend.addColumn(family, qualifier, value);
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedAppend);
+  }
+
+  /** Expects an {@code Increment} to be rejected after the table exceeds a NO_WRITES quota. */
+  @Test
+  public void testNoWritesWithIncrement() throws Exception {
+    final Increment rejectedIncrement = new Increment(Bytes.toBytes("to_reject"));
+    final byte[] family = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
+    final byte[] qualifier = Bytes.toBytes("count");
+    rejectedIncrement.addColumn(family, qualifier, 0);
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedIncrement);
+  }
+
+  /** Expects a {@code Delete} to be rejected after the table exceeds a NO_WRITES quota. */
+  @Test
+  public void testNoWritesWithDelete() throws Exception {
+    final Delete rejectedDelete = new Delete(Bytes.toBytes("to_reject"));
+    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedDelete);
+  }
+
+  /**
+   * Verifies that, after a NO_WRITES_COMPACTIONS quota has been violated, both a major and a
+   * minor compaction request fail with a {@code DoNotRetryIOException}.
+   */
+  @Test
+  public void testNoCompactions() throws Exception {
+    final Put rejectedPut = new Put(Bytes.toBytes("to_reject"));
+    rejectedPut.addColumn(
+        Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    // Returning from this call means the Put was rejected, so the policy is already active.
+    final TableName violatedTable = writeUntilViolationAndVerifyViolation(
+        SpaceViolationPolicy.NO_WRITES_COMPACTIONS, rejectedPut);
+
+    // Major compactions should be rejected
+    boolean majorCompactionRejected = false;
+    try {
+      TEST_UTIL.getAdmin().majorCompact(violatedTable);
+    } catch (DoNotRetryIOException expected) {
+      majorCompactionRejected = true;
+    }
+    assertTrue(
+        "Expected that invoking the compaction should throw an Exception",
+        majorCompactionRejected);
+
+    // Minor compactions should also be rejected.
+    boolean minorCompactionRejected = false;
+    try {
+      TEST_UTIL.getAdmin().compact(violatedTable);
+    } catch (DoNotRetryIOException expected) {
+      minorCompactionRejected = true;
+    }
+    assertTrue(
+        "Expected that invoking the compaction should throw an Exception",
+        minorCompactionRejected);
+  }
+
+  /**
+   * Writes past a quota that uses the DISABLE policy, waits for the table to become disabled,
+   * and then checks that an {@code AccessDeniedException} raised by re-enabling the table
+   * mentions the violated space quota.
+   */
+  @Test
+  public void testNoEnableAfterDisablePolicy() throws Exception {
+    final TableName tn = writeUntilViolation(SpaceViolationPolicy.DISABLE);
+    final Admin admin = TEST_UTIL.getAdmin();
+    // Disabling a table relies on some external action (over the other policies), so wait a bit
+    // more than the other tests. Stop polling as soon as the table is seen disabled.
+    for (int i = 0; i < NUM_RETRIES * 2 && admin.isTableEnabled(tn); i++) {
+      LOG.info(tn + " is still enabled, expecting it to be disabled. Will wait and re-check.");
+      Thread.sleep(2000);
+    }
+    assertFalse(tn + " is still enabled but it should be disabled", admin.isTableEnabled(tn));
+    // NOTE(review): if enableTable() returns without throwing, this test still passes. Confirm
+    // whether the enable is expected to be rejected synchronously before adding a fail() here.
+    try {
+      admin.enableTable(tn);
+    } catch (AccessDeniedException e) {
+      String exceptionContents = StringUtils.stringifyException(e);
+      final String expectedText = "violated space quota";
+      assertTrue("Expected the exception to contain " + expectedText + ", but was: "
+          + exceptionContents, exceptionContents.contains(expectedText));
+    }
+  }
+
+  /**
+   * Verifies that a bulk load into a table that already violates a NO_WRITES quota fails with a
+   * {@code SpaceLimitingException}.
+   */
+  @Test
+  public void testNoBulkLoadsWithNoWrites() throws Exception {
+    final Put rejectedPut = new Put(Bytes.toBytes("to_reject"));
+    rejectedPut.addColumn(
+        Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    final TableName violatedTable =
+        writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, rejectedPut);
+
+    // The table is now in violation. Try to do a bulk load
+    final ClientServiceCallable<Void> bulkLoad = helper.generateFileToLoad(violatedTable, 1, 50);
+    final RpcRetryingCaller<Void> retryingCaller =
+        new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()).newCaller();
+    try {
+      retryingCaller.callWithRetries(bulkLoad, Integer.MAX_VALUE);
+      fail("Expected the bulk load call to fail!");
+    } catch (SpaceLimitingException expected) {
+      // Pass
+      LOG.trace("Caught expected exception", expected);
+    }
+  }
+
+  /**
+   * Verifies that a bulk load is all-or-nothing under a space quota: two generated files, each
+   * larger than 25KB, are loaded against a 50KB limit. The load must fail with a
+   * {@code SpaceLimitingException}, and the table must still be empty afterwards.
+   */
+  @Test
+  public void testAtomicBulkLoadUnderQuota() throws Exception {
+    // Need to verify that if the batch of hfiles cannot be loaded, none are loaded.
+    TableName tn = helper.createTableWithRegions(10);
+
+    final long sizeLimit = 50L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
+    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(
+        tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS);
+    TEST_UTIL.getAdmin().setQuota(settings);
+
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+    RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager();
+    Map<TableName,SpaceQuotaSnapshot> snapshots = spaceQuotaManager.copyQuotaSnapshots();
+    Map<RegionInfo,Long> regionSizes = getReportedSizesForTable(tn);
+    // Poll until the region server's quota snapshot for the table carries a positive limit.
+    while (true) {
+      SpaceQuotaSnapshot snapshot = snapshots.get(tn);
+      if (snapshot != null && snapshot.getLimit() > 0) {
+        break;
+      }
+      LOG.debug(
+          "Snapshot does not yet realize quota limit: " + snapshots + ", regionsizes: " +
+          regionSizes);
+      Thread.sleep(3000);
+      snapshots = spaceQuotaManager.copyQuotaSnapshots();
+      regionSizes = getReportedSizesForTable(tn);
+    }
+    // Our quota limit should be reflected in the latest snapshot
+    SpaceQuotaSnapshot snapshot = snapshots.get(tn);
+    assertEquals(0L, snapshot.getUsage());
+    assertEquals(sizeLimit, snapshot.getLimit());
+
+    // We would also not have a "real" policy in violation
+    ActivePolicyEnforcement activePolicies = spaceQuotaManager.getActiveEnforcements();
+    SpaceViolationPolicyEnforcement enforcement = activePolicies.getPolicyEnforcement(tn);
+    assertTrue(
+        "Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(),
+        enforcement instanceof DefaultViolationPolicyEnforcement);
+
+    // Should generate two files, each of which is over 25KB
+    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 2, 525);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    FileStatus[] files = fs.listStatus(
+        new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
+    for (FileStatus file : files) {
+      assertTrue(
+          "Expected the file, " + file.getPath() + ",  length to be larger than 25KB, but was "
+              + file.getLen(),
+          file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
+      LOG.debug(file.getPath() + " -> " + file.getLen() +"B");
+    }
+
+    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
+    RpcRetryingCaller<Void> caller = factory.newCaller();
+    try {
+      caller.callWithRetries(callable, Integer.MAX_VALUE);
+      fail("Expected the bulk load call to fail!");
+    } catch (SpaceLimitingException e) {
+      // Pass
+      LOG.trace("Caught expected exception", e);
+    }
+    // Verify that we have no data in the table because neither file should have been
+    // loaded even though one of the files could have.
+    // Both the Table and its scanner are closed here, even if the assertion fails.
+    try (Table table = TEST_UTIL.getConnection().getTable(tn);
+        ResultScanner scanner = table.getScanner(new Scan())) {
+      assertNull("Expected no results", scanner.next());
+    }
+  }
+
+  /**
+   * Sets a small table quota and a much larger namespace quota with the same policy, writes
+   * more than the table quota allows, and expects a subsequent {@code Put} to be rejected.
+   */
+  @Test
+  public void testTableQuotaOverridesNamespaceQuota() throws Exception {
+    final SpaceViolationPolicy sharedPolicy = SpaceViolationPolicy.NO_INSERTS;
+    final TableName table = helper.createTableWithRegions(10);
+
+    // 2MB limit on the table, 1GB limit on the namespace
+    final long tableQuotaBytes = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    final long namespaceQuotaBytes = 1024L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    TEST_UTIL.getAdmin().setQuota(
+        QuotaSettingsFactory.limitTableSpace(table, tableQuotaBytes, sharedPolicy));
+    TEST_UTIL.getAdmin().setQuota(
+        QuotaSettingsFactory.limitNamespaceSpace(
+            table.getNamespaceAsString(), namespaceQuotaBytes, sharedPolicy));
+
+    // Write more data than should be allowed and flush it to disk
+    helper.writeData(table, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
+
+    // This should be sufficient time for the chores to run and see the change.
+    Thread.sleep(5000);
+
+    // The write should be rejected because the table quota takes priority over the namespace
+    final Put rejectedPut = new Put(Bytes.toBytes("to_reject"));
+    rejectedPut.addColumn(
+        Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject"));
+    verifyViolation(sharedPolicy, table, rejectedPut);
+  }
+
+  /**
+   * Copies, from the active master's quota manager, the region sizes whose region belongs to
+   * the given table.
+   *
+   * @param tn table whose regions should be kept
+   * @return a new map from region to its size as returned by {@code snapshotRegionSizes()}
+   */
+  private Map<RegionInfo,Long> getReportedSizesForTable(TableName tn) {
+    final MasterQuotaManager masterQuotas =
+        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterQuotaManager();
+    final Map<RegionInfo,Long> sizesForTable = new HashMap<>();
+    masterQuotas.snapshotRegionSizes().forEach((region, size) -> {
+      if (region.getTable().equals(tn)) {
+        sizesForTable.put(region, size);
+      }
+    });
+    return sizesForTable;
+  }
+
+  /**
+   * Creates a table, limits it to 2 * ONE_MEGABYTE with the given policy, writes
+   * 3 * ONE_MEGABYTE of data into it and then sleeps for five seconds.
+   *
+   * @param policyToViolate policy attached to the table's space quota
+   * @return the name of the created table
+   */
+  private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception {
+    final TableName table = helper.createTableWithRegions(10);
+
+    final long quotaBytes = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    final QuotaSettings tableQuota =
+        QuotaSettingsFactory.limitTableSpace(table, quotaBytes, policyToViolate);
+    TEST_UTIL.getAdmin().setQuota(tableQuota);
+
+    // Write more data than should be allowed and flush it to disk
+    final long bytesToWrite = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
+    helper.writeData(table, bytesToWrite);
+
+    // This should be sufficient time for the chores to run and see the change.
+    Thread.sleep(5000);
+
+    return table;
+  }
+
+  /**
+   * Runs {@code writeUntilViolation} for the policy and then {@code verifyViolation} with the
+   * given mutation against the table it created.
+   *
+   * @param policyToViolate policy attached to the table's space quota
+   * @param m mutation expected to be rejected
+   * @return the name of the table that was written to
+   */
+  private TableName writeUntilViolationAndVerifyViolation(
+      SpaceViolationPolicy policyToViolate, Mutation m) throws Exception {
+    final TableName violatedTable = writeUntilViolation(policyToViolate);
+    verifyViolation(policyToViolate, violatedTable, m);
+    return violatedTable;
+  }
+
+  /**
+   * Applies the mutation to the table up to NUM_RETRIES times, sleeping two seconds after each
+   * attempt that is accepted, until an attempt throws. The stringified exception must contain
+   * the policy's name. If no attempt throws, the rows of the quota table are logged and the
+   * test fails.
+   *
+   * <p>Any {@code Exception} raised inside the retry body is treated as the rejection,
+   * including one thrown by the sleep.
+   *
+   * @param policyToViolate policy whose name must appear in the exception text
+   * @param tn table the mutation is applied to
+   * @param m a Put, Delete, Append or Increment; any other type fails the test
+   */
+  private void verifyViolation(
+      SpaceViolationPolicy policyToViolate, TableName tn, Mutation m) throws Exception {
+    // But let's try a few times to get the exception before failing
+    boolean sawError = false;
+    for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
+      try (Table table = TEST_UTIL.getConnection().getTable(tn)) {
+        if (m instanceof Put) {
+          table.put((Put) m);
+        } else if (m instanceof Delete) {
+          table.delete((Delete) m);
+        } else if (m instanceof Append) {
+          table.append((Append) m);
+        } else if (m instanceof Increment) {
+          table.increment((Increment) m);
+        } else {
+          fail(
+              "Failed to apply " + m.getClass().getSimpleName() +
+              " to the table. Programming error");
+        }
+        LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry");
+        Thread.sleep(2000);
+      } catch (Exception e) {
+        String msg = StringUtils.stringifyException(e);
+        assertTrue("Expected exception message to contain the word '" + policyToViolate.name() +
+            "', but was " + msg, msg.contains(policyToViolate.name()));
+        sawError = true;
+      }
+    }
+    if (!sawError) {
+      // Dump the quota table to help diagnose the failure. The scanner is a managed resource
+      // so it is closed even if iterating or logging throws.
+      try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
+          ResultScanner scanner = quotaTable.getScanner(new Scan())) {
+        Result result = null;
+        LOG.info("Dumping contents of hbase:quota table");
+        while ((result = scanner.next()) != null) {
+          LOG.info(Bytes.toString(result.getRow()) + " => " + result.toString());
+        }
+      }
+    }
+    assertTrue(
+        "Expected to see an exception writing data to a table exceeding its quota", sawError);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/policies/TestBulkLoadCheckingViolationPolicyEnforcement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/policies/TestBulkLoadCheckingViolationPolicyEnforcement.java
index 3628738..4995de7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/policies/TestBulkLoadCheckingViolationPolicyEnforcement.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/policies/TestBulkLoadCheckingViolationPolicyEnforcement.java
@@ -78,7 +78,7 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
 
     policy.initialize(rss, tableName, snapshot);
 
-    policy.checkBulkLoad(fs, paths);
+    policy.computeBulkLoadSize(fs, paths);
   }
 
   @Test(expected = IllegalArgumentException.class)
@@ -97,7 +97,7 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
     policy.initialize(rss, tableName, snapshot);
 
     // If the file to bulk load isn't a file, this should throw an exception
-    policy.checkBulkLoad(fs, paths);
+    policy.computeBulkLoadSize(fs, paths);
   }
 
   @Test(expected = SpaceLimitingException.class)
@@ -120,7 +120,7 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
 
     policy.initialize(rss, tableName, snapshot);
 
-    policy.checkBulkLoad(fs, paths);
+    policy.computeBulkLoadSize(fs, paths);
   }
 
   @Test(expected = SpaceLimitingException.class)
@@ -143,6 +143,6 @@ public class TestBulkLoadCheckingViolationPolicyEnforcement {
 
     policy.initialize(rss, tableName, snapshot);
 
-    policy.checkBulkLoad(fs, paths);
+    policy.computeBulkLoadSize(fs, paths);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 3f65d38..dbf1bd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@@ -1676,6 +1677,56 @@ public class TestHStore {
       .startsWith("eager".toUpperCase()));
   }
 
+  /**
+   * Exercises {@code updateSpaceQuotaAfterFileReplacement} for a replacement that shrinks the
+   * recorded size, one that leaves it unchanged and one that grows it, and finally for a
+   * second region with no recorded size and a {@code null} list of old files.
+   */
+  @Test
+  public void testSpaceQuotaChangeAfterReplacement() throws IOException {
+    final TableName tn = TableName.valueOf(name.getMethodName());
+    init(name.getMethodName());
+
+    final RegionSizeStoreImpl sizes = new RegionSizeStoreImpl();
+
+    final HStoreFile file1k = mockStoreFileWithLength(1024L);
+    final HStoreFile file2k = mockStoreFileWithLength(2048L);
+    final HStoreFile file4k = mockStoreFileWithLength(4096L);
+    final HStoreFile file8k = mockStoreFileWithLength(8192L);
+
+    final RegionInfo firstRegion = RegionInfoBuilder.newBuilder(tn)
+        .setStartKey(Bytes.toBytes("a"))
+        .setEndKey(Bytes.toBytes("b"))
+        .build();
+
+    // Compacting two files down to one, reducing size
+    sizes.put(firstRegion, 1024L + 4096L);
+    store.updateSpaceQuotaAfterFileReplacement(
+        sizes, firstRegion, Arrays.asList(file1k, file4k), Arrays.asList(file2k));
+    assertEquals(2048L, sizes.getRegionSize(firstRegion).getSize());
+
+    // The same file length in and out should have no change
+    store.updateSpaceQuotaAfterFileReplacement(
+        sizes, firstRegion, Arrays.asList(file2k), Arrays.asList(file2k));
+    assertEquals(2048L, sizes.getRegionSize(firstRegion).getSize());
+
+    // Increase the total size used
+    store.updateSpaceQuotaAfterFileReplacement(
+        sizes, firstRegion, Arrays.asList(file2k), Arrays.asList(file4k));
+    assertEquals(4096L, sizes.getRegionSize(firstRegion).getSize());
+
+    // A region with no recorded size and no old files takes the size of the new file
+    final RegionInfo secondRegion = RegionInfoBuilder.newBuilder(tn)
+        .setStartKey(Bytes.toBytes("b"))
+        .setEndKey(Bytes.toBytes("c"))
+        .build();
+    store.updateSpaceQuotaAfterFileReplacement(
+        sizes, secondRegion, null, Arrays.asList(file8k));
+    assertEquals(8192L, sizes.getRegionSize(secondRegion).getSize());
+  }
+
+  /**
+   * Builds a mocked {@code HStoreFile} that reports itself as an HFile and whose mocked reader
+   * returns the given value from {@code length()}.
+   *
+   * @param length value the mocked reader reports as the file length
+   * @return the mocked store file
+   */
+  private HStoreFile mockStoreFileWithLength(long length) {
+    final StoreFileReader reader = mock(StoreFileReader.class);
+    when(reader.length()).thenReturn(length);
+
+    final HStoreFile storeFile = mock(HStoreFile.class);
+    when(storeFile.isHFile()).thenReturn(true);
+    when(storeFile.getReader()).thenReturn(reader);
+    return storeFile;
+  }
+
   private static class MyThread extends Thread {
     private StoreScanner scanner;
     private KeyValueHeap heap;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerRegionSpaceUseReport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerRegionSpaceUseReport.java
index e17b87c..3cac439 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerRegionSpaceUseReport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerRegionSpaceUseReport.java
@@ -25,12 +25,13 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.quotas.RegionSize;
+import org.apache.hadoop.hbase.quotas.RegionSizeStore;
+import org.apache.hadoop.hbase.quotas.RegionSizeStoreFactory;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
@@ -68,52 +69,24 @@ public class TestRegionServerRegionSpaceUseReport {
         .setStartKey(Bytes.toBytes("c"))
         .setEndKey(Bytes.toBytes("d"))
         .build();
-    Map<RegionInfo,Long> sizes = new HashMap<>();
-    sizes.put(hri1, 1024L * 1024L);
-    sizes.put(hri2, 1024L * 1024L * 8L);
-    sizes.put(hri3, 1024L * 1024L * 32L);
+    RegionSizeStore store = RegionSizeStoreFactory.getInstance().createStore();
+    store.put(hri1, 1024L * 1024L);
+    store.put(hri2, 1024L * 1024L * 8L);
+    store.put(hri3, 1024L * 1024L * 32L);
 
     // Call the real method to convert the map into a protobuf
     HRegionServer rs = mock(HRegionServer.class);
-    doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any());
+    doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any(RegionSizeStore.class));
     doCallRealMethod().when(rs).convertRegionSize(any(), anyLong());
 
-    RegionSpaceUseReportRequest requests = rs.buildRegionSpaceUseReportRequest(sizes);
-    assertEquals(sizes.size(), requests.getSpaceUseCount());
+    RegionSpaceUseReportRequest requests = rs.buildRegionSpaceUseReportRequest(store);
+    assertEquals(store.size(), requests.getSpaceUseCount());
     for (RegionSpaceUse spaceUse : requests.getSpaceUseList()) {
       RegionInfo hri = ProtobufUtil.toRegionInfo(spaceUse.getRegionInfo());
-      Long expectedSize = sizes.remove(hri);
+      RegionSize expectedSize = store.remove(hri);
       assertNotNull("Could not find size for HRI: " + hri, expectedSize);
-      assertEquals(expectedSize.longValue(), spaceUse.getRegionSize());
+      assertEquals(expectedSize.getSize(), spaceUse.getRegionSize());
     }
-    assertTrue("Should not have any space use entries left: " + sizes, sizes.isEmpty());
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testNullMap() {
-    // Call the real method to convert the map into a protobuf
-    HRegionServer rs = mock(HRegionServer.class);
-    doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any());
-    doCallRealMethod().when(rs).convertRegionSize(any(), anyLong());
-
-    rs.buildRegionSpaceUseReportRequest(null);
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testMalformedMap() {
-    TableName tn = TableName.valueOf("table1");
-    RegionInfo hri1 = RegionInfoBuilder.newBuilder(tn)
-        .setStartKey(Bytes.toBytes("a"))
-        .setEndKey(Bytes.toBytes("b"))
-        .build();
-    Map<RegionInfo,Long> sizes = new HashMap<>();
-    sizes.put(hri1, null);
-
-    // Call the real method to convert the map into a protobuf
-    HRegionServer rs = mock(HRegionServer.class);
-    doCallRealMethod().when(rs).buildRegionSpaceUseReportRequest(any());
-    doCallRealMethod().when(rs).convertRegionSize(any(), anyLong());
-
-    rs.buildRegionSpaceUseReportRequest(sizes);
+    assertTrue("Should not have any space use entries left: " + store, store.isEmpty());
   }
 }