Posted to commits@hbase.apache.org by jm...@apache.org on 2014/09/18 16:04:39 UTC
git commit: HBASE-11646 Handle the MOB in compaction (Jingcheng Du)
Repository: hbase
Updated Branches:
refs/heads/hbase-11339 7cd71d1db -> 7dbd828a9
HBASE-11646 Handle the MOB in compaction (Jingcheng Du)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7dbd828a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7dbd828a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7dbd828a
Branch: refs/heads/hbase-11339
Commit: 7dbd828a90e36c3b94fcadf659ce047a1ddbce0e
Parents: 7cd71d1
Author: Jonathan M Hsieh <jm...@apache.org>
Authored: Thu Sep 18 07:03:21 2014 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Thu Sep 18 07:03:21 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/mob/DefaultMobCompactor.java | 233 +++++++++++++
.../apache/hadoop/hbase/mob/MobStoreEngine.java | 8 +
.../org/apache/hadoop/hbase/mob/MobUtils.java | 40 +++
.../hadoop/hbase/regionserver/HMobStore.java | 5 +-
.../regionserver/compactions/Compactor.java | 49 ++-
.../compactions/DefaultCompactor.java | 23 +-
.../compactions/StripeCompactor.java | 3 +-
.../hbase/regionserver/TestMobCompaction.java | 332 +++++++++++++++++++
8 files changed, 678 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
new file mode 100644
index 0000000..5f13502
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobCompactor.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Compact the passed set of files in the mob-enabled column family.
+ */
+@InterfaceAudience.Private
+public class DefaultMobCompactor extends DefaultCompactor {
+
+ private static final Log LOG = LogFactory.getLog(DefaultMobCompactor.class);
+ private long mobSizeThreshold;
+ private HMobStore mobStore;
+ public DefaultMobCompactor(Configuration conf, Store store) {
+ super(conf, store);
+ // The mob cells reside in the mob-enabled column family which is held by HMobStore.
+ // During the compaction, the compactor reads the cells from the mob files and
+ // may create new mob files. All of these operations are provided by HMobStore,
+ // so we need to cast the Store to HMobStore.
+ if (!(store instanceof HMobStore)) {
+ throw new IllegalArgumentException("The store " + store + " is not an HMobStore");
+ }
+ mobStore = (HMobStore) store;
+ mobSizeThreshold = MobUtils.getMobThreshold(store.getFamily());
+ }
+
+ /**
+ * Creates a writer for a new file in a temporary directory.
+ * @param fd The file details.
+ * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+ * @return Writer for a new StoreFile in the tmp dir.
+ * @throws IOException
+ */
+ @Override
+ protected Writer createTmpWriter(FileDetails fd, long smallestReadPoint) throws IOException {
+ // Always create this writer with tags enabled because the compaction may generate new cells that carry tags.
+ StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+ true, fd.maxMVCCReadpoint >= smallestReadPoint, true);
+ return writer;
+ }
+
+ /**
+ * Performs compaction on a column family with the mob flag enabled.
+ * This covers the cases where the mob threshold size has changed or where the mob
+ * flag of the column family has been toggled via an alter table statement.
+ * Compacts the files by the following rules.
+ * 1. If the cell has a mob reference tag, the cell's value holds the real value length and the name of the mob file.
+ * <ol>
+ * <li>
+ * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
+ * directly copy the cell (with its mob tag) into the new store file.
+ * </li>
+ * <li>
+ * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into
+ * the new store file.
+ * </li>
+ * </ol>
+ * 2. If the cell doesn't have a mob reference tag.
+ * <ol>
+ * <li>
+ * If the value size of a cell is larger than the threshold, this cell is regarded as a mob;
+ * write the cell to a mob file, and write a reference to that mob file into the store file.
+ * </li>
+ * <li>
+ * Otherwise, directly write this cell into the store file.
+ * </li>
+ * </ol>
+ * @param fd File details
+ * @param scanner Where to read from.
+ * @param writer Where to write to.
+ * @param smallestReadPoint Smallest read point.
+ * @param cleanSeqId When true, remove the seqId (formerly mvcc) value if it is <= smallestReadPoint
+ * @param major Is a major compaction.
+ * @return Whether compaction ended; false if it was interrupted for any reason.
+ */
+ @Override
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+ long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
+ int bytesWritten = 0;
+ // Since scanner.next() can return 'false' but still be delivering data,
+ // we have to use a do/while loop.
+ List<Cell> cells = new ArrayList<Cell>();
+ // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
+ int closeCheckInterval = HStore.getCloseCheckInterval();
+ boolean hasMore;
+ Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
+ byte[] fileName = null;
+ StoreFile.Writer mobFileWriter = null;
+ long mobCells = 0;
+ Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
+ .getName());
+ try {
+ try {
+ // If the mob file writer could not be created, directly write the cell to the store file.
+ mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+ store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
+ fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+ } catch (IOException e) {
+ LOG.error(
+ "Fail to create mob writer, "
+ + "we will continue the compaction by writing MOB cells directly in store files",
+ e);
+ }
+ do {
+ hasMore = scanner.next(cells, compactionKVMax);
+ // output to writer:
+ for (Cell c : cells) {
+ // TODO remove the KeyValueUtil.ensureKeyValue before merging back to trunk.
+ KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+ resetSeqId(smallestReadPoint, cleanSeqId, kv);
+ if (mobFileWriter == null || kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
+ // If the mob file writer is null or the kv type is not put, directly write the cell
+ // to the store file.
+ writer.append(kv);
+ } else if (MobUtils.isMobReferenceCell(kv)) {
+ if (MobUtils.isValidMobRefCellValue(kv)) {
+ int size = MobUtils.getMobValueLength(kv);
+ if (size > mobSizeThreshold) {
+ // If the value size is larger than the threshold, it's regarded as a mob. Since
+ // its value is already in the mob file, directly write this cell to the store file
+ writer.append(kv);
+ } else {
+ // If the value is not larger than the threshold, it's not regarded as a mob. Retrieve
+ // the mob cell from the mob file, and write it back to the store file.
+ Cell cell = mobStore.resolve(kv, false);
+ if (cell.getValueLength() != 0) {
+ // put the mob data back to the store file
+ KeyValue mobKv = KeyValueUtil.ensureKeyValue(cell);
+ mobKv.setSequenceId(kv.getSequenceId());
+ writer.append(mobKv);
+ } else {
+ // If the resolved value is empty, something may have gone wrong during retrieval;
+ // write the reference cell to the store file as-is, and leave it to be handled by
+ // the next compaction.
+ writer.append(kv);
+ }
+ }
+ } else {
+ LOG.warn("The value format of the KeyValue " + kv
+ + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
+ writer.append(kv);
+ }
+ } else if (kv.getValueLength() <= mobSizeThreshold) {
+ // If the value size of a cell is not larger than the threshold, directly write it to
+ // the store file.
+ writer.append(kv);
+ } else {
+ // If the value size of a cell is larger than the threshold, it's regarded as a mob,
+ // write this cell to a mob file, and write the path to the store file.
+ mobCells++;
+ // append the original KeyValue to the mob file.
+ mobFileWriter.append(kv);
+ KeyValue reference = MobUtils.createMobRefKeyValue(kv, fileName, tableNameTag);
+ // write the cell whose value is the path of a mob file to the store file.
+ writer.append(reference);
+ }
+ ++progress.currentCompactedKVs;
+
+ // check periodically to see if a system stop is requested
+ if (closeCheckInterval > 0) {
+ bytesWritten += kv.getLength();
+ if (bytesWritten > closeCheckInterval) {
+ bytesWritten = 0;
+ if (!store.areWritesEnabled()) {
+ progress.cancel();
+ return false;
+ }
+ }
+ }
+ }
+ cells.clear();
+ } while (hasMore);
+ } finally {
+ if (mobFileWriter != null) {
+ appendMetadataAndCloseWriter(mobFileWriter, fd, major);
+ }
+ }
+ if (mobFileWriter != null) {
+ if (mobCells > 0) {
+ // If the mob file is not empty, commit it.
+ mobStore.commitFile(mobFileWriter.getPath(), path);
+ } else {
+ try {
+ // If the mob file is empty, delete it instead of committing.
+ store.getFileSystem().delete(mobFileWriter.getPath(), true);
+ } catch (IOException e) {
+ LOG.error("Fail to delete the temp mob file", e);
+ }
+ }
+ }
+ progress.complete();
+ return true;
+ }
+}
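As a reading aid, the following is a minimal standalone sketch of the per-cell routing rule that performCompaction() above applies. It is not part of the patch: the StoreSink/MobSink interfaces and the routeCell() name are hypothetical, and the undersized-reference branch is simplified (the real compactor resolves the value from the mob file before writing it); only the MobUtils calls and the threshold comparisons come from the code above.

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.mob.MobUtils;

// Hypothetical sinks standing in for the store file writer and the mob file writer.
interface StoreSink { void append(KeyValue kv) throws IOException; }
interface MobSink { KeyValue appendAndCreateReference(KeyValue kv) throws IOException; }

final class MobCompactionRoutingSketch {
  /** Routes one Put cell following the rules documented on performCompaction(). */
  static void routeCell(KeyValue kv, long mobSizeThreshold, StoreSink store, MobSink mob)
      throws IOException {
    if (MobUtils.isMobReferenceCell(kv)) {
      // Rule 1: the cell already references a mob file.
      if (MobUtils.isValidMobRefCellValue(kv)
          && MobUtils.getMobValueLength(kv) > mobSizeThreshold) {
        // Still larger than the threshold: keep the reference as-is.
        store.append(kv);
      } else {
        // Undersized or malformed reference. Simplified here: the reference is copied
        // through; the real compactor resolves the value from the mob file first.
        store.append(kv);
      }
    } else if (kv.getValueLength() <= mobSizeThreshold) {
      // Rule 2a: small value, stays in the store file.
      store.append(kv);
    } else {
      // Rule 2b: large value, goes to the mob file; only the reference enters the store file.
      store.append(mob.appendAndCreateReference(kv));
    }
  }
}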
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
index d5e6f2e..2d5f1ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java
@@ -37,4 +37,12 @@ public class MobStoreEngine extends DefaultStoreEngine {
// specific compactor and policy when that is implemented.
storeFlusher = new DefaultMobStoreFlusher(conf, store);
}
+
+ /**
+ * Creates the DefaultMobCompactor.
+ */
+ @Override
+ protected void createCompactor(Configuration conf, Store store) throws IOException {
+ compactor = new DefaultMobCompactor(conf, store);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index c106e0b..e52d336 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -260,4 +260,44 @@ public class MobUtils {
reference.setSequenceId(kv.getSequenceId());
return reference;
}
+
+ /**
+ * Indicates whether the current mob ref cell has a valid value.
+ * A mob ref cell has a mob reference tag.
+ * The value of a mob ref cell consists of two parts: the real mob value length and the mob file name.
+ * The real mob value length takes 4 bytes.
+ * The remaining part is the mob file name.
+ * @param cell The mob ref cell.
+ * @return True if the cell has a valid value.
+ */
+ public static boolean isValidMobRefCellValue(Cell cell) {
+ return cell.getValueLength() > Bytes.SIZEOF_INT;
+ }
+
+ /**
+ * Gets the mob value length from the mob ref cell.
+ * A mob ref cell has a mob reference tag.
+ * The value of a mob ref cell consists of two parts: the real mob value length and the mob file name.
+ * The real mob value length takes 4 bytes.
+ * The remaining part is the mob file name.
+ * @param cell The mob ref cell.
+ * @return The real mob value length.
+ */
+ public static int getMobValueLength(Cell cell) {
+ return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
+ }
+
+ /**
+ * Gets the mob file name from the mob ref cell.
+ * A mob ref cell has a mob reference tag.
+ * The value of a mob ref cell consists of two parts: the real mob value length and the mob file name.
+ * The real mob value length takes 4 bytes.
+ * The remaining part is the mob file name.
+ * @param cell The mob ref cell.
+ * @return The mob file name.
+ */
+ public static String getMobFileName(Cell cell) {
+ return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
+ cell.getValueLength() - Bytes.SIZEOF_INT);
+ }
}
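The three helpers above share one value layout: a 4-byte length of the real mob value, followed by the mob file name. As an illustration only (not part of the patch; buildMobRefValue() and the file name literal are made up), such a value could be assembled and taken apart with the same Bytes utilities:

import org.apache.hadoop.hbase.util.Bytes;

final class MobRefValueLayoutSketch {
  /** value := 4-byte real mob value length, followed by the mob file name. */
  static byte[] buildMobRefValue(int realValueLength, String mobFileName) {
    return Bytes.add(Bytes.toBytes(realValueLength), Bytes.toBytes(mobFileName));
  }

  public static void main(String[] args) {
    byte[] value = buildMobRefValue(1024, "someMobFileName");
    // Mirrors MobUtils.getMobValueLength() and MobUtils.getMobFileName(), which read the
    // same offsets from the cell's value array.
    int length = Bytes.toInt(value, 0, Bytes.SIZEOF_INT);
    String fileName = Bytes.toString(value, Bytes.SIZEOF_INT, value.length - Bytes.SIZEOF_INT);
    System.out.println(length + " -> " + fileName);
  }
}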
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 17d3802..9c6f34e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -219,9 +219,8 @@ public class HMobStore extends HStore {
*/
public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
Cell result = null;
- if (reference.getValueLength() > Bytes.SIZEOF_INT) {
- String fileName = Bytes.toString(reference.getValueArray(), reference.getValueOffset()
- + Bytes.SIZEOF_INT, reference.getValueLength() - Bytes.SIZEOF_INT);
+ if (MobUtils.isValidMobRefCellValue(reference)) {
+ String fileName = MobUtils.getMobFileName(reference);
Path targetPath = new Path(mobFamilyPath, fileName);
MobFile file = null;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 2b053a6..13967c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
/**
@@ -57,7 +59,7 @@ public abstract class Compactor {
protected Configuration conf;
protected Store store;
- private int compactionKVMax;
+ protected int compactionKVMax;
protected Compression.Algorithm compactionCompression;
/** specify how many days to keep MVCC values during major compaction **/
@@ -92,6 +94,8 @@ public abstract class Compactor {
public long maxKeyCount = 0;
/** Earliest put timestamp if major compaction */
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+ /** Latest put timestamp */
+ public long latestPutTs = HConstants.LATEST_TIMESTAMP;
/** The last key in the files we're compacting. */
public long maxSeqId = 0;
/** Latest memstore read point found in any of the involved files */
@@ -158,6 +162,14 @@ public abstract class Compactor {
fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
}
}
+ tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
+ TimeRangeTracker trt = new TimeRangeTracker();
+ if (tmp == null) {
+ fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
+ } else {
+ Writables.copyWritable(tmp, trt);
+ fd.latestPutTs = trt.getMaximumTimestamp();
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Compacting " + file +
", keycount=" + keyCount +
@@ -216,14 +228,16 @@ public abstract class Compactor {
/**
* Performs the compaction.
+ * @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
+ * @param major Is a major compaction.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
- protected boolean performCompaction(InternalScanner scanner,
- CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+ long smallestReadPoint, boolean cleanSeqId, boolean major) throws IOException {
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@@ -241,9 +255,7 @@ public abstract class Compactor {
// output to writer:
for (Cell c : kvs) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
- CellUtil.setSequenceId(kv, 0);
- }
+ resetSeqId(smallestReadPoint, cleanSeqId, kv);
writer.append(kv);
++progress.currentCompactedKVs;
progress.totalCompactedSize += kv.getLength();
@@ -309,4 +321,29 @@ public abstract class Compactor {
return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
+
+ /**
+ * Resets the sequence id.
+ * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+ * @param cleanSeqId Whether the sequence id should be cleaned.
+ * @param kv The current KeyValue.
+ */
+ protected void resetSeqId(long smallestReadPoint, boolean cleanSeqId, KeyValue kv) {
+ if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
+ kv.setSequenceId(0);
+ }
+ }
+
+ /**
+ * Appends the metadata and closes the writer.
+ * @param writer The current store writer.
+ * @param fd The file details.
+ * @param isMajor Is a major compaction.
+ * @throws IOException
+ */
+ protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
+ boolean isMajor) throws IOException {
+ writer.appendMetadata(fd.maxSeqId, isMajor);
+ writer.close();
+ }
}
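DefaultMobCompactor uses the new latestPutTs above to date the mob files it creates. For illustration only (readTimeRangeMax() is a hypothetical name; the byte[] is the StoreFile.TIMERANGE_KEY metadata read in the hunk above), the extraction boils down to:

import java.io.IOException;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Writables;

final class TimeRangeMaxSketch {
  /** Returns the newest put timestamp recorded in a store file's TIMERANGE_KEY metadata. */
  static long readTimeRangeMax(byte[] timeRangeBytes) throws IOException {
    if (timeRangeBytes == null) {
      // No time range recorded for this file; fall back to LATEST_TIMESTAMP as the hunk does.
      return HConstants.LATEST_TIMESTAMP;
    }
    TimeRangeTracker trt = new TimeRangeTracker();
    Writables.copyWritable(timeRangeBytes, trt);
    return trt.getMaximumTimestamp();
  }
}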
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index d5b2b63..8056dd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -76,9 +76,9 @@ public class DefaultCompactor extends Compactor {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
- writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
- fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
- boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
+ writer = createTmpWriter(fd, smallestReadPoint);
+ boolean finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+ request.isAllFiles());
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
@@ -94,8 +94,7 @@ public class DefaultCompactor extends Compactor {
}
} finally {
if (writer != null) {
- writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
- writer.close();
+ appendMetadataAndCloseWriter(writer, fd, request.isAllFiles());
newFiles.add(writer.getPath());
}
}
@@ -103,6 +102,20 @@ public class DefaultCompactor extends Compactor {
}
/**
+ * Creates a writer for a new file in a temporary directory.
+ * @param fd The file details.
+ * @param smallestReadPoint The smallest mvcc readPoint across all the scanners in this region.
+ * @return Writer for a new StoreFile in the tmp dir.
+ * @throws IOException
+ */
+ protected StoreFile.Writer createTmpWriter(FileDetails fd, long smallestReadPoint)
+ throws IOException {
+ StoreFile.Writer writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
+ true, fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+ return writer;
+ }
+
+ /**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 487ff46..3109015 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -127,7 +127,8 @@ public class StripeCompactor extends Compactor {
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
- finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
+ finished = performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId,
+ request.isMajor());
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
http://git-wip-us.apache.org/repos/asf/hbase/blob/7dbd828a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
new file mode 100644
index 0000000..f8d6ce4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobCompaction.java
@@ -0,0 +1,332 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test mob compaction
+ */
+@Category(MediumTests.class)
+public class TestMobCompaction {
+ @Rule
+ public TestName name = new TestName();
+ static final Log LOG = LogFactory.getLog(TestMobCompaction.class.getName());
+ private HBaseTestingUtility UTIL = null;
+ private Configuration conf = null;
+
+ private HRegion region = null;
+ private HTableDescriptor htd = null;
+ private HColumnDescriptor hcd = null;
+ private long mobCellThreshold = 1000;
+
+ private FileSystem fs;
+
+ private static final byte[] COLUMN_FAMILY = fam1;
+ private final byte[] STARTROW = Bytes.toBytes(START_KEY);
+ private int compactionThreshold;
+
+ private void init(long mobThreshold) throws Exception {
+ this.mobCellThreshold = mobThreshold;
+
+ UTIL = HBaseTestingUtility.createLocalHTU();
+
+ conf = UTIL.getConfiguration();
+ compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+ htd = UTIL.createTableDescriptor(name.getMethodName());
+ hcd = new HColumnDescriptor(COLUMN_FAMILY);
+ hcd.setValue(MobConstants.IS_MOB, Bytes.toBytes(Boolean.TRUE));
+ hcd.setValue(MobConstants.MOB_THRESHOLD, Bytes.toBytes(mobThreshold));
+ hcd.setMaxVersions(1);
+ htd.addFamily(hcd);
+
+ region = UTIL.createLocalHRegion(htd, null, null);
+ fs = FileSystem.get(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ region.close();
+ fs.delete(UTIL.getDataTestDir(), true);
+ }
+
+ /**
+ * During compaction, cells smaller than the threshold won't be affected.
+ */
+ @Test
+ public void testSmallerValue() throws Exception {
+ init(500);
+ byte[] dummyData = makeDummyData(300); // smaller than mob threshold
+ HRegionIncommon loader = new HRegionIncommon(region);
+ // one hfile per row
+ for (int i = 0; i < compactionThreshold; i++) {
+ Put p = createPut(i, dummyData);
+ loader.put(p);
+ loader.flushcache();
+ }
+ assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+ assertEquals("Before compaction: mob file count", 0, countMobFiles());
+ assertEquals("Before compaction: rows", compactionThreshold, countRows());
+ assertEquals("Before compaction: mob rows", 0, countMobRows());
+
+ region.compactStores();
+
+ assertEquals("After compaction: store files", 1, countStoreFiles());
+ assertEquals("After compaction: mob file count", 0, countMobFiles());
+ assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
+ assertEquals("After compaction: rows", compactionThreshold, countRows());
+ assertEquals("After compaction: mob rows", 0, countMobRows());
+ }
+
+ /**
+ * The mob threshold is raised above the value size before the compaction, so the mob data is moved back into the store files.
+ */
+ @Test
+ public void testLargerValue() throws Exception {
+ init(200);
+ byte[] dummyData = makeDummyData(300); // larger than mob threshold
+ HRegionIncommon loader = new HRegionIncommon(region);
+ for (int i = 0; i < compactionThreshold; i++) {
+ Put p = createPut(i, dummyData);
+ loader.put(p);
+ loader.flushcache();
+ }
+ assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+ assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
+ assertEquals("Before compaction: rows", compactionThreshold, countRows());
+ assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
+ // Raise the threshold above the data size
+ region.getTableDesc().getFamily(COLUMN_FAMILY).setValue(
+ MobConstants.MOB_THRESHOLD, Bytes.toBytes(500L));
+ region.initialize();
+ region.compactStores(true);
+ assertEquals("After compaction: store files", 1, countStoreFiles());
+ assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
+ assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
+ assertEquals("After compaction: rows", compactionThreshold, countRows());
+ assertEquals("After compaction: mob rows", 0, countMobRows());
+ }
+
+ /**
+ * This test first generates store files, then bulk loads them and triggers a compaction.
+ * During the compaction, the cell values are larger than the threshold.
+ */
+ @Test
+ public void testMobCompactionWithBulkload() throws Exception {
+ // The following will produce store files whose cell values are 600 bytes.
+ init(300);
+ byte[] dummyData = makeDummyData(600);
+
+ Path hbaseRootDir = FSUtils.getRootDir(conf);
+ Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
+ List<Pair<byte[], String>> hfiles = new ArrayList<Pair<byte[], String>>(1);
+ for (int i = 0; i < compactionThreshold; i++) {
+ Path hpath = new Path(basedir, "hfile" + i);
+ hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
+ createHFile(hpath, i, dummyData);
+ }
+
+ // The following will bulk load the generated store files and compact them, with the
+ // 600-byte values above the 300-byte threshold.
+ boolean result = region.bulkLoadHFiles(hfiles, true);
+ assertTrue("Bulkload result:", result);
+ assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
+ assertEquals("Before compaction: mob file count", 0, countMobFiles());
+ assertEquals("Before compaction: rows", compactionThreshold, countRows());
+ assertEquals("Before compaction: mob rows", 0, countMobRows());
+ assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
+
+ region.compactStores();
+
+ assertEquals("After compaction: store files", 1, countStoreFiles());
+ assertEquals("After compaction: mob file count:", 1, countMobFiles());
+ assertEquals("After compaction: rows", compactionThreshold, countRows());
+ assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
+ assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
+ }
+
+ private int countStoreFiles() throws IOException {
+ Store store = region.getStore(COLUMN_FAMILY);
+ return store.getStorefilesCount();
+ }
+
+ private int countMobFiles() throws IOException {
+ Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
+ hcd.getNameAsString());
+ if (fs.exists(mobDirPath)) {
+ FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
+ return files.length;
+ }
+ return 0;
+ }
+
+ private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
+ Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
+ p.setDurability(Durability.SKIP_WAL);
+ p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
+ return p;
+ }
+
+ /**
+ * Creates an HFile containing a single KeyValue whose value is the given dummy data
+ */
+ private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
+ HFileContext meta = new HFileContextBuilder().build();
+ HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
+ .withFileContext(meta).create();
+ long now = System.currentTimeMillis();
+ try {
+ KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
+ Bytes.toBytes("colX"), now, dummyData);
+ writer.append(kv);
+ } finally {
+ writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+ writer.close();
+ }
+ }
+
+ private int countMobRows() throws IOException {
+ Scan scan = new Scan();
+ // Do not retrieve the mob data when scanning
+ scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+ InternalScanner scanner = region.getScanner(scan);
+
+ int scannedCount = 0;
+ List<Cell> results = new ArrayList<Cell>();
+ boolean hasMore = scanner.next(results);
+ while (hasMore) {
+ for (Cell c : results) {
+ if (MobUtils.isMobReferenceCell(c)) {
+ scannedCount++;
+ }
+ }
+ hasMore = scanner.next(results);
+ }
+ scanner.close();
+
+ return scannedCount;
+ }
+
+ private int countRows() throws IOException {
+ Scan scan = new Scan();
+ // Scan normally; mob references are resolved to their real values
+ InternalScanner scanner = region.getScanner(scan);
+
+ int scannedCount = 0;
+ List<Cell> results = new ArrayList<Cell>();
+ boolean hasMore = scanner.next(results);
+ while (hasMore) {
+ scannedCount += results.size();
+ hasMore = scanner.next(results);
+ }
+ scanner.close();
+
+ return scannedCount;
+ }
+
+ private byte[] makeDummyData(int size) {
+ byte[] dummyData = new byte[size];
+ new Random().nextBytes(dummyData);
+ return dummyData;
+ }
+
+ private int countReferencedMobFiles() throws IOException {
+ Scan scan = new Scan();
+ // Do not retrieve the mob data when scanning
+ scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+ InternalScanner scanner = region.getScanner(scan);
+
+ List<Cell> kvs = new ArrayList<Cell>();
+ boolean hasMore = true;
+ String fileName;
+ Set<String> files = new HashSet<String>();
+ do {
+ kvs.clear();
+ hasMore = scanner.next(kvs);
+ for (Cell c : kvs) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+ if (!MobUtils.isMobReferenceCell(kv)) {
+ continue;
+ }
+ if (!MobUtils.isValidMobRefCellValue(kv)) {
+ continue;
+ }
+ int size = MobUtils.getMobValueLength(kv);
+ if (size <= mobCellThreshold) {
+ continue;
+ }
+ fileName = MobUtils.getMobFileName(kv);
+ if (fileName.isEmpty()) {
+ continue;
+ }
+ files.add(fileName);
+ Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
+ hcd.getNameAsString());
+ assertTrue(fs.exists(new Path(familyPath, fileName)));
+ }
+ } while (hasMore);
+
+ scanner.close();
+
+ return files.size();
+ }
+}