You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/02/06 15:17:28 UTC
[2/2] hbase git commit: HBASE-11861 Native MOB Compaction mechanisms
(Jingcheng Du)
HBASE-11861 Native MOB Compaction mechanisms (Jingcheng Du)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2c4934ed
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2c4934ed
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2c4934ed
Branch: refs/heads/hbase-11339
Commit: 2c4934eda68e8ed1290c2e3fb50604c2d77bdf64
Parents: fbbb324
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Fri Feb 6 05:37:13 2015 -0800
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Feb 6 05:37:13 2015 -0800
----------------------------------------------------------------------
.../src/main/resources/hbase-default.xml | 51 ++
.../org/apache/hadoop/hbase/master/HMaster.java | 6 +
.../hbase/master/MobFileCompactionChore.java | 162 +++++
.../apache/hadoop/hbase/mob/MobConstants.java | 43 ++
.../org/apache/hadoop/hbase/mob/MobUtils.java | 118 +++-
.../MobFileCompactionRequest.java | 64 ++
.../mob/filecompactions/MobFileCompactor.java | 78 +++
.../PartitionedMobFileCompactionRequest.java | 146 +++++
.../PartitionedMobFileCompactor.java | 631 ++++++++++++++++++
.../hadoop/hbase/regionserver/HStore.java | 2 +-
.../hadoop/hbase/regionserver/StoreScanner.java | 4 +-
.../filecompactions/TestMobFileCompactor.java | 652 +++++++++++++++++++
...TestPartitionedMobFileCompactionRequest.java | 60 ++
.../TestPartitionedMobFileCompactor.java | 423 ++++++++++++
14 files changed, 2426 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 647defd..d1429ad 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1517,4 +1517,55 @@ possible configurations would overwhelm and obscure the important.
The default value is one day.
</description>
</property>
+ <property>
+ <name>hbase.mob.file.compaction.mergeable.threshold</name>
+ <value>201326592</value>
+ <description>
+ If the size of a mob file is less than this value, it's regarded as a small
+ file and needs to be merged in mob file compaction. The default value is 192MB.
+ </description>
+ </property>
+ <property>
+ <name>hbase.mob.delfile.max.count</name>
+ <value>3</value>
+ <description>
+ The max number of del files that are allowed in the mob file compaction.
+ In the mob file compaction, when the number of existing del files is larger than
+ this value, they are merged until the number of del files is no larger than this value.
+ The default value is 3.
+ </description>
+ </property>
+ <property>
+ <name>hbase.mob.file.compaction.batch.size</name>
+ <value>100</value>
+ <description>
+ The max number of mob files that are allowed in a batch of the mob file compaction.
+ The mob file compaction merges small mob files into bigger ones. If the number of
+ small files is very large, the merge could fail with a "too many open files" error.
+ And the merge has to be split into batches. This value limits the number of mob files
+ that are selected in a batch of the mob file compaction. The default value is 100.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.mob.file.compaction.chore.period</name>
+ <value>604800000</value>
+ <description>
+ The period that MobFileCompactionChore runs. The unit is millisecond.
+ The default value is one week.
+ </description>
+ </property>
+ <property>
+ <name>hbase.mob.file.compactor.class</name>
+ <value>org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor</value>
+ <description>
+ Implementation of mob file compactor, the default one is PartitionedMobFileCompactor.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.mob.file.compaction.chore.threads.max</name>
+ <value>1</value>
+ <description>
+ The max number of threads used in MobFileCompactionChore.
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4ff3592..7ad49a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -209,6 +209,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
+ private MobFileCompactionChore mobFileCompactChore;
MasterCoprocessorHost cpHost;
@@ -613,6 +614,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
Threads.setDaemonThreadRunning(expiredMobFileCleanerChore.getThread());
+ this.mobFileCompactChore = new MobFileCompactionChore(this);
+ Threads.setDaemonThreadRunning(mobFileCompactChore.getThread());
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
@@ -863,6 +866,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (this.expiredMobFileCleanerChore != null) {
this.expiredMobFileCleanerChore.interrupt();
}
+ if (this.mobFileCompactChore != null) {
+ this.mobFileCompactChore.interrupt();
+ }
if (this.balancerChore != null) {
this.balancerChore.interrupt();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
new file mode 100644
index 0000000..9973619
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MobFileCompactionChore.java
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactor;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * The MobFileCompactionChore periodically runs a mob file compaction to merge small mob files.
+ * <p>
+ * On every run it walks all table descriptors and, for each mob-enabled column family,
+ * instantiates the compactor named by {@link MobConstants#MOB_FILE_COMPACTOR_CLASS_KEY}
+ * (defaulting to {@link PartitionedMobFileCompactor}) and runs it while holding the table's mob
+ * write lock (when a lock manager is available).
+ */
+@InterfaceAudience.Private
+public class MobFileCompactionChore extends Chore {
+
+  private static final Log LOG = LogFactory.getLog(MobFileCompactionChore.class);
+  private final HMaster master;
+  /** May be null in testing; compactions then run without a table lock. */
+  private final TableLockManager tableLockManager;
+  /** Pool shared by all compactors created by this chore; shut down in {@link #cleanup()}. */
+  private final ExecutorService pool;
+
+  public MobFileCompactionChore(HMaster master) {
+    super(master.getServerName() + "-MobFileCompactChore", master.getConfiguration().getInt(
+      MobConstants.MOB_FILE_COMPACTION_CHORE_PERIOD,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD), master);
+    this.master = master;
+    this.tableLockManager = master.getTableLockManager();
+    // master must be assigned before this call, createThreadPool() reads its configuration.
+    this.pool = createThreadPool();
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      String className = master.getConfiguration().get(MobConstants.MOB_FILE_COMPACTOR_CLASS_KEY,
+        PartitionedMobFileCompactor.class.getName());
+      TableDescriptors htds = master.getTableDescriptors();
+      Map<String, HTableDescriptor> map = htds.getAll();
+      for (HTableDescriptor htd : map.values()) {
+        for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+          // compact only for mob-enabled column.
+          if (!hcd.isMobEnabled()) {
+            continue;
+          }
+          // instantiate the mob file compactor.
+          MobFileCompactor compactor = null;
+          try {
+            compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+              Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
+              ExecutorService.class },
+              new Object[] { master.getConfiguration(), master.getFileSystem(),
+                htd.getTableName(), hcd, pool });
+          } catch (Exception e) {
+            throw new IOException("Unable to load configured mob file compactor '" + className
+              + "'", e);
+          }
+          // obtain a write table lock before performing compaction to avoid race condition
+          // with major compaction in mob-enabled column.
+          boolean tableLocked = false;
+          TableLock lock = null;
+          try {
+            // the tableLockManager might be null in testing. In that case, it is lock-free.
+            if (tableLockManager != null) {
+              lock = tableLockManager.writeLock(MobUtils.getTableLockName(htd.getTableName()),
+                "Run MobFileCompactChore");
+              lock.acquire();
+              tableLocked = true;
+            }
+            compactor.compact();
+          } catch (Exception e) {
+            LOG.error("Fail to compact the mob files for the column " + hcd.getNameAsString()
+              + " in the table " + htd.getNameAsString(), e);
+          } finally {
+            // release only a lock that was actually acquired.
+            if (tableLocked) {
+              try {
+                lock.release();
+              } catch (IOException e) {
+                LOG.error(
+                  "Fail to release the write lock for the table " + htd.getNameAsString(), e);
+              }
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Fail to compact the mob files", e);
+    }
+  }
+
+  @Override
+  protected void cleanup() {
+    super.cleanup();
+    pool.shutdown();
+  }
+
+  /**
+   * Creates the thread pool shared by the compactors.
+   * <p>
+   * The pool has one core thread that may time out, at most
+   * {@link MobConstants#MOB_FILE_COMPACTION_CHORE_THREADS_MAX} threads, and a hand-off queue.
+   * When all threads are busy, the submitter blocks until a thread picks up the task instead
+   * of the task being rejected.
+   * @return A thread pool.
+   */
+  private ExecutorService createThreadPool() {
+    Configuration conf = master.getConfiguration();
+    int maxThreads = conf.getInt(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_MAX,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX);
+    // ThreadPoolExecutor rejects a maximum pool size that is non-positive or below the core
+    // size (1), so clamp such settings instead of failing the master at startup.
+    if (maxThreads <= 0) {
+      maxThreads = 1;
+    }
+    long keepAliveTime = conf.getLong(MobConstants.MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME);
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+      new RejectedExecutionHandler() {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+          try {
+            // waiting for a thread to pick up instead of throwing exceptions.
+            queue.put(r);
+          } catch (InterruptedException e) {
+            // keep the interrupt status visible to the submitting thread.
+            Thread.currentThread().interrupt();
+            throw new RejectedExecutionException(e);
+          }
+        }
+      });
+    threadPool.allowCoreThreadTimeOut(true);
+    return threadPool;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
index f40c952..0c9cda8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobConstants.java
@@ -72,8 +72,51 @@ public class MobConstants {
public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
public final static String TEMP_DIR_NAME = ".tmp";
+ public final static String BULKLOAD_DIR_NAME = ".bulkload";
public final static byte[] MOB_TABLE_LOCK_SUFFIX = Bytes.toBytes(".mobLock");
public final static String EMPTY_STRING = "";
+ /**
+ * If the size of a mob file is less than this value, it's regarded as a small file and needs to
+ * be merged in mob file compaction. The default value is 192MB.
+ */
+ public static final String MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD =
+ "hbase.mob.file.compaction.mergeable.threshold";
+ public static final long DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD = 192 * 1024 * 1024;
+ /**
+ * The max number of del files that is allowed in the mob file compaction. In the mob file
+ * compaction, when the number of existing del files is larger than this value, they are merged
+ * until the number of del files is no larger than this value. The default value is 3.
+ */
+ public static final String MOB_DELFILE_MAX_COUNT = "hbase.mob.delfile.max.count";
+ public static final int DEFAULT_MOB_DELFILE_MAX_COUNT = 3;
+ /**
+ * The max number of mob files that are allowed in a batch of the mob file compaction.
+ * The mob file compaction merges small mob files into bigger ones. If the number of
+ * small files is very large, the merge could fail with a "too many open files" error.
+ * And the merge has to be split into batches. This value limits the number of mob files
+ * that are selected in a batch of the mob file compaction. The default value is 100.
+ */
+ public static final String MOB_FILE_COMPACTION_BATCH_SIZE =
+ "hbase.mob.file.compaction.batch.size";
+ public static final int DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE = 100;
+ /**
+ * The period that MobFileCompactionChore runs. The unit is millisecond.
+ * The default value is one week.
+ */
+ public static final String MOB_FILE_COMPACTION_CHORE_PERIOD =
+ "hbase.master.mob.file.compaction.chore.period";
+ public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_PERIOD =
+ 24 * 60 * 60 * 1000 * 7; // a week
+ public static final String MOB_FILE_COMPACTOR_CLASS_KEY = "hbase.mob.file.compactor.class";
+ /**
+ * The max number of threads used in MobFileCompactionChore.
+ */
+ public static final String MOB_FILE_COMPACTION_CHORE_THREADS_MAX =
+ "hbase.master.mob.file.compaction.chore.threads.max";
+ public static final int DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_MAX = 1;
+ public static final String MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME =
+ "hbase.master.mob.file.compaction.chore.threads.keepalivetime";
+ public static final long DEFAULT_MOB_FILE_COMPACTION_CHORE_THREADS_KEEPALIVETIME = 60;
private MobConstants() {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 43521d2..d8b1376 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -416,7 +417,7 @@ public class MobUtils {
}
/**
- * Creates a directory of mob files for flushing.
+ * Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
@@ -435,17 +436,110 @@ public class MobUtils {
throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
.replaceAll("-", ""));
+ return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
+ cacheConfig);
+ }
+
+  /**
+   * Creates a writer for a ref file in temp directory.
+   * <p>
+   * Unlike the mob file writers, this writer takes its settings from the column family: the
+   * compaction compression, tag compression, block size, data block encoding and bloom filter
+   * type. It includes MVCC and uses the checksum type and bytes-per-checksum returned by
+   * {@code HStore.getChecksumType(conf)} and {@code HStore.getBytesPerChecksum(conf)}.
+   * The file is named by a random UUID with its dashes removed.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param basePath The basic path for a temp directory.
+   * @param maxKeyCount The key count.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the ref file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
+    HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig)
+    throws IOException {
+    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
+      .withIncludesTags(true).withCompression(family.getCompactionCompression())
+      .withCompressTags(family.shouldCompressTags())
+      .withChecksumType(HStore.getChecksumType(conf))
+      .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+      .withDataBlockEncoding(family.getDataBlockEncoding()).build();
+    Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
+    return new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
+      .withComparator(KeyValue.COMPARATOR).withBloomType(family.getBloomFilterType())
+      .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+  }
+
+  /**
+   * Creates a writer for a new mob file in temp directory.
+   * <p>
+   * The mob file name is built from the start key, the date and a random UUID whose dashes
+   * are removed.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string that is encoded into the mob file name.
+   * @param basePath The temp directory the file is created in.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key that is encoded into the mob file name.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the mob file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+    HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+    Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
+    throws IOException {
+    String uuid = UUID.randomUUID().toString().replaceAll("-", "");
+    return createWriter(conf, fs, family, MobFileName.create(startKey, date, uuid), basePath,
+      maxKeyCount, compression, cacheConfig);
+  }
+
+  /**
+   * Creates a writer for a new del file in temp directory.
+   * <p>
+   * A del file is named like a mob file (start key, date, random UUID without dashes), except
+   * that its UUID part ends with the {@code "_del"} suffix.
+   * @param conf The current configuration.
+   * @param fs The current file system.
+   * @param family The descriptor of the current column family.
+   * @param date The date string that is encoded into the del file name.
+   * @param basePath The temp directory the file is created in.
+   * @param maxKeyCount The key count.
+   * @param compression The compression algorithm.
+   * @param startKey The start key that is encoded into the del file name.
+   * @param cacheConfig The current cache config.
+   * @return The writer for the del file.
+   * @throws IOException
+   */
+  public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
+    HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
+    Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
+    throws IOException {
+    String randomPart = UUID.randomUUID().toString().replaceAll("-", "");
+    MobFileName delFileName = MobFileName.create(startKey, date, randomPart + "_del");
+    return createWriter(conf, fs, family, delFileName, basePath, maxKeyCount, compression,
+      cacheConfig);
+  }
+
+ /**
+ * Creates a writer for a mob file or a del file with the given name in temp directory.
+ * <p>
+ * The writer uses the default HFile checksum settings, excludes MVCC, includes tags and
+ * writes no bloom filter; the block size and data block encoding come from the column family.
+ * @param conf The current configuration.
+ * @param fs The current file system.
+ * @param family The descriptor of the current column family.
+ * @param mobFileName The name of the file to create; its file name is resolved under basePath.
+ * @param basePath The basic path for a temp directory.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param cacheConfig The current cache config.
+ * @return The writer for the file.
+ * @throws IOException
+ */
+ private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
+ HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
+ Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException {
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
- .withIncludesMvcc(false).withIncludesTags(true)
- .withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
- .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
- .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
- .withDataBlockEncoding(family.getDataBlockEncoding()).build();
+ .withIncludesMvcc(false).withIncludesTags(true).withChecksumType(HFile.DEFAULT_CHECKSUM_TYPE)
+ .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
+ .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
- .withFilePath(new Path(basePath, mobFileName.getFileName()))
- .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
- .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+ .withFilePath(new Path(basePath, mobFileName.getFileName()))
+ .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
+ .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
}
@@ -456,12 +550,13 @@ public class MobUtils {
* @param path The path where the mob file is saved.
* @param targetPath The directory path where the source file is renamed to.
* @param cacheConfig The current cache config.
+ * @return The target file path the source file is renamed to.
* @throws IOException
*/
- public static void commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
+ public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
Path targetPath, CacheConfig cacheConfig) throws IOException {
if (sourceFile == null) {
- return;
+ return null;
}
Path dstPath = new Path(targetPath, sourceFile.getName());
validateMobFile(conf, fs, sourceFile, cacheConfig);
@@ -474,6 +569,7 @@ public class MobUtils {
if (!fs.rename(sourceFile, dstPath)) {
throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
}
+ return dstPath;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
new file mode 100644
index 0000000..375ba8c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactionRequest.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * The compaction request for mob files.
+ * <p>
+ * Records when the candidate files were selected and whether all or only part of the mob
+ * files were selected.
+ */
+@InterfaceAudience.Private
+public abstract class MobFileCompactionRequest {
+
+  /** The time the candidate files were selected, assigned by subclasses. */
+  protected long selectionTime;
+  protected CompactionType type = CompactionType.PART_FILES;
+
+  /**
+   * Sets the compaction type.
+   * @param type The compaction type.
+   */
+  public void setCompactionType(CompactionType type) {
+    this.type = type;
+  }
+
+  /**
+   * Gets the selection time.
+   * @return The selection time.
+   */
+  public long getSelectionTime() {
+    return this.selectionTime;
+  }
+
+  /**
+   * Gets the compaction type.
+   * @return The compaction type.
+   */
+  public CompactionType getCompactionType() {
+    return type;
+  }
+
+  /**
+   * The scope of the files selected for a compaction. This enum is public because it appears
+   * in the public {@link #getCompactionType()} and {@link #setCompactionType(CompactionType)}.
+   */
+  public enum CompactionType {
+
+    /**
+     * Part of mob files are selected.
+     */
+    PART_FILES,
+
+    /**
+     * All of mob files are selected.
+     */
+    ALL_FILES;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
new file mode 100644
index 0000000..bbc358e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/MobFileCompactor.java
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * A mob file compactor to directly compact the mob files.
+ * <p>
+ * Subclasses implement {@link #compact(List)}; {@link #compact()} passes them every file
+ * listed directly under the mob family directory of the column family.
+ */
+@InterfaceAudience.Private
+public abstract class MobFileCompactor {
+
+  protected FileSystem fs;
+  protected Configuration conf;
+  protected TableName tableName;
+  protected HColumnDescriptor column;
+
+  protected Path mobTableDir;
+  protected Path mobFamilyDir;
+  protected ExecutorService pool;
+
+  /**
+   * @param conf The current configuration, also used to resolve the mob home directory.
+   * @param fs The file system the mob files live in.
+   * @param tableName The table whose mob files are compacted.
+   * @param column The column family whose mob files are compacted.
+   * @param pool The executor available to the compaction.
+   */
+  public MobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
+    HColumnDescriptor column, ExecutorService pool) {
+    this.conf = conf;
+    this.fs = fs;
+    this.tableName = tableName;
+    this.column = column;
+    this.pool = pool;
+    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tableName);
+    mobFamilyDir = MobUtils.getMobFamilyPath(conf, tableName, column.getNameAsString());
+  }
+
+  /**
+   * Compacts the mob files for the current column family.
+   * <p>
+   * When the mob family directory does not exist, there is nothing to compact and an empty
+   * list is returned. Without this check, {@code FileSystem.listStatus} would throw
+   * {@code FileNotFoundException}.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  public List<Path> compact() throws IOException {
+    if (!fs.exists(mobFamilyDir)) {
+      return Collections.emptyList();
+    }
+    return compact(Arrays.asList(fs.listStatus(mobFamilyDir)));
+  }
+
+  /**
+   * Compacts the candidate mob files.
+   * @param files The candidate mob files.
+   * @return The paths of new mob files generated in the compaction.
+   * @throws IOException
+   */
+  public abstract List<Path> compact(List<FileStatus> files) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
new file mode 100644
index 0000000..d2ac1db
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactionRequest.java
@@ -0,0 +1,146 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * The {@link MobFileCompactionRequest} built and consumed by
+ * {@link PartitionedMobFileCompactor}.
+ * Mob files whose names carry the same start key and date are grouped into one partition.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobFileCompactionRequest extends MobFileCompactionRequest {
+
+  protected Collection<FileStatus> delFiles;
+  protected Collection<CompactionPartition> compactionPartitions;
+
+  /**
+   * Creates a request whose selection time is the current time.
+   * @param compactionPartitions The partitions of mob files to compact.
+   * @param delFiles The del files to apply during the compaction.
+   */
+  public PartitionedMobFileCompactionRequest(Collection<CompactionPartition> compactionPartitions,
+      Collection<FileStatus> delFiles) {
+    this.selectionTime = EnvironmentEdgeManager.currentTime();
+    this.compactionPartitions = compactionPartitions;
+    this.delFiles = delFiles;
+  }
+
+  /**
+   * @return The compaction partitions held by this request.
+   */
+  public Collection<CompactionPartition> getCompactionPartitions() {
+    return compactionPartitions;
+  }
+
+  /**
+   * @return The del files held by this request.
+   */
+  public Collection<FileStatus> getDelFiles() {
+    return delFiles;
+  }
+
+  /**
+   * A group of mob files compacted together: all of them share the start key and the date
+   * encoded in their names.
+   */
+  protected static class CompactionPartition {
+    private final List<FileStatus> files = new ArrayList<FileStatus>();
+    private final CompactionPartitionId partitionId;
+
+    public CompactionPartition(CompactionPartitionId partitionId) {
+      this.partitionId = partitionId;
+    }
+
+    public CompactionPartitionId getPartitionId() {
+      return partitionId;
+    }
+
+    /** Appends a file to this partition. */
+    public void addFile(FileStatus file) {
+      files.add(file);
+    }
+
+    /** @return A read-only view of the files in this partition. */
+    public List<FileStatus> listFiles() {
+      return Collections.unmodifiableList(files);
+    }
+  }
+
+  /**
+   * Identifies a partition by the start key and date taken from a mob file name.
+   */
+  protected static class CompactionPartitionId {
+
+    private final String startKey;
+    private final String date;
+
+    /**
+     * @param startKey The start key part of the mob file name, must not be null.
+     * @param date The date part of the mob file name, must not be null.
+     * @throws IllegalArgumentException If either argument is null.
+     */
+    public CompactionPartitionId(String startKey, String date) {
+      boolean missing = startKey == null || date == null;
+      if (missing) {
+        throw new IllegalArgumentException("Neither of start key and date could be null");
+      }
+      this.startKey = startKey;
+      this.date = date;
+    }
+
+    public String getStartKey() {
+      return startKey;
+    }
+
+    public String getDate() {
+      return date;
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * (31 * 17 + startKey.hashCode()) + date.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof CompactionPartitionId) {
+        CompactionPartitionId other = (CompactionPartitionId) obj;
+        return startKey.equals(other.startKey) && date.equals(other.date);
+      }
+      return false;
+    }
+
+    /** @return The start key immediately followed by the date, with no separator. */
+    @Override
+    public String toString() {
+      return startKey + date;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
new file mode 100644
index 0000000..6cd3172
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/filecompactions/PartitionedMobFileCompactor.java
@@ -0,0 +1,631 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An implementation of {@link MobFileCompactor} that compacts the mob files in partitions.
+ */
+@InterfaceAudience.Private
+public class PartitionedMobFileCompactor extends MobFileCompactor {
+
+ private static final Log LOG = LogFactory.getLog(PartitionedMobFileCompactor.class);
+ protected long mergeableSize;
+ protected int delFileMaxCount;
+ /** The number of files compacted in a batch */
+ protected int compactionBatchSize;
+ protected int compactionKVMax;
+
+ private Path tempPath;
+ private Path bulkloadPath;
+ private CacheConfig compactionCacheConfig;
+ private Tag tableNameTag;
+
+ public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
+ HColumnDescriptor column, ExecutorService pool) {
+ super(conf, fs, tableName, column, pool);
+ mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+ MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+ delFileMaxCount = conf.getInt(MobConstants.MOB_DELFILE_MAX_COUNT,
+ MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+ // default is 100
+ compactionBatchSize = conf.getInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+ MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+ tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+ bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME,
+ tableName.getNameAsString()));
+ compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX,
+ HConstants.COMPACTION_KV_MAX_DEFAULT);
+ Configuration copyOfConf = new Configuration(conf);
+ copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
+ compactionCacheConfig = new CacheConfig(copyOfConf);
+ tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
+ }
+
+ @Override
+ public List<Path> compact(List<FileStatus> files) throws IOException {
+ if (files == null || files.isEmpty()) {
+ return null;
+ }
+ // find the files to compact.
+ PartitionedMobFileCompactionRequest request = select(files);
+ // compact the files.
+ return performCompaction(request);
+ }
+
+ /**
+ * Selects the compacted mob/del files.
+ * Iterates the candidates to find out all the del files and small mob files.
+ * @param candidates All the candidates.
+ * @return A compaction request.
+ * @throws IOException
+ */
+ protected PartitionedMobFileCompactionRequest select(List<FileStatus> candidates)
+ throws IOException {
+ Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
+ Map<CompactionPartitionId, CompactionPartition> filesToCompact =
+ new HashMap<CompactionPartitionId, CompactionPartition>();
+ int selectedFileCount = 0;
+ int irrelevantFileCount = 0;
+ for (FileStatus file : candidates) {
+ if (!file.isFile()) {
+ irrelevantFileCount++;
+ continue;
+ }
+ // group the del files and small files.
+ FileStatus linkedFile = file;
+ if (HFileLink.isHFileLink(file.getPath())) {
+ HFileLink link = new HFileLink(conf, file.getPath());
+ linkedFile = getLinkedFileStatus(link);
+ if (linkedFile == null) {
+ // If the linked file cannot be found, regard it as an irrelevantFileCount file
+ irrelevantFileCount++;
+ continue;
+ }
+ }
+ if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
+ allDelFiles.add(file);
+ } else if (linkedFile.getLen() < mergeableSize) {
+ // add the small files to the merge pool
+ MobFileName fileName = MobFileName.create(linkedFile.getPath().getName());
+ CompactionPartitionId id = new CompactionPartitionId(fileName.getStartKey(),
+ fileName.getDate());
+ CompactionPartition compactionPartition = filesToCompact.get(id);
+ if (compactionPartition == null) {
+ compactionPartition = new CompactionPartition(id);
+ compactionPartition.addFile(file);
+ filesToCompact.put(id, compactionPartition);
+ } else {
+ compactionPartition.addFile(file);
+ }
+ selectedFileCount++;
+ }
+ }
+ PartitionedMobFileCompactionRequest request = new PartitionedMobFileCompactionRequest(
+ filesToCompact.values(), allDelFiles);
+ if (candidates.size() == (allDelFiles.size() + selectedFileCount + irrelevantFileCount)) {
+ // all the files are selected
+ request.setCompactionType(CompactionType.ALL_FILES);
+ }
+ return request;
+ }
+
+ /**
+ * Performs the compaction on the selected files.
+ * <ol>
+ * <li>Compacts the del files.</li>
+ * <li>Compacts the selected small mob files and all the del files.</li>
+ * <li>If all the candidates are selected, delete the del files.</li>
+ * </ol>
+ * @param request The compaction request.
+ * @return The paths of new mob files generated in the compaction.
+ * @throws IOException
+ */
+ protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
+ throws IOException {
+ // merge the del files
+ List<Path> delFilePaths = new ArrayList<Path>();
+ for (FileStatus delFile : request.delFiles) {
+ delFilePaths.add(delFile.getPath());
+ }
+ List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+ List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+ for (Path newDelPath : newDelPaths) {
+ StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE);
+ newDelFiles.add(sf);
+ }
+ // compact the mob files by partitions.
+ List<Path> paths = compactMobFiles(request, newDelFiles);
+ // archive the del files if all the mob files are selected.
+ if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
+ try {
+ MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
+ } catch (IOException e) {
+ LOG.error("Failed to archive the del files " + newDelFiles, e);
+ }
+ }
+ return paths;
+ }
+
+ /**
+ * Compacts the selected small mob files and all the del files.
+ * @param request The compaction request.
+ * @param delFiles The del files.
+ * @return The paths of new mob files after compactions.
+ * @throws IOException
+ */
+ protected List<Path> compactMobFiles(final PartitionedMobFileCompactionRequest request,
+ final List<StoreFile> delFiles) throws IOException {
+ Collection<CompactionPartition> partitions = request.compactionPartitions;
+ if (partitions == null || partitions.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<Path> paths = new ArrayList<Path>();
+ final HTable table = new HTable(conf, tableName);
+ try {
+ Map<CompactionPartitionId, Future<List<Path>>> results =
+ new HashMap<CompactionPartitionId, Future<List<Path>>>();
+ // compact the mob files by partitions in parallel.
+ for (final CompactionPartition partition : partitions) {
+ results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
+ @Override
+ public List<Path> call() throws Exception {
+ return compactMobFilePartition(request, partition, delFiles, table);
+ }
+ }));
+ }
+ // compact the partitions in parallel.
+ boolean hasFailure = false;
+ for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
+ try {
+ paths.addAll(result.getValue().get());
+ } catch (Exception e) {
+ // just log the error
+ LOG.error("Failed to compact the partition " + result.getKey(), e);
+ hasFailure = true;
+ }
+ }
+ if (hasFailure) {
+ // if any partition fails in the compaction, directly throw an exception.
+ throw new IOException("Failed to compact the partitions");
+ }
+ } finally {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the HTable", e);
+ }
+ }
+ return paths;
+ }
+
+ /**
+ * Compacts a partition of selected small mob files and all the del files.
+ * @param request The compaction request.
+ * @param partition A compaction partition.
+ * @param delFiles The del files.
+ * @param table The current table.
+ * @return The paths of new mob files after compactions.
+ * @throws IOException
+ */
+ private List<Path> compactMobFilePartition(PartitionedMobFileCompactionRequest request,
+ CompactionPartition partition, List<StoreFile> delFiles, HTable table) throws IOException {
+ List<Path> newFiles = new ArrayList<Path>();
+ List<FileStatus> files = partition.listFiles();
+ int offset = 0;
+ Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
+ Path bulkloadColumnPath = new Path(bulkloadPathOfPartition, column.getNameAsString());
+ while (offset < files.size()) {
+ int batch = compactionBatchSize;
+ if (files.size() - offset < compactionBatchSize) {
+ batch = files.size() - offset;
+ }
+ if (batch == 1 && delFiles.isEmpty()) {
+ // only one file left and no del files, do not compact it,
+ // and directly add it to the new files.
+ newFiles.add(files.get(offset).getPath());
+ offset++;
+ continue;
+ }
+ // clean the bulkload directory to avoid loading old files.
+ fs.delete(bulkloadPathOfPartition, true);
+ // add the selected mob files and del files into filesToCompact
+ List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+ for (int i = offset; i < batch + offset; i++) {
+ StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
+ BloomType.NONE);
+ filesToCompact.add(sf);
+ }
+ filesToCompact.addAll(delFiles);
+ // compact the mob files in a batch.
+ compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+ bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
+ // move to the next batch.
+ offset += batch;
+ }
+ return newFiles;
+ }
+
+ /**
+ * Compacts a partition of selected small mob files and all the del files in a batch.
+ * @param request The compaction request.
+ * @param partition A compaction partition.
+ * @param table The current table.
+ * @param filesToCompact The files to be compacted.
+ * @param batch The number of mob files to be compacted in a batch.
+ * @param bulkloadPathOfPartition The directory where the bulkload column of the current
+ * partition is saved.
+ * @param bulkloadColumnPath The directory where the bulkload files of current partition
+ * are saved.
+ * @param newFiles The paths of new mob files after compactions.
+ * @throws IOException
+ */
+ private void compactMobFilesInBatch(PartitionedMobFileCompactionRequest request,
+ CompactionPartition partition, HTable table, List<StoreFile> filesToCompact, int batch,
+ Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+ throws IOException {
+ // open scanner to the selected mob files and del files.
+ StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
+ // the mob files to be compacted, not include the del files.
+ List<StoreFile> mobFilesToCompact = filesToCompact.subList(0, batch);
+ // Pair(maxSeqId, cellsCount)
+ Pair<Long, Long> fileInfo = getFileInfo(mobFilesToCompact);
+ // open writers for the mob files and new ref store files.
+ Writer writer = null;
+ Writer refFileWriter = null;
+ Path filePath = null;
+ Path refFilePath = null;
+ long mobCells = 0;
+ try {
+ writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
+ tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
+ .getStartKey(), compactionCacheConfig);
+ filePath = writer.getPath();
+ byte[] fileName = Bytes.toBytes(filePath.getName());
+ // create a temp file and open a writer for it in the bulkloadPath
+ refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
+ .getSecond().longValue(), compactionCacheConfig);
+ refFilePath = refFileWriter.getPath();
+ List<Cell> cells = new ArrayList<Cell>();
+ boolean hasMore = false;
+ do {
+ hasMore = scanner.next(cells, compactionKVMax);
+ for (Cell cell : cells) {
+ // TODO remove this after the new code are introduced.
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ // write the mob cell to the mob file.
+ writer.append(kv);
+ // write the new reference cell to the store file.
+ KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+ refFileWriter.append(reference);
+ mobCells++;
+ }
+ cells.clear();
+ } while (hasMore);
+ } finally {
+ // close the scanner.
+ scanner.close();
+ // append metadata to the mob file, and close the mob file writer.
+ closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
+ // append metadata and bulkload info to the ref mob file, and close the writer.
+ closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+ }
+ if (mobCells > 0) {
+ // commit mob file
+ MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+ // bulkload the ref file
+ bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+ newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+ } else {
+ // remove the new files
+ // the mob file is empty, delete it instead of committing.
+ deletePath(filePath);
+ // the ref file is empty, delete it instead of committing.
+ deletePath(refFilePath);
+ }
+ // archive the old mob files, do not archive the del files.
+ try {
+ MobUtils
+ .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+ } catch (IOException e) {
+ LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+ }
+ }
+
+ /**
+ * Compacts the del files in batches which avoids opening too many files.
+ * @param request The compaction request.
+ * @param delFilePaths
+ * @return The paths of new del files after merging or the original files if no merging
+ * is necessary.
+ * @throws IOException
+ */
+ protected List<Path> compactDelFiles(PartitionedMobFileCompactionRequest request,
+ List<Path> delFilePaths) throws IOException {
+ if (delFilePaths.size() <= delFileMaxCount) {
+ return delFilePaths;
+ }
+ // when there are more del files than the number that is allowed, merge it firstly.
+ int offset = 0;
+ List<Path> paths = new ArrayList<Path>();
+ while (offset < delFilePaths.size()) {
+ // get the batch
+ int batch = compactionBatchSize;
+ if (delFilePaths.size() - offset < compactionBatchSize) {
+ batch = delFilePaths.size() - offset;
+ }
+ List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+ if (batch == 1) {
+ // only one file left, do not compact it, directly add it to the new files.
+ paths.add(delFilePaths.get(offset));
+ offset++;
+ continue;
+ }
+ for (int i = offset; i < batch + offset; i++) {
+ batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig,
+ BloomType.NONE));
+ }
+ // compact the del files in a batch.
+ paths.add(compactDelFilesInBatch(request, batchedDelFiles));
+ // move to the next batch.
+ offset += batch;
+ }
+ return compactDelFiles(request, paths);
+ }
+
+ /**
+ * Compacts the del file in a batch.
+ * @param request The compaction request.
+ * @param delFiles The del files.
+ * @return The path of new del file after merging.
+ * @throws IOException
+ */
+ private Path compactDelFilesInBatch(PartitionedMobFileCompactionRequest request,
+ List<StoreFile> delFiles) throws IOException {
+ // create a scanner for the del files.
+ StoreScanner scanner = createScanner(delFiles, ScanType.COMPACT_RETAIN_DELETES);
+ Writer writer = null;
+ Path filePath = null;
+ try {
+ writer = MobUtils.createDelFileWriter(conf, fs, column,
+ MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
+ column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
+ filePath = writer.getPath();
+ List<Cell> cells = new ArrayList<Cell>();
+ boolean hasMore = false;
+ do {
+ hasMore = scanner.next(cells, compactionKVMax);
+ for (Cell cell : cells) {
+ // TODO remove this after the new code are introduced.
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ writer.append(kv);
+ }
+ cells.clear();
+ } while (hasMore);
+ } finally {
+ scanner.close();
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the writer of the file " + filePath, e);
+ }
+ }
+ }
+ // commit the new del file
+ Path path = MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+ // archive the old del files
+ try {
+ MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), delFiles);
+ } catch (IOException e) {
+ LOG.error("Failed to archive the old del files " + delFiles, e);
+ }
+ return path;
+ }
+
+ /**
+ * Creates a store scanner.
+ * @param filesToCompact The files to be compacted.
+ * @param scanType The scan type.
+ * @return The store scanner.
+ * @throws IOException
+ */
+ private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
+ throws IOException {
+ List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false,
+ null, HConstants.LATEST_TIMESTAMP);
+ Scan scan = new Scan();
+ scan.setMaxVersions(column.getMaxVersions());
+ long ttl = HStore.determineTTLFromFamily(column);
+ ScanInfo scanInfo = new ScanInfo(column, ttl, 0, KeyValue.COMPARATOR);
+ StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
+ HConstants.LATEST_TIMESTAMP);
+ return scanner;
+ }
+
+ /**
+ * Bulkloads the current file.
+ * @param table The current table.
+ * @param bulkloadDirectory The path of bulkload directory.
+ * @param fileName The current file name.
+ * @throws IOException
+ */
+ private void bulkloadRefFile(HTable table, Path bulkloadDirectory, String fileName)
+ throws IOException {
+ // bulkload the ref file
+ try {
+ LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+ bulkload.doBulkLoad(bulkloadDirectory, table);
+ } catch (Exception e) {
+ // delete the committed mob file
+ deletePath(new Path(mobFamilyDir, fileName));
+ throw new IOException(e);
+ } finally {
+ // delete the bulkload files in bulkloadPath
+ deletePath(bulkloadDirectory);
+ }
+ }
+
+ /**
+ * Closes the mob file writer.
+ * @param writer The mob file writer.
+ * @param maxSeqId Maximum sequence id.
+ * @param mobCellsCount The number of mob cells.
+ * @throws IOException
+ */
+ private void closeMobFileWriter(Writer writer, long maxSeqId, long mobCellsCount)
+ throws IOException {
+ if (writer != null) {
+ writer.appendMetadata(maxSeqId, false, mobCellsCount);
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the writer of the file " + writer.getPath(), e);
+ }
+ }
+ }
+
+ /**
+ * Closes the ref file writer.
+ * @param writer The ref file writer.
+ * @param maxSeqId Maximum sequence id.
+ * @param bulkloadTime The timestamp at which the bulk load file is created.
+ * @throws IOException
+ */
+ private void closeRefFileWriter(Writer writer, long maxSeqId, long bulkloadTime)
+ throws IOException {
+ if (writer != null) {
+ writer.appendMetadata(maxSeqId, false);
+ writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close the writer of the ref file " + writer.getPath(), e);
+ }
+ }
+ }
+
+ /**
+ * Gets the max seqId and number of cells of the store files.
+ * @param storeFiles The store files.
+ * @return The pair of the max seqId and number of cells of the store files.
+ * @throws IOException
+ */
+ private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
+ long maxSeqId = 0;
+ long maxKeyCount = 0;
+ for (StoreFile sf : storeFiles) {
+ // the readers will be closed later after the merge.
+ maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId());
+ byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT);
+ if (count != null) {
+ maxKeyCount += Bytes.toLong(count);
+ }
+ }
+ return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+ }
+
+ /**
+ * Deletes a file.
+ * @param path The path of the file to be deleted.
+ */
+ private void deletePath(Path path) {
+ try {
+ if (path != null) {
+ fs.delete(path, true);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to delete the file " + path, e);
+ }
+ }
+
+ private FileStatus getLinkedFileStatus(HFileLink link) throws IOException {
+ Path[] locations = link.getLocations();
+ for (Path location : locations) {
+ FileStatus file = getFileStatus(location);
+ if (file != null) {
+ return file;
+ }
+ }
+ return null;
+ }
+
+ private FileStatus getFileStatus(Path path) throws IOException {
+ try {
+ if (path != null) {
+ FileStatus file = fs.getFileStatus(path);
+ return file;
+ }
+ } catch (FileNotFoundException e) {
+ LOG.warn("The file " + path + " can not be found", e);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ad7318b..b9f4038 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -363,7 +363,7 @@ public class HStore implements Store {
* @param family
* @return TTL in seconds of the specified family
*/
- static long determineTTLFromFamily(final HColumnDescriptor family) {
+ public static long determineTTLFromFamily(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 5519b4b..930e1d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -272,8 +272,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// 0 is passed as readpoint because the test bypasses Store
0);
}
-
- StoreScanner(final Scan scan, ScanInfo scanInfo,
+
+ public StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
throws IOException {