You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/06/30 07:20:04 UTC

[hbase] branch branch-2 updated: HBASE-24289 Heterogeneous Storage for Date Tiered Compaction (#1730)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2a12fd2  HBASE-24289 Heterogeneous Storage for Date Tiered Compaction (#1730)
2a12fd2 is described below

commit 2a12fd283e0e0caa4d8441fc50ee455aa33fb123
Author: pengmq1 <pm...@qq.com>
AuthorDate: Tue Jun 30 15:10:04 2020 +0800

    HBASE-24289 Heterogeneous Storage for Date Tiered Compaction (#1730)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 ...rogeneous Storage for Date Tiered Compaction.md | 125 ++++++++++++++
 .../java/org/apache/hadoop/hbase/HConstants.java   |   5 +
 .../regionserver/AbstractMultiFileWriter.java      |   4 +
 .../regionserver/DateTieredMultiFileWriter.java    |  14 +-
 .../hbase/regionserver/DateTieredStoreEngine.java  |   4 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  22 ++-
 .../hadoop/hbase/regionserver/StoreFileWriter.java |  22 +++
 .../compactions/AbstractMultiOutputCompactor.java  |   6 +
 .../compactions/CompactionConfiguration.java       |  51 ++++++
 .../hbase/regionserver/compactions/Compactor.java  |  10 +-
 .../compactions/DateTieredCompactionPolicy.java    |  52 +++++-
 .../compactions/DateTieredCompactionRequest.java   |  14 +-
 .../compactions/DateTieredCompactor.java           |   3 +
 .../AbstractTestDateTieredCompactionPolicy.java    |  29 +++-
 ...TieredCompactionPolicyHeterogeneousStorage.java | 189 +++++++++++++++++++++
 .../compactions/TestDateTieredCompactor.java       |  15 +-
 .../compactions/TestStripeCompactionPolicy.java    |  17 +-
 .../compactions/TestStripeCompactor.java           |   9 +-
 18 files changed, 553 insertions(+), 38 deletions(-)

diff --git a/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md b/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md
new file mode 100644
index 0000000..1a34455
--- /dev/null
+++ b/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md	
@@ -0,0 +1,125 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Heterogeneous Storage for Date Tiered Compaction
+
+## Objective
+
+Support DateTiredCompaction([HBASE-15181](https://issues.apache.org/jira/browse/HBASE-15181))
+ for cold and hot data separation, support different storage policies for different time periods
+ of data to get better performance, for example, we can configure the data of last 1 month in SSD,
+ and 1 month ago data was in HDD.
+
++ Date Tiered Compaction (DTCP) is based on date tiering (date-aware), we hope to support
+  the separation of cold and hot data, heterogeneous storage. Set different storage
+  policies (in HDFS) for data in different time windows.
++ DTCP designs different windows, and we can classify the windows according to
+  the timestamps of the windows. For example: HOT window, WARM window, COLD window.
++ DTCP divides storefiles into different windows, and performs minor Compaction within
+  a time window. The storefile generated by Compaction will use the storage strategy of
+  this window. For example, if a window is a HOT window, the storefile generated by compaction
+  can be stored on the SSD. There are already WAL and the entire CF support storage policy
+  (HBASE-12848, HBASE-14061), our goal is to achieve cold and hot separation in one CF or
+  a region, using different storage policies.
+
+## Definition of hot and cold data
+
+Usually the data of the last 3 days can be defined as `HOT data`, hot age = 3 days.
+ If the written timestamp of the data(Cell) is > (timestamp now - hot age), we think the data is hot data.
+ Warm age can be defined in the same way. Only one type of data is allowed.
+ If data timestamp < (now - warm age), we consider it is COLD.
+ ```
+  if timestamp >= (now - hot age) , HOT data
+  else if timestamp >= (now - warm age), WARM data
+  else COLD data
+```
+
+## Time window
+When given a time now, it is the time when the compaction occurs. Each window and the size of
+ the window are automatically calculated by DTCP, and the window boundary is rounded according
+ to the base size.
+Assuming that the base window size is 1 hour, and each tier has 3 windows, the current time is
+ between 12:00 and 13:00. We have defined three types of winow (`HOT, WARM, COLD`). The type of
+ winodw is determined by the timestamp at the beginning of the window and the timestamp now.
+As shown in the figure 1 below, the type of each window can be determined by the age range
+ (hot / warm / cold) where (now - window.startTimestamp) falls. Cold age can not need to be set,
+ the default Long.MAX, meaning that the window with a very early time stamp belongs to the
+ cold window.
+![figure 1](https://raw.githubusercontent.com/pengmq1/images/master/F1-HDTCP.png "figure 1")
+
+## Example configuration
+
+| Configuration Key | value | Note |
+|:---|:---:|:---|
+|hbase.hstore.compaction.date.tiered.storage.policy.enable|true|if or not use storage policy for window. Default is false|
+|hbase.hstore.compaction.date.tiered.hot.window.age.millis|3600000|hot data age
+|hbase.hstore.compaction.date.tiered.hot.window.storage.policy|ALL_SSD|hot data storage policy, Corresponding HDFS storage policy
+|hbase.hstore.compaction.date.tiered.warm.window.age.millis|20600000||
+|hbase.hstore.compaction.date.tiered.warm.window.storage.policy|ONE_SSD||
+|hbase.hstore.compaction.date.tiered.cold.window.storage.policy|HOT||
+
+The original date tiered compaction related configuration has the same meaning and maintains
+ compatibility.
+If `hbase.hstore.compaction.date.tiered.storage.policy.enable = false`. DTCP still follows the
+ original logic and has not changed.
+
+## Storage strategy
+HDFS provides the following storage policies, you can refer to
+ https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+
+|Policy ID | Policy Name | Block Placement (3  replicas)|
+|:---|:---|:---|
+|15|Lasy_Persist|RAM_DISK: 1, DISK: 2|
+|12|All_SSD|SSD: 3|
+|10|One_SSD|SSD: 1, DISK: 2|
+|7|Hot (default)|DISK: 3|
+|5|Warm|DISK: 1, ARCHIVE: 2|
+|2|Cold|ARCHIVE: 3|
+
+Date Tiered Compaction (DTCP) supports the output of multiple storefiles. We hope that these
+ storefiles can be set with different storage policies (in HDFS).
+ Therefore, through DateTieredMultiFileWriter to generate different StoreFileWriters with
+  storage policy to achieve the purpose.
+
+## Why use different child tmp dir
+Before StoreFileWriter writes a storefile, we can create different dirs in the tmp directory
+ of the region and set the corresponding storage policy for these dirs. This way
+ StoreFileWriter can write files to different dirs.
+
+Since **HDFS** does not support the create file with the storage policy parameter
+ (See https://issues.apache.org/jira/browse/HDFS-13209 and now not support on hadoop 2.x),
+ and HDFS cannot set a storage policy for a file / dir path that does not yet exist.
+ When the compaction ends, the storefile path must exist at this time, and I set the
+ storage policy to Storefile.
+
+But, in HDFS, when the file is written first, and then the storage policy is set.
+ The actual storage location of the data does not match the storage policy. For example,
+ write three copies of a file (1 block) in the HDD, then set storage policy is ALL_SSD,
+ but the data block will not be moved to the SSD immediately.
+ “HDFS wont move the file content across different block volumes on rename”. Data movement
+ requires the HDFS mover tool, or use HDFS SPS
+ (for details, see https://issues.apache.org/jira/browse/HDFS-10285), so in order to
+ avoid moving data blocks at the HDFS level, we can set the file parent directory to
+ the storage policy we need before writing data. The new file automatically inherits the
+ storage policy of the parent directory, and is written according to the correct disk
+ type when writing. So as to avoid later data movement.
+
+Over time, the original HOT data will become WARM / COLD and no longer belong to the
+ HOT window. When the compaction occurs again, the data will be automatically downgraded,
+ such as from SSD to HDD. The compaction mechanism will generate a new file (write into HDD)
+ and delete it Old file (SSD).
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6194e32..57c81df 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -66,6 +66,11 @@ public final class HConstants {
   public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
 
   /**
+   * Date Tiered Compaction tmp dir prefix name if use storage policy
+   */
+  public static final String STORAGE_POLICY_PREFIX = "storage_policy_";
+
+  /**
    * The first four bytes of Hadoop RPC connections
    */
   public static final byte[] RPC_HEADER = new byte[] { 'H', 'B', 'a', 's' };
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index f9cc400..f250304 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -43,6 +43,10 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
 
   public interface WriterFactory {
     public StoreFileWriter createWriter() throws IOException;
+    default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
+        throws IOException {
+      return createWriter();
+    };
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index a13e4e7..8201cb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -38,15 +38,20 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
 
   private final boolean needEmptyFile;
 
+  private final Map<Long, String> lowerBoundariesPolicies;
+
   /**
+   * @param lowerBoundariesPolicies each window to storage policy map.
    * @param needEmptyFile whether need to create an empty store file if we haven't written out
    *          anything.
    */
-  public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
+  public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
+      Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
     for (Long lowerBoundary : lowerBoundaries) {
       lowerBoundary2Writer.put(lowerBoundary, null);
     }
     this.needEmptyFile = needEmptyFile;
+    this.lowerBoundariesPolicies = lowerBoundariesPolicies;
   }
 
   @Override
@@ -54,7 +59,12 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
     Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
     StoreFileWriter writer = entry.getValue();
     if (writer == null) {
-      writer = writerFactory.createWriter();
+      String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
+      if (lowerBoundaryStoragePolicy != null) {
+        writer = writerFactory.createWriterWithStoragePolicy(lowerBoundaryStoragePolicy);
+      } else {
+        writer = writerFactory.createWriter();
+      }
       lowerBoundary2Writer.put(entry.getKey(), writer);
     }
     writer.append(cell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index daae083..1df953d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -93,7 +93,9 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
     public List<Path> compact(ThroughputController throughputController, User user)
         throws IOException {
       if (request instanceof DateTieredCompactionRequest) {
-        return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(),
+        DateTieredCompactionRequest compactionRequest = (DateTieredCompactionRequest) request;
+        return compactor.compact(request, compactionRequest.getBoundaries(),
+          compactionRequest.getBoundariesPolicies(),
           throughputController, user);
       } else {
         throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f193d8b..4e3c5ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -1162,7 +1163,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
     boolean shouldDropBehind) throws IOException {
     return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
-      includesTag, shouldDropBehind, -1);
+      includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
   }
 
   /**
@@ -1176,7 +1177,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   // compaction
   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-      boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
+      boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
+        throws IOException {
     // creating new cache config for each new writer
     final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
     if (isCompaction) {
@@ -1233,7 +1235,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
             .withFavoredNodes(favoredNodes)
             .withFileContext(hFileContext)
             .withShouldDropCacheBehind(shouldDropBehind)
-            .withCompactedFilesSupplier(this::getCompactedFiles);
+            .withCompactedFilesSupplier(this::getCompactedFiles)
+            .withFileStoragePolicy(fileStoragePolicy);
     return builder.build();
   }
 
@@ -1554,6 +1557,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
       List<Path> newFiles) throws IOException {
     // Do the steps necessary to complete the compaction.
+    setStoragePolicyFromFileName(newFiles);
     List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
     writeCompactionWalRecord(filesToCompact, sfs);
     replaceStoreFiles(filesToCompact, sfs);
@@ -1583,6 +1587,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     return sfs;
   }
 
+  // Set correct storage policy from the file name of DTCP.
+  // Rename file will not change the storage policy.
+  private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
+    String prefix = HConstants.STORAGE_POLICY_PREFIX;
+    for (Path newFile : newFiles) {
+      if (newFile.getParent().getName().startsWith(prefix)) {
+        CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile,
+            newFile.getParent().getName().substring(prefix.length()));
+      }
+    }
+  }
+
   private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
       List<Path> newFiles, User user) throws IOException {
     List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index 01fdeba..a4084cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -62,6 +62,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
@@ -420,6 +422,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
     private HFileContext fileContext;
     private boolean shouldDropCacheBehind;
     private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
+    private String fileStoragePolicy;
 
     public Builder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
@@ -501,6 +504,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
       return this;
     }
 
+    public Builder withFileStoragePolicy(String fileStoragePolicy) {
+      this.fileStoragePolicy = fileStoragePolicy;
+      return this;
+    }
+
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -530,6 +538,20 @@ public class StoreFileWriter implements CellSink, ShipperListener {
       CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);
 
       if (filePath == null) {
+        // The stored file and related blocks will used the directory based StoragePolicy.
+        // Because HDFS DistributedFileSystem does not support create files with storage policy
+        // before version 3.3.0 (See HDFS-13209). Use child dir here is to make stored files
+        // satisfy the specific storage policy when writing. So as to avoid later data movement.
+        // We don't want to change whole temp dir to 'fileStoragePolicy'.
+        if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
+          dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
+          if (!fs.exists(dir)) {
+            HRegionFileSystem.mkdirs(fs, conf, dir);
+            LOG.info(
+              "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
+          }
+          CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
+        }
         filePath = getUniqueFile(fs, dir);
         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
           bloomType = BloomType.NONE;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index a8ffc2e..f2816d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -53,6 +53,12 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
       public StoreFileWriter createWriter() throws IOException {
         return createTmpWriter(fd, shouldDropBehind);
       }
+
+      @Override
+      public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
+          throws IOException {
+        return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy);
+      }
     };
     // Prepare multi-writer, and perform the compaction using scanner and writer.
     // It is ok here if storeScanner is null.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index f9158c5..8ad1d3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -90,6 +90,20 @@ public class CompactionConfiguration {
   private static final Class<? extends CompactionWindowFactory>
     DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class;
 
+  public static final String DATE_TIERED_STORAGE_POLICY_ENABLE_KEY =
+    "hbase.hstore.compaction.date.tiered.storage.policy.enable";
+  public static final String DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY =
+    "hbase.hstore.compaction.date.tiered.hot.window.age.millis";
+  public static final String DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY =
+    "hbase.hstore.compaction.date.tiered.hot.window.storage.policy";
+  public static final String DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY =
+    "hbase.hstore.compaction.date.tiered.warm.window.age.millis";
+  public static final String DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY =
+    "hbase.hstore.compaction.date.tiered.warm.window.storage.policy";
+  /** Windows older than warm age belong to COLD_WINDOW **/
+  public static final String DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY =
+    "hbase.hstore.compaction.date.tiered.cold.window.storage.policy";
+
   Configuration conf;
   StoreConfigInformation storeConfigInfo;
 
@@ -111,6 +125,12 @@ public class CompactionConfiguration {
   private final String compactionPolicyForDateTieredWindow;
   private final boolean dateTieredSingleOutputForMinorCompaction;
   private final String dateTieredCompactionWindowFactory;
+  private final boolean dateTieredStoragePolicyEnable;
+  private long hotWindowAgeMillis;
+  private long warmWindowAgeMillis;
+  private String hotWindowStoragePolicy;
+  private String warmWindowStoragePolicy;
+  private String coldWindowStoragePolicy;
 
   CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
     this.conf = conf;
@@ -145,6 +165,13 @@ public class CompactionConfiguration {
     this.dateTieredCompactionWindowFactory = conf.get(
       DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY,
       DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName());
+    // for Heterogeneous Storage
+    dateTieredStoragePolicyEnable = conf.getBoolean(DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, false);
+    hotWindowAgeMillis = conf.getLong(DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 86400000L);
+    hotWindowStoragePolicy = conf.get(DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, "ALL_SSD");
+    warmWindowAgeMillis = conf.getLong(DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 604800000L);
+    warmWindowStoragePolicy = conf.get(DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, "ONE_SSD");
+    coldWindowStoragePolicy = conf.get(DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, "HOT");
     LOG.info(toString());
   }
 
@@ -291,4 +318,28 @@ public class CompactionConfiguration {
   public String getDateTieredCompactionWindowFactory() {
     return dateTieredCompactionWindowFactory;
   }
+
+  public boolean isDateTieredStoragePolicyEnable() {
+    return dateTieredStoragePolicyEnable;
+  }
+
+  public long getHotWindowAgeMillis() {
+    return hotWindowAgeMillis;
+  }
+
+  public long getWarmWindowAgeMillis() {
+    return warmWindowAgeMillis;
+  }
+
+  public String getHotWindowStoragePolicy() {
+    return hotWindowStoragePolicy.trim().toUpperCase();
+  }
+
+  public String getWarmWindowStoragePolicy() {
+    return warmWindowStoragePolicy.trim().toUpperCase();
+  }
+
+  public String getColdWindowStoragePolicy() {
+    return coldWindowStoragePolicy.trim().toUpperCase();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 10fac55..be0a98e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -268,7 +268,15 @@ public abstract class Compactor<T extends CellSink> {
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
     return store
       .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
-        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize);
+        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
+        HConstants.EMPTY_STRING);
+  }
+
+  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
+      String fileStoragePolicy) throws IOException {
+    return store
+      .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
+        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
   }
 
   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
index 58969bf..1cc7dda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.OptionalLong;
 
 import org.apache.hadoop.conf.Configuration;
@@ -198,8 +200,10 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
 
   public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
     long now = EnvironmentEdgeManager.currentTime();
+    List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
+    Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
     return new DateTieredCompactionRequest(candidateSelection,
-      this.getCompactBoundariesForMajor(candidateSelection, now));
+      boundaries, boundariesPolicies);
   }
 
   /**
@@ -253,7 +257,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
             LOG.debug("Processing files: " + fileList + " for window: " + window);
           }
           DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
-            mayUseOffPeak, mayBeStuck, minThreshold);
+            mayUseOffPeak, mayBeStuck, minThreshold, now);
           if (request != null) {
             return request;
           }
@@ -265,8 +269,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
   }
 
   private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
-      CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
-      throws IOException {
+      CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold,
+      long now) throws IOException {
     // The files has to be in ascending order for ratio-based compaction to work right
     // and removeExcessFile to exclude youngest files.
     Collections.reverse(storeFiles);
@@ -281,8 +285,11 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
       boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
         comConf.useDateTieredSingleOutputForMinorCompaction();
       List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
+      // we want to generate policy to boundaries for minor compaction
+      Map<Long, String> boundaryPolicyMap =
+        getBoundariesStoragePolicyForMinor(singleOutput, window, now);
       DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
-        boundaries);
+        boundaries, boundaryPolicyMap);
       return result;
     }
     return null;
@@ -334,4 +341,39 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
       return Long.MIN_VALUE;
     }
   }
+
+  private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
+      CompactionWindow window, long now) {
+    Map<Long, String> boundariesPolicy = new HashMap<>();
+    if (!comConf.isDateTieredStoragePolicyEnable()) {
+      return boundariesPolicy;
+    }
+    String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
+    if (singleOutput) {
+      boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
+    } else {
+      boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
+    }
+    return boundariesPolicy;
+  }
+
+  private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
+    Map<Long, String> boundariesPolicy = new HashMap<>();
+    if (!comConf.isDateTieredStoragePolicyEnable()) {
+      return boundariesPolicy;
+    }
+    for (Long startTs : boundaries) {
+      boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
+    }
+    return boundariesPolicy;
+  }
+
+  private String getWindowStoragePolicy(long now, long windowStartMillis) {
+    if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
+      return comConf.getHotWindowStoragePolicy();
+    } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
+      return comConf.getWarmWindowStoragePolicy();
+    }
+    return comConf.getColdWindowStoragePolicy();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
index 37b7059..ddf9a0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -28,18 +29,27 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class DateTieredCompactionRequest extends CompactionRequestImpl {
   private List<Long> boundaries;
+  /** window start boundary to window storage policy map **/
+  private Map<Long, String> boundariesPolicies;
 
-  public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList) {
+  public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList,
+      Map<Long, String> boundaryPolicyMap) {
     super(files);
     boundaries = boundaryList;
+    boundariesPolicies = boundaryPolicyMap;
   }
 
   public List<Long> getBoundaries() {
     return boundaries;
   }
 
+  public Map<Long, String> getBoundariesPolicies() {
+    return boundariesPolicies;
+  }
+
   @Override
   public String toString() {
-    return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray());
+    return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray())
+      + " boundariesPolicies="+boundariesPolicies.toString();
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index 1bf5236..ef64df1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.OptionalLong;
 
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +56,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
   }
 
   public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
+      final Map<Long, String> lowerBoundariesPolicies,
       ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing compaction with " + lowerBoundaries.size()
@@ -68,6 +70,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
         public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
             boolean shouldDropBehind) throws IOException {
           DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
+              lowerBoundariesPolicies,
               needEmptyFile(request));
           initMultiWriter(writer, scanner, fd, shouldDropBehind);
           return writer;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java
index 538294e..15a59ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
@@ -57,12 +58,21 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy
 
   protected void compactEquals(long now, ArrayList<HStoreFile> candidates, long[] expectedFileSizes,
       long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException {
+    DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact);
+    List<HStoreFile> actual = Lists.newArrayList(request.getFiles());
+    assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
+    assertEquals(Arrays.toString(expectedBoundaries),
+      Arrays.toString(request.getBoundaries().toArray()));
+  }
+
+  private DateTieredCompactionRequest getRequest(long now, ArrayList<HStoreFile> candidates,
+      boolean isMajor, boolean toCompact) throws IOException {
     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
     EnvironmentEdgeManager.injectEdge(timeMachine);
     timeMachine.setValue(now);
     DateTieredCompactionRequest request;
     DateTieredCompactionPolicy policy =
-        (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy();
+      (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy();
     if (isMajor) {
       for (HStoreFile file : candidates) {
         ((MockHStoreFile) file).setIsMajor(true);
@@ -72,11 +82,18 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy
     } else {
       assertEquals(toCompact, policy.needsCompaction(candidates, ImmutableList.of()));
       request =
-          (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false);
+        (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false);
+    }
+    return request;
+  }
+
+  protected void compactEqualsStoragePolicy(long now, ArrayList<HStoreFile> candidates,
+      Map<Long, String> expectedBoundariesPolicies, boolean isMajor, boolean toCompact)
+      throws IOException {
+    DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact);
+    Map<Long, String> boundariesPolicies = request.getBoundariesPolicies();
+    for (Map.Entry<Long, String> entry : expectedBoundariesPolicies.entrySet()) {
+      assertEquals(entry.getValue(), boundariesPolicies.get(entry.getKey()));
     }
-    List<HStoreFile> actual = Lists.newArrayList(request.getFiles());
-    assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
-    assertEquals(Arrays.toString(expectedBoundaries),
-      Arrays.toString(request.getBoundaries().toArray()));
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java
new file mode 100644
index 0000000..74210e6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactionPolicyHeterogeneousStorage
+    extends AbstractTestDateTieredCompactionPolicy {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestDateTieredCompactionPolicyHeterogeneousStorage.class);
+  public static final String HOT_WINDOW_SP = "ALL_SSD";
+  public static final String WARM_WINDOW_SP = "ONE_SSD";
+  public static final String COLD_WINDOW_SP = "HOT";
+
+  @Override
+  protected void config() {
+    super.config();
+
+    // Set up policy
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+      "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
+    conf.setLong(CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY, 100);
+    conf.setLong(CompactionConfiguration.DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 3);
+    conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6);
+    conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4);
+    conf.setBoolean(CompactionConfiguration.DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
+      false);
+
+    // Special settings for compaction policy per window
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
+    this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
+    this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
+
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
+    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 5);
+
+    // Set Storage Policy for different type window
+    conf.setBoolean(CompactionConfiguration.DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, true);
+    conf.setLong(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 6);
+    conf.set(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, HOT_WINDOW_SP);
+    conf.setLong(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 12);
+    conf.set(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, WARM_WINDOW_SP);
+    conf.set(CompactionConfiguration.DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, COLD_WINDOW_SP);
+  }
+
+  /**
+   * Test for minor compaction of incoming window.
+   * Incoming window start ts >= now - hot age. So it is HOT window, will use HOT_WINDOW_SP.
+   * @throws IOException with error
+   */
+  @Test
+  public void testIncomingWindowHot() throws IOException {
+    long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+    long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+    long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
+    Map<Long, String> expected = new HashMap<>();
+    // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 12 }
+    // test whether DateTieredCompactionRequest boundariesPolicies matches expected
+    expected.put(12L, HOT_WINDOW_SP);
+    compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
+      expected, false, true);
+  }
+
+  /**
+   * Test for not incoming window.
+   * now - hot age > window start >= now - warm age,
+   * so this window and is WARM window, will use WARM_WINDOW_SP
+   * @throws IOException with error
+   */
+  @Test
+  public void testNotIncomingWindowWarm() throws IOException {
+    long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+    long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
+    long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
+    Map<Long, String> expected = new HashMap<>();
+    // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
+    expected.put(6L, WARM_WINDOW_SP);
+    compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
+      expected, false, true);
+  }
+
+  /**
+   * Test for not incoming window.
+   * this window start ts >= ow - hot age,
+   * So this incoming window and is HOT window. Use HOT_WINDOW_SP
+   * @throws IOException with error
+   */
+  @Test
+  public void testNotIncomingWindowAndIsHot() throws IOException {
+    long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+    long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
+    long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
+    Map<Long, String> expected = new HashMap<>();
+    // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
+    expected.put(6L, HOT_WINDOW_SP);
+    compactEqualsStoragePolicy(12, sfCreate(minTimestamps, maxTimestamps, sizes),
+      expected, false, true);
+  }
+
+  /**
+   * Test for not incoming window.
+   * COLD window start timestamp < now - warm age, so use COLD_WINDOW_SP
+   * @throws IOException with error
+   */
+  @Test
+  public void testColdWindow() throws IOException {
+    long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+    long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+    long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10 };
+    Map<Long, String> expected = new HashMap<>();
+    // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
+    expected.put(6L, COLD_WINDOW_SP);
+    compactEqualsStoragePolicy(22, sfCreate(minTimestamps, maxTimestamps, sizes),
+      expected, false, true);
+  }
+
+  /**
+   * Test for not incoming window. but not all hfiles will be selected to compact.
+   * Apply exploring logic on non-incoming window. More than one hfile left in this window.
+   * this means minor compact single out is true. boundaries only contains Long.MIN_VALUE
+   * @throws IOException with error
+   */
+  @Test
+  public void testRatioT0() throws IOException {
+    long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+    long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+    long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 };
+    Map<Long, String> expected = new HashMap<>();
+    // window start = 6, expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE }
+    expected.put(Long.MIN_VALUE, WARM_WINDOW_SP);
+    compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
+      expected, false, true);
+  }
+
+  /**
+   * Test for Major compaction. It will compact all files and create multi output files
+   * with different window storage policy.
+   * @throws IOException with error
+   */
+  @Test
+  public void testMajorCompation() throws IOException {
+    long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+    long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
+    long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
+    Map<Long, String> expected = new HashMap<>();
+    expected.put(Long.MIN_VALUE, COLD_WINDOW_SP);
+    expected.put(24L, COLD_WINDOW_SP);
+    expected.put(48L, COLD_WINDOW_SP);
+    expected.put(72L, COLD_WINDOW_SP);
+    expected.put(96L, COLD_WINDOW_SP);
+    expected.put(120L, COLD_WINDOW_SP);
+    expected.put(144L, COLD_WINDOW_SP);
+    expected.put(150L, WARM_WINDOW_SP);
+    expected.put(156L, HOT_WINDOW_SP);
+    compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes),
+      new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 },
+      new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true);
+    compactEqualsStoragePolicy(161, sfCreate(minTimestamps, maxTimestamps, sizes),
+      expected,true, true);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index 812ee4b..92ba76d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -21,15 +21,17 @@ import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.cre
 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.OptionalLong;
 import org.apache.hadoop.conf.Configuration;
@@ -109,7 +111,7 @@ public class TestDateTieredCompactor {
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
       anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
+      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
     when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
@@ -138,7 +140,8 @@ public class TestDateTieredCompactor {
     HStoreFile sf2 = createDummyStoreFile(2L);
     DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
     List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
-      boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
+      boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
+      NoLimitThroughputController.INSTANCE, null);
     writers.verifyKvs(output, allFiles, boundaries);
     if (allFiles) {
       assertEquals(output.length, paths.size());
@@ -167,7 +170,7 @@ public class TestDateTieredCompactor {
     DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
       new ArrayList<>(request.getFiles()));
     List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
-      NoLimitThroughputController.INSTANCE, null);
+      new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
     assertEquals(1, paths.size());
     List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
     assertEquals(1, dummyWriters.size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 3d221a1..e2bd257 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -23,13 +23,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.AdditionalMatchers.aryEq;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.times;
@@ -796,7 +797,7 @@ public class TestStripeCompactionPolicy {
         anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(
       store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index 0221274..6e8b19f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -209,7 +210,7 @@ public class TestStripeCompactor {
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
       anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
+      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
 
     return new StripeCompactor(conf, store) {