Posted to commits@hbase.apache.org by zg...@apache.org on 2020/06/30 07:20:04 UTC
[hbase] branch branch-2 updated: HBASE-24289 Heterogeneous Storage
for Date Tiered Compaction (#1730)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 2a12fd2 HBASE-24289 Heterogeneous Storage for Date Tiered Compaction (#1730)
2a12fd2 is described below
commit 2a12fd283e0e0caa4d8441fc50ee455aa33fb123
Author: pengmq1 <pm...@qq.com>
AuthorDate: Tue Jun 30 15:10:04 2020 +0800
HBASE-24289 Heterogeneous Storage for Date Tiered Compaction (#1730)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
...rogeneous Storage for Date Tiered Compaction.md | 125 ++++++++++++++
.../java/org/apache/hadoop/hbase/HConstants.java | 5 +
.../regionserver/AbstractMultiFileWriter.java | 4 +
.../regionserver/DateTieredMultiFileWriter.java | 14 +-
.../hbase/regionserver/DateTieredStoreEngine.java | 4 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 22 ++-
.../hadoop/hbase/regionserver/StoreFileWriter.java | 22 +++
.../compactions/AbstractMultiOutputCompactor.java | 6 +
.../compactions/CompactionConfiguration.java | 51 ++++++
.../hbase/regionserver/compactions/Compactor.java | 10 +-
.../compactions/DateTieredCompactionPolicy.java | 52 +++++-
.../compactions/DateTieredCompactionRequest.java | 14 +-
.../compactions/DateTieredCompactor.java | 3 +
.../AbstractTestDateTieredCompactionPolicy.java | 29 +++-
...TieredCompactionPolicyHeterogeneousStorage.java | 189 +++++++++++++++++++++
.../compactions/TestDateTieredCompactor.java | 15 +-
.../compactions/TestStripeCompactionPolicy.java | 17 +-
.../compactions/TestStripeCompactor.java | 9 +-
18 files changed, 553 insertions(+), 38 deletions(-)
diff --git a/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md b/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md
new file mode 100644
index 0000000..1a34455
--- /dev/null
+++ b/dev-support/design-docs/HBASE-24289-Heterogeneous Storage for Date Tiered Compaction.md
@@ -0,0 +1,125 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# Heterogeneous Storage for Date Tiered Compaction
+
+## Objective
+
+Extend Date Tiered Compaction ([HBASE-15181](https://issues.apache.org/jira/browse/HBASE-15181))
+ to separate hot and cold data by applying different storage policies to data from different
+ time periods, for better performance. For example, data from the last month can be kept on SSD
+ while data older than one month is kept on HDD.
+
++ Date Tiered Compaction (DTCP) is date-aware, so it is a natural place to separate hot and
+ cold data onto heterogeneous storage by setting different storage policies (in HDFS) for
+ data in different time windows.
++ DTCP builds a series of time windows, and the windows can be classified according to
+ their timestamps. For example: HOT window, WARM window, COLD window.
++ DTCP divides storefiles into different windows and performs minor compaction within
+ a time window. The storefile generated by a compaction uses the storage policy of
+ its window. For example, if a window is a HOT window, the storefile generated by compaction
+ can be stored on SSD. Storage policies are already supported for the WAL and for an entire CF
+ (HBASE-12848, HBASE-14061); our goal is to achieve hot and cold separation within one CF or
+ region by using different storage policies.
+
+## Definition of hot and cold data
+
+Usually the data of the last 3 days can be defined as `HOT data`, i.e. hot age = 3 days.
+ If the write timestamp of the data (Cell) is >= (now - hot age), the data is considered HOT.
+ WARM data is defined in the same way using the warm age. Each cell belongs to exactly one type.
+ If the data timestamp is < (now - warm age), the data is considered COLD.
+ ```
+ if timestamp >= (now - hot age) , HOT data
+ else if timestamp >= (now - warm age), WARM data
+ else COLD data
+```
+
+## Time window
+The time `now` is the time at which the compaction occurs. Each window and its size are
+ calculated automatically by DTCP, and window boundaries are rounded according
+ to the base size.
+Assume that the base window size is 1 hour, each tier has 3 windows, and the current time is
+ between 12:00 and 13:00. We define three types of window (`HOT, WARM, COLD`). The type of a
+ window is determined by the timestamp at the beginning of the window and the timestamp now.
+As shown in figure 1 below, the type of each window is determined by the age range
+ (hot / warm / cold) in which (now - window.startTimestamp) falls. The cold age does not need
+ to be set; it defaults to Long.MAX_VALUE, meaning that any window with a very early timestamp
+ belongs to the cold window.
+![figure 1](https://raw.githubusercontent.com/pengmq1/images/master/F1-HDTCP.png "figure 1")
+
+## Example configuration
+
+| Configuration Key | value | Note |
+|:---|:---:|:---|
+|hbase.hstore.compaction.date.tiered.storage.policy.enable|true|Whether to use a storage policy per window. Default is false|
+|hbase.hstore.compaction.date.tiered.hot.window.age.millis|3600000|Hot data age|
+|hbase.hstore.compaction.date.tiered.hot.window.storage.policy|ALL_SSD|Hot data storage policy; the corresponding HDFS storage policy|
+|hbase.hstore.compaction.date.tiered.warm.window.age.millis|20600000||
+|hbase.hstore.compaction.date.tiered.warm.window.storage.policy|ONE_SSD||
+|hbase.hstore.compaction.date.tiered.cold.window.storage.policy|HOT||
+
+The existing date tiered compaction configuration keeps the same meaning, so
+ compatibility is maintained.
+If `hbase.hstore.compaction.date.tiered.storage.policy.enable = false`, DTCP follows the
+ original logic unchanged.
+
+## Storage strategy
+HDFS provides the following storage policies; for details, refer to
+ https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+
+|Policy ID | Policy Name | Block Placement (3 replicas)|
+|:---|:---|:---|
+|15|Lazy_Persist|RAM_DISK: 1, DISK: 2|
+|12|All_SSD|SSD: 3|
+|10|One_SSD|SSD: 1, DISK: 2|
+|7|Hot (default)|DISK: 3|
+|5|Warm|DISK: 1, ARCHIVE: 2|
+|2|Cold|ARCHIVE: 3|
+
+Date Tiered Compaction (DTCP) supports the output of multiple storefiles. We want these
+ storefiles to be written with different storage policies (in HDFS).
+ To achieve this, DateTieredMultiFileWriter creates a separate StoreFileWriter, each with
+ its own storage policy, for each window.
+
+## Why use different child tmp dir
+Before StoreFileWriter writes a storefile, we can create different dirs in the tmp directory
+ of the region and set the corresponding storage policy for these dirs. This way
+ StoreFileWriter can write files to different dirs.
+
+**HDFS** does not support creating a file with a storage policy parameter
+ (see https://issues.apache.org/jira/browse/HDFS-13209; this is not supported on Hadoop 2.x),
+ and HDFS cannot set a storage policy on a file / dir path that does not yet exist.
+ When the compaction ends, the storefile path is guaranteed to exist, so the storage
+ policy is set on the storefile at that point.
+
+However, in HDFS, if a file is written first and its storage policy is set afterwards,
+ the actual storage location of the data does not match the storage policy. For example,
+ if three replicas of a file (1 block) are written to HDD and the storage policy is then set
+ to ALL_SSD, the data blocks are not moved to SSD immediately.
+ "HDFS won't move the file content across different block volumes on rename". Data movement
+ requires the HDFS mover tool or HDFS SPS
+ (for details, see https://issues.apache.org/jira/browse/HDFS-10285). To
+ avoid moving data blocks at the HDFS level, we set the required storage policy on the
+ file's parent directory before writing data. The new file automatically inherits the
+ storage policy of the parent directory and is written to the correct disk
+ type, which avoids later data movement.
+
+Over time, the original HOT data becomes WARM / COLD and no longer belongs to the
+ HOT window. When compaction occurs again, the data is automatically downgraded,
+ for example from SSD to HDD: the compaction generates a new file (written to HDD)
+ and deletes the old file (on SSD).
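The hot / warm / cold rule described in the design document above can be sketched in plain Java as follows. This is an illustrative sketch and not part of the commit: the class name `WindowTierSketch` and the method `policyFor` are hypothetical, and the ages and policy names are simply the values from the example configuration table.

```java
// Illustrative sketch only; WindowTierSketch is a hypothetical name, not an HBase class.
final class WindowTierSketch {
  // Example values taken from the design document's configuration table.
  static final long HOT_AGE_MILLIS = 3_600_000L;
  static final long WARM_AGE_MILLIS = 20_600_000L;

  /** Returns the storage policy for a window, given the compaction time and the window start. */
  static String policyFor(long now, long windowStartMillis) {
    if (windowStartMillis >= now - HOT_AGE_MILLIS) {
      return "ALL_SSD"; // HOT window
    } else if (windowStartMillis >= now - WARM_AGE_MILLIS) {
      return "ONE_SSD"; // WARM window
    }
    return "HOT"; // COLD window: the HDFS "Hot" policy places all replicas on DISK
  }

  public static void main(String[] args) {
    long now = 100_000_000L; // arbitrary compaction time for the demonstration
    System.out.println(policyFor(now, now - 1_000L));      // window started 1 second ago
    System.out.println(policyFor(now, now - 10_000_000L)); // started after the hot age elapsed
    System.out.println(policyFor(now, now - 50_000_000L)); // started after the warm age elapsed
  }
}
```

Note that the COLD window maps to the HDFS policy named `HOT`, which is the HDFS default (three replicas on DISK); the window type and the HDFS policy name are independent of each other.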
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6194e32..57c81df 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -66,6 +66,11 @@ public final class HConstants {
public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";
/**
+ * Date Tiered Compaction tmp dir prefix name if use storage policy
+ */
+ public static final String STORAGE_POLICY_PREFIX = "storage_policy_";
+
+ /**
* The first four bytes of Hadoop RPC connections
*/
public static final byte[] RPC_HEADER = new byte[] { 'H', 'B', 'a', 's' };
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index f9cc400..f250304 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -43,6 +43,10 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
public interface WriterFactory {
public StoreFileWriter createWriter() throws IOException;
+ default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
+ throws IOException {
+ return createWriter();
+ };
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index a13e4e7..8201cb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -38,15 +38,20 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
private final boolean needEmptyFile;
+ private final Map<Long, String> lowerBoundariesPolicies;
+
/**
+ * @param lowerBoundariesPolicies each window to storage policy map.
* @param needEmptyFile whether need to create an empty store file if we haven't written out
* anything.
*/
- public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
+ public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
+ Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
for (Long lowerBoundary : lowerBoundaries) {
lowerBoundary2Writer.put(lowerBoundary, null);
}
this.needEmptyFile = needEmptyFile;
+ this.lowerBoundariesPolicies = lowerBoundariesPolicies;
}
@Override
@@ -54,7 +59,12 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFileWriter writer = entry.getValue();
if (writer == null) {
- writer = writerFactory.createWriter();
+ String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
+ if (lowerBoundaryStoragePolicy != null) {
+ writer = writerFactory.createWriterWithStoragePolicy(lowerBoundaryStoragePolicy);
+ } else {
+ writer = writerFactory.createWriter();
+ }
lowerBoundary2Writer.put(entry.getKey(), writer);
}
writer.append(cell);
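The lookup in the hunk above (floor entry by cell timestamp, then an optional per-boundary policy) can be illustrated with a small standalone sketch. It is not code from the commit: `BoundaryLookupSketch` and `policyForTimestamp` are hypothetical names, the boundary values are made up, and the empty string stands in for "no policy, use the default writer".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Illustrative sketch only; BoundaryLookupSketch is a hypothetical name, not an HBase class.
final class BoundaryLookupSketch {
  /**
   * Finds the window a timestamp falls into (the greatest lower boundary that is
   * less than or equal to the timestamp) and returns the policy mapped to that
   * boundary, or "" when no policy is mapped.
   */
  static String policyForTimestamp(NavigableSet<Long> lowerBoundaries,
      Map<Long, String> policies, long timestamp) {
    Long boundary = lowerBoundaries.floor(timestamp);
    if (boundary == null) {
      throw new IllegalArgumentException("timestamp is below the lowest boundary");
    }
    String policy = policies.get(boundary);
    return policy != null ? policy : "";
  }

  public static void main(String[] args) {
    // Assumption for this sketch: the first boundary is Long.MIN_VALUE, so every timestamp matches.
    NavigableSet<Long> boundaries = new TreeSet<>(Arrays.asList(Long.MIN_VALUE, 1000L, 2000L));
    Map<Long, String> policies = new HashMap<>();
    policies.put(1000L, "ONE_SSD");
    policies.put(2000L, "ALL_SSD");

    System.out.println(policyForTimestamp(boundaries, policies, 1500L)); // window starting at 1000
    System.out.println(policyForTimestamp(boundaries, policies, 2000L)); // window starting at 2000
    System.out.println(policyForTimestamp(boundaries, policies, 5L).isEmpty()); // no policy mapped
  }
}
```

When the feature is disabled the policy map is empty, so every lookup falls through to the default writer, which matches the unchanged behaviour described in the design document.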
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index daae083..1df953d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -93,7 +93,9 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
if (request instanceof DateTieredCompactionRequest) {
- return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(),
+ DateTieredCompactionRequest compactionRequest = (DateTieredCompactionRequest) request;
+ return compactor.compact(request, compactionRequest.getBoundaries(),
+ compactionRequest.getBoundariesPolicies(),
throughputController, user);
} else {
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f193d8b..4e3c5ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -1162,7 +1163,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind) throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
- includesTag, shouldDropBehind, -1);
+ includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
}
/**
@@ -1176,7 +1177,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// compaction
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
- boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
+ boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
+ throws IOException {
// creating new cache config for each new writer
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
if (isCompaction) {
@@ -1233,7 +1235,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
.withShouldDropCacheBehind(shouldDropBehind)
- .withCompactedFilesSupplier(this::getCompactedFiles);
+ .withCompactedFilesSupplier(this::getCompactedFiles)
+ .withFileStoragePolicy(fileStoragePolicy);
return builder.build();
}
@@ -1554,6 +1557,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
+ setStoragePolicyFromFileName(newFiles);
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
@@ -1583,6 +1587,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
return sfs;
}
+ // Set correct storage policy from the file name of DTCP.
+ // Rename file will not change the storage policy.
+ private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
+ String prefix = HConstants.STORAGE_POLICY_PREFIX;
+ for (Path newFile : newFiles) {
+ if (newFile.getParent().getName().startsWith(prefix)) {
+ CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile,
+ newFile.getParent().getName().substring(prefix.length()));
+ }
+ }
+ }
+
private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
List<Path> newFiles, User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
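The way `setStoragePolicyFromFileName` recovers the policy from the tmp child directory name can be shown with a standalone sketch. This is not code from the commit: plain strings stand in for Hadoop `Path` objects, `TmpDirPolicySketch` and `policyFromPath` are hypothetical names, and the example paths are invented.

```java
// Illustrative sketch only; plain strings stand in for Hadoop Path objects.
final class TmpDirPolicySketch {
  // Same value as the STORAGE_POLICY_PREFIX constant this commit adds to HConstants.
  static final String STORAGE_POLICY_PREFIX = "storage_policy_";

  /** Returns the policy encoded in the file's parent directory name, or null if there is none. */
  static String policyFromPath(String filePath) {
    int lastSlash = filePath.lastIndexOf('/');
    if (lastSlash <= 0) {
      return null; // no parent directory to inspect
    }
    String parentPath = filePath.substring(0, lastSlash);
    String parentName = parentPath.substring(parentPath.lastIndexOf('/') + 1);
    if (parentName.startsWith(STORAGE_POLICY_PREFIX)) {
      return parentName.substring(STORAGE_POLICY_PREFIX.length());
    }
    return null;
  }

  public static void main(String[] args) {
    // Hypothetical paths, for illustration only.
    System.out.println(policyFromPath("/region/.tmp/storage_policy_ALL_SSD/abc123"));
    System.out.println(policyFromPath("/region/.tmp/abc123"));
  }
}
```

Encoding the policy in the directory name means no extra state has to be carried from the writer to the commit step; the file's location alone says which policy to re-apply before the file is moved into place.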
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index 01fdeba..a4084cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -62,6 +62,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -420,6 +422,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
private HFileContext fileContext;
private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
+ private String fileStoragePolicy;
public Builder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
@@ -501,6 +504,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
return this;
}
+ public Builder withFileStoragePolicy(String fileStoragePolicy) {
+ this.fileStoragePolicy = fileStoragePolicy;
+ return this;
+ }
+
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@@ -530,6 +538,20 @@ public class StoreFileWriter implements CellSink, ShipperListener {
CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);
if (filePath == null) {
+ // The stored file and its blocks will use the directory-based storage policy,
+ // because HDFS DistributedFileSystem does not support creating files with a storage policy
+ // before version 3.3.0 (see HDFS-13209). A child dir is used here so that stored files
+ // satisfy the specific storage policy when written, which avoids later data movement.
+ // We don't want to change the whole temp dir to 'fileStoragePolicy'.
+ if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
+ dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
+ if (!fs.exists(dir)) {
+ HRegionFileSystem.mkdirs(fs, conf, dir);
+ LOG.info(
+ "Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
+ }
+ CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
+ }
filePath = getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
bloomType = BloomType.NONE;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index a8ffc2e..f2816d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -53,6 +53,12 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
public StoreFileWriter createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}
+
+ @Override
+ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
+ throws IOException {
+ return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy);
+ }
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index f9158c5..8ad1d3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -90,6 +90,20 @@ public class CompactionConfiguration {
private static final Class<? extends CompactionWindowFactory>
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class;
+ public static final String DATE_TIERED_STORAGE_POLICY_ENABLE_KEY =
+ "hbase.hstore.compaction.date.tiered.storage.policy.enable";
+ public static final String DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY =
+ "hbase.hstore.compaction.date.tiered.hot.window.age.millis";
+ public static final String DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY =
+ "hbase.hstore.compaction.date.tiered.hot.window.storage.policy";
+ public static final String DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY =
+ "hbase.hstore.compaction.date.tiered.warm.window.age.millis";
+ public static final String DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY =
+ "hbase.hstore.compaction.date.tiered.warm.window.storage.policy";
+ /** Windows older than warm age belong to COLD_WINDOW **/
+ public static final String DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY =
+ "hbase.hstore.compaction.date.tiered.cold.window.storage.policy";
+
Configuration conf;
StoreConfigInformation storeConfigInfo;
@@ -111,6 +125,12 @@ public class CompactionConfiguration {
private final String compactionPolicyForDateTieredWindow;
private final boolean dateTieredSingleOutputForMinorCompaction;
private final String dateTieredCompactionWindowFactory;
+ private final boolean dateTieredStoragePolicyEnable;
+ private long hotWindowAgeMillis;
+ private long warmWindowAgeMillis;
+ private String hotWindowStoragePolicy;
+ private String warmWindowStoragePolicy;
+ private String coldWindowStoragePolicy;
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
@@ -145,6 +165,13 @@ public class CompactionConfiguration {
this.dateTieredCompactionWindowFactory = conf.get(
DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY,
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName());
+ // for Heterogeneous Storage
+ dateTieredStoragePolicyEnable = conf.getBoolean(DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, false);
+ hotWindowAgeMillis = conf.getLong(DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 86400000L);
+ hotWindowStoragePolicy = conf.get(DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, "ALL_SSD");
+ warmWindowAgeMillis = conf.getLong(DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 604800000L);
+ warmWindowStoragePolicy = conf.get(DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, "ONE_SSD");
+ coldWindowStoragePolicy = conf.get(DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, "HOT");
LOG.info(toString());
}
@@ -291,4 +318,28 @@ public class CompactionConfiguration {
public String getDateTieredCompactionWindowFactory() {
return dateTieredCompactionWindowFactory;
}
+
+ public boolean isDateTieredStoragePolicyEnable() {
+ return dateTieredStoragePolicyEnable;
+ }
+
+ public long getHotWindowAgeMillis() {
+ return hotWindowAgeMillis;
+ }
+
+ public long getWarmWindowAgeMillis() {
+ return warmWindowAgeMillis;
+ }
+
+ public String getHotWindowStoragePolicy() {
+ return hotWindowStoragePolicy.trim().toUpperCase();
+ }
+
+ public String getWarmWindowStoragePolicy() {
+ return warmWindowStoragePolicy.trim().toUpperCase();
+ }
+
+ public String getColdWindowStoragePolicy() {
+ return coldWindowStoragePolicy.trim().toUpperCase();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 10fac55..be0a98e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -268,7 +268,15 @@ public abstract class Compactor<T extends CellSink> {
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store
.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
- fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize);
+ fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
+ HConstants.EMPTY_STRING);
+ }
+
+ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind,
+ String fileStoragePolicy) throws IOException {
+ return store
+ .createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, fd.maxMVCCReadpoint > 0,
+ fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy);
}
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
index 58969bf..1cc7dda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
@@ -198,8 +200,10 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
long now = EnvironmentEdgeManager.currentTime();
+ List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
+ Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
return new DateTieredCompactionRequest(candidateSelection,
- this.getCompactBoundariesForMajor(candidateSelection, now));
+ boundaries, boundariesPolicies);
}
/**
@@ -253,7 +257,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
LOG.debug("Processing files: " + fileList + " for window: " + window);
}
DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
- mayUseOffPeak, mayBeStuck, minThreshold);
+ mayUseOffPeak, mayBeStuck, minThreshold, now);
if (request != null) {
return request;
}
@@ -265,8 +269,8 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
}
private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
- CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold)
- throws IOException {
+ CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold,
+ long now) throws IOException {
// The files has to be in ascending order for ratio-based compaction to work right
// and removeExcessFile to exclude youngest files.
Collections.reverse(storeFiles);
@@ -281,8 +285,11 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
comConf.useDateTieredSingleOutputForMinorCompaction();
List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
+ // we want to generate policy to boundaries for minor compaction
+ Map<Long, String> boundaryPolicyMap =
+ getBoundariesStoragePolicyForMinor(singleOutput, window, now);
DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
- boundaries);
+ boundaries, boundaryPolicyMap);
return result;
}
return null;
@@ -334,4 +341,39 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
return Long.MIN_VALUE;
}
}
+
+ private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
+ CompactionWindow window, long now) {
+ Map<Long, String> boundariesPolicy = new HashMap<>();
+ if (!comConf.isDateTieredStoragePolicyEnable()) {
+ return boundariesPolicy;
+ }
+ String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
+ if (singleOutput) {
+ boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
+ } else {
+ boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
+ }
+ return boundariesPolicy;
+ }
+
+ private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
+ Map<Long, String> boundariesPolicy = new HashMap<>();
+ if (!comConf.isDateTieredStoragePolicyEnable()) {
+ return boundariesPolicy;
+ }
+ for (Long startTs : boundaries) {
+ boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
+ }
+ return boundariesPolicy;
+ }
+
+ private String getWindowStoragePolicy(long now, long windowStartMillis) {
+ if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
+ return comConf.getHotWindowStoragePolicy();
+ } else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
+ return comConf.getWarmWindowStoragePolicy();
+ }
+ return comConf.getColdWindowStoragePolicy();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
index 37b7059..ddf9a0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionRequest.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.yetus.audience.InterfaceAudience;
@@ -28,18 +29,27 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class DateTieredCompactionRequest extends CompactionRequestImpl {
private List<Long> boundaries;
+ /** window start boundary to window storage policy map **/
+ private Map<Long, String> boundariesPolicies;
- public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList) {
+ public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList,
+ Map<Long, String> boundaryPolicyMap) {
super(files);
boundaries = boundaryList;
+ boundariesPolicies = boundaryPolicyMap;
}
public List<Long> getBoundaries() {
return boundaries;
}
+ public Map<Long, String> getBoundariesPolicies() {
+ return boundariesPolicies;
+ }
+
@Override
public String toString() {
- return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray());
+ return super.toString() + " boundaries=" + Arrays.toString(boundaries.toArray())
+ + " boundariesPolicies="+boundariesPolicies.toString();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index 1bf5236..ef64df1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
@@ -55,6 +56,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
}
public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
+ final Map<Long, String> lowerBoundariesPolicies,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + lowerBoundaries.size()
@@ -68,6 +70,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
boolean shouldDropBehind) throws IOException {
DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
+ lowerBoundariesPolicies,
needEmptyFile(request));
initMultiWriter(writer, scanner, fd, shouldDropBehind);
return writer;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java
index 538294e..15a59ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/AbstractTestDateTieredCompactionPolicy.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
@@ -57,12 +58,21 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy
protected void compactEquals(long now, ArrayList<HStoreFile> candidates, long[] expectedFileSizes,
long[] expectedBoundaries, boolean isMajor, boolean toCompact) throws IOException {
+ DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact);
+ List<HStoreFile> actual = Lists.newArrayList(request.getFiles());
+ assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
+ assertEquals(Arrays.toString(expectedBoundaries),
+ Arrays.toString(request.getBoundaries().toArray()));
+ }
+
+ private DateTieredCompactionRequest getRequest(long now, ArrayList<HStoreFile> candidates,
+ boolean isMajor, boolean toCompact) throws IOException {
ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(timeMachine);
timeMachine.setValue(now);
DateTieredCompactionRequest request;
DateTieredCompactionPolicy policy =
- (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy();
+ (DateTieredCompactionPolicy) store.storeEngine.getCompactionPolicy();
if (isMajor) {
for (HStoreFile file : candidates) {
((MockHStoreFile) file).setIsMajor(true);
@@ -72,11 +82,18 @@ public class AbstractTestDateTieredCompactionPolicy extends TestCompactionPolicy
} else {
assertEquals(toCompact, policy.needsCompaction(candidates, ImmutableList.of()));
request =
- (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false);
+ (DateTieredCompactionRequest) policy.selectMinorCompaction(candidates, false, false);
+ }
+ return request;
+ }
+
+ protected void compactEqualsStoragePolicy(long now, ArrayList<HStoreFile> candidates,
+ Map<Long, String> expectedBoundariesPolicies, boolean isMajor, boolean toCompact)
+ throws IOException {
+ DateTieredCompactionRequest request = getRequest(now, candidates, isMajor, toCompact);
+ Map<Long, String> boundariesPolicies = request.getBoundariesPolicies();
+ for (Map.Entry<Long, String> entry : expectedBoundariesPolicies.entrySet()) {
+ assertEquals(entry.getValue(), boundariesPolicies.get(entry.getKey()));
}
- List<HStoreFile> actual = Lists.newArrayList(request.getFiles());
- assertEquals(Arrays.toString(expectedFileSizes), Arrays.toString(getSizes(actual)));
- assertEquals(Arrays.toString(expectedBoundaries),
- Arrays.toString(request.getBoundaries().toArray()));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java
new file mode 100644
index 0000000..74210e6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDateTieredCompactionPolicyHeterogeneousStorage.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.ExponentialCompactionWindowFactory;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestDateTieredCompactionPolicyHeterogeneousStorage
+ extends AbstractTestDateTieredCompactionPolicy {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestDateTieredCompactionPolicyHeterogeneousStorage.class);
+ public static final String HOT_WINDOW_SP = "ALL_SSD";
+ public static final String WARM_WINDOW_SP = "ONE_SSD";
+ public static final String COLD_WINDOW_SP = "HOT";
+
+ @Override
+ protected void config() {
+ super.config();
+
+ // Set up policy
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY,
+ "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine");
+ conf.setLong(CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY, 100);
+ conf.setLong(CompactionConfiguration.DATE_TIERED_INCOMING_WINDOW_MIN_KEY, 3);
+ conf.setLong(ExponentialCompactionWindowFactory.BASE_WINDOW_MILLIS_KEY, 6);
+ conf.setInt(ExponentialCompactionWindowFactory.WINDOWS_PER_TIER_KEY, 4);
+ conf.setBoolean(CompactionConfiguration.DATE_TIERED_SINGLE_OUTPUT_FOR_MINOR_COMPACTION_KEY,
+ false);
+
+ // Special settings for compaction policy per window
+ this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
+ this.conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 12);
+ this.conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F);
+
+ conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20);
+ conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 5);
+
+ // Set the storage policy for each window type (hot, warm, cold)
+ conf.setBoolean(CompactionConfiguration.DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, true);
+ conf.setLong(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 6);
+ conf.set(CompactionConfiguration.DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, HOT_WINDOW_SP);
+ conf.setLong(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 12);
+ conf.set(CompactionConfiguration.DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, WARM_WINDOW_SP);
+ conf.set(CompactionConfiguration.DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, COLD_WINDOW_SP);
+ }
+
+ /**
+ * Test for minor compaction of the incoming window.
+ * The incoming window's start ts >= now - hot age, so it is a HOT window and uses HOT_WINDOW_SP.
+ * @throws IOException with error
+ */
+ @Test
+ public void testIncomingWindowHot() throws IOException {
+ long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+ long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11, 12, 13 };
+ Map<Long, String> expected = new HashMap<>();
+ // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 12 }
+ // test whether DateTieredCompactionRequest boundariesPolicies matches expected
+ expected.put(12L, HOT_WINDOW_SP);
+ compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
+ expected, false, true);
+ }
+
+ /**
+ * Test for a non-incoming window.
+ * now - hot age > window start >= now - warm age,
+ * so this window is a WARM window and uses WARM_WINDOW_SP.
+ * @throws IOException with error
+ */
+ @Test
+ public void testNotIncomingWindowWarm() throws IOException {
+ long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
+ long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
+ Map<Long, String> expected = new HashMap<>();
+ // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
+ expected.put(6L, WARM_WINDOW_SP);
+ compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
+ expected, false, true);
+ }
+
+ /**
+ * Test for a non-incoming window.
+ * This window's start ts >= now - hot age,
+ * so this non-incoming window is a HOT window and uses HOT_WINDOW_SP.
+ * @throws IOException with error
+ */
+ @Test
+ public void testNotIncomingWindowAndIsHot() throws IOException {
+ long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
+ long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10, 11 };
+ Map<Long, String> expected = new HashMap<>();
+ // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
+ expected.put(6L, HOT_WINDOW_SP);
+ compactEqualsStoragePolicy(12, sfCreate(minTimestamps, maxTimestamps, sizes),
+ expected, false, true);
+ }
+
+ /**
+ * Test for a non-incoming window.
+ * The window's start timestamp < now - warm age, so it is a COLD window and uses COLD_WINDOW_SP.
+ * @throws IOException with error
+ */
+ @Test
+ public void testColdWindow() throws IOException {
+ long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+ long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 23, 24, 25, 10 };
+ Map<Long, String> expected = new HashMap<>();
+ // expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE, 6 }
+ expected.put(6L, COLD_WINDOW_SP);
+ compactEqualsStoragePolicy(22, sfCreate(minTimestamps, maxTimestamps, sizes),
+ expected, false, true);
+ }
+
+ /**
+ * Test for a non-incoming window where not all hfiles are selected for compaction.
+ * Exploring logic is applied to the non-incoming window, leaving more than one hfile in it.
+ * The minor compaction therefore uses a single output, so boundaries contains only Long.MIN_VALUE.
+ * @throws IOException with error
+ */
+ @Test
+ public void testRatioT0() throws IOException {
+ long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ long[] maxTimestamps = new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
+ long[] sizes = new long[] { 30, 31, 32, 33, 34, 20, 21, 22, 280, 23, 24, 1 };
+ Map<Long, String> expected = new HashMap<>();
+ // window start = 6, expected DateTieredCompactionRequest boundaries = { Long.MIN_VALUE }
+ expected.put(Long.MIN_VALUE, WARM_WINDOW_SP);
+ compactEqualsStoragePolicy(16, sfCreate(minTimestamps, maxTimestamps, sizes),
+ expected, false, true);
+ }
+
+ /**
+ * Test for major compaction. It compacts all files and creates multiple output files,
+ * each with the storage policy of its window.
+ * @throws IOException with error
+ */
+ @Test
+ public void testMajorCompation() throws IOException {
+ long[] minTimestamps = new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ long[] maxTimestamps = new long[] { 44, 60, 61, 96, 100, 104, 105, 106, 113, 145, 157 };
+ long[] sizes = new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 };
+ Map<Long, String> expected = new HashMap<>();
+ expected.put(Long.MIN_VALUE, COLD_WINDOW_SP);
+ expected.put(24L, COLD_WINDOW_SP);
+ expected.put(48L, COLD_WINDOW_SP);
+ expected.put(72L, COLD_WINDOW_SP);
+ expected.put(96L, COLD_WINDOW_SP);
+ expected.put(120L, COLD_WINDOW_SP);
+ expected.put(144L, COLD_WINDOW_SP);
+ expected.put(150L, WARM_WINDOW_SP);
+ expected.put(156L, HOT_WINDOW_SP);
+ compactEquals(161, sfCreate(minTimestamps, maxTimestamps, sizes),
+ new long[] { 0, 50, 51, 40, 41, 42, 33, 30, 31, 2, 1 },
+ new long[] { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 }, true, true);
+ compactEqualsStoragePolicy(161, sfCreate(minTimestamps, maxTimestamps, sizes),
+ expected, true, true);
+ }
+}
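
The hot/warm/cold rule that the test comments above describe can be sketched in isolation. This is only an illustration inferred from those comments and the expected values in the tests, not the code in DateTieredCompactionPolicy; the class name WindowTierSketch and the method policyFor are hypothetical, and the ages (6 and 12) and policy names are copied from the test configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowTierSketch {
  // Hypothetical constants mirroring the test configuration above.
  static final long HOT_AGE = 6;   // DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY in the test
  static final long WARM_AGE = 12; // DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY in the test

  /** Classifies a window by its start timestamp, following the inequalities in the test comments. */
  static String policyFor(long windowStart, long now) {
    if (windowStart >= now - HOT_AGE) {
      return "ALL_SSD"; // HOT_WINDOW_SP
    }
    if (windowStart >= now - WARM_AGE) {
      return "ONE_SSD"; // WARM_WINDOW_SP
    }
    return "HOT"; // COLD_WINDOW_SP
  }

  public static void main(String[] args) {
    // Boundaries and "now" taken from the major compaction test case.
    long now = 161;
    long[] boundaries = { Long.MIN_VALUE, 24, 48, 72, 96, 120, 144, 150, 156 };
    Map<Long, String> policies = new LinkedHashMap<>();
    for (long boundary : boundaries) {
      policies.put(boundary, policyFor(boundary, now));
    }
    System.out.println(policies);
  }
}
```

With now = 161 the thresholds are 155 (hot) and 149 (warm), so under this sketch boundary 156 is hot, 150 is warm, and everything from 144 down is cold, which matches the expected map in the major compaction test. The single-output minor compaction case is not covered here: in that test the key is Long.MIN_VALUE but the expected policy is that of the window starting at 6.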
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index 812ee4b..92ba76d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -21,15 +21,17 @@ import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.cre
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
@@ -109,7 +111,7 @@ public class TestDateTieredCompactor {
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
+ anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
@@ -138,7 +140,8 @@ public class TestDateTieredCompactor {
HStoreFile sf2 = createDummyStoreFile(2L);
DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
- boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
+ boundaries.subList(0, boundaries.size() - 1), new HashMap<Long, String>(),
+ NoLimitThroughputController.INSTANCE, null);
writers.verifyKvs(output, allFiles, boundaries);
if (allFiles) {
assertEquals(output.length, paths.size());
@@ -167,7 +170,7 @@ public class TestDateTieredCompactor {
DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
new ArrayList<>(request.getFiles()));
List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
- NoLimitThroughputController.INSTANCE, null);
+ new HashMap<Long, String>(), NoLimitThroughputController.INSTANCE, null);
assertEquals(1, paths.size());
List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
assertEquals(1, dummyWriters.size());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 3d221a1..e2bd257 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -23,13 +23,14 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalMatchers.aryEq;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
@@ -796,7 +797,7 @@ public class TestStripeCompactionPolicy {
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(
store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
+ anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index 0221274..6e8b19f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -209,7 +210,7 @@ public class TestStripeCompactor {
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
- anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenAnswer(writers);
+ anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
return new StripeCompactor(conf, store) {