Posted to commits@hbase.apache.org by sy...@apache.org on 2016/03/30 18:03:36 UTC
[34/50] [abbrv] hbase git commit: HBASE-15389 Write out multiple files when compaction
HBASE-15389 Write out multiple files when compaction
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/11d11d3f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/11d11d3f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/11d11d3f
Branch: refs/heads/hbase-12439
Commit: 11d11d3fcc591227cccf3531b911e46c68774501
Parents: e9c4f12
Author: zhangduo <zh...@apache.org>
Authored: Fri Mar 25 15:07:54 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Mar 25 15:07:54 2016 +0800
----------------------------------------------------------------------
.../regionserver/AbstractMultiFileWriter.java | 120 +++++++
.../regionserver/DateTieredMultiFileWriter.java | 83 +++++
.../hadoop/hbase/regionserver/StoreFile.java | 9 +-
.../regionserver/StripeMultiFileWriter.java | 239 ++++++--------
.../hbase/regionserver/StripeStoreFlusher.java | 30 +-
.../AbstractMultiOutputCompactor.java | 161 +++++++++
.../regionserver/compactions/Compactor.java | 10 +-
.../compactions/DateTieredCompactor.java | 86 +++++
.../compactions/DefaultCompactor.java | 4 +-
.../compactions/StripeCompactionPolicy.java | 13 +-
.../compactions/StripeCompactor.java | 169 +++-------
.../hbase/regionserver/TestStripeCompactor.java | 325 -------------------
.../regionserver/compactions/TestCompactor.java | 212 ++++++++++++
.../compactions/TestDateTieredCompactor.java | 169 ++++++++++
.../compactions/TestStripeCompactionPolicy.java | 24 +-
.../compactions/TestStripeCompactor.java | 223 +++++++++++++
16 files changed, 1272 insertions(+), 605 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
new file mode 100644
index 0000000..4987c59
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
+
+/**
+ * Base class for a cell sink that separates the provided cells into multiple files.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractMultiFileWriter implements CellSink {
+
+ private static final Log LOG = LogFactory.getLog(AbstractMultiFileWriter.class);
+
+ /** Factory that is used to produce single StoreFile.Writer-s */
+ protected WriterFactory writerFactory;
+
+ /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
+ protected StoreScanner sourceScanner;
+
+ public interface WriterFactory {
+ public StoreFile.Writer createWriter() throws IOException;
+ }
+
+ /**
+ * Initializes multi-writer before usage.
+ * @param sourceScanner Optional store scanner to obtain the information about read progress.
+ * @param factory Factory used to produce individual file writers.
+ */
+ public void init(StoreScanner sourceScanner, WriterFactory factory) {
+ this.writerFactory = factory;
+ this.sourceScanner = sourceScanner;
+ }
+
+ /**
+ * Commit all writers.
+ * <p>
+ * Notice that here we use the same <code>maxSeqId</code> for all output files since we have not
+ * found an easy way to obtain enough sequence ids for different output files in some corner
+ * cases. See comments in HBASE-15400 for more details.
+ */
+ public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
+ preCommitWriters();
+ Collection<StoreFile.Writer> writers = this.writers();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
+ + ", majorCompaction=" + majorCompaction);
+ }
+ List<Path> paths = new ArrayList<Path>();
+ for (Writer writer : writers) {
+ if (writer == null) {
+ continue;
+ }
+ writer.appendMetadata(maxSeqId, majorCompaction);
+ preCloseWriter(writer);
+ paths.add(writer.getPath());
+ writer.close();
+ }
+ return paths;
+ }
+
+ /**
+ * Close all writers without throwing any exceptions. This is usually called when the compaction fails.
+ */
+ public List<Path> abortWriters() {
+ List<Path> paths = new ArrayList<Path>();
+ for (StoreFile.Writer writer : writers()) {
+ try {
+ if (writer != null) {
+ paths.add(writer.getPath());
+ writer.close();
+ }
+ } catch (Exception ex) {
+ LOG.error("Failed to close the writer after an unfinished compaction.", ex);
+ }
+ }
+ return paths;
+ }
+
+ protected abstract Collection<StoreFile.Writer> writers();
+
+ /**
+ * Subclasses override this method to be called at the end of a successful sequence of appends;
+ * all appends are processed before this method is called.
+ */
+ protected void preCommitWriters() throws IOException {
+ }
+
+ /**
+ * Subclasses override this method to be called before we close the given writer. Usually you can
+ * append extra metadata to the writer.
+ */
+ protected void preCloseWriter(StoreFile.Writer writer) throws IOException {
+ }
+}
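The class above defines the full multi-writer lifecycle: init() wires in the writer factory and the optional source scanner, append() routes each cell to some output file, commitWriters() stamps metadata and closes everything, and abortWriters() cleans up after a failure. A minimal caller-side sketch, assuming the cells, factory and scanner come from the surrounding flush or compaction code (the class and method names below are illustrative and not part of this patch):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
import org.apache.hadoop.hbase.regionserver.StoreScanner;

public final class MultiFileWriterLifecycleSketch {

  /** Drives a multi-file writer through init -> append -> commit, aborting on failure. */
  static List<Path> writeAll(AbstractMultiFileWriter mw, StoreScanner sourceScanner,
      WriterFactory factory, Iterable<Cell> cells, long maxSeqId, boolean majorCompaction,
      FileSystem fs) throws IOException {
    mw.init(sourceScanner, factory); // sourceScanner may be null
    boolean succeeded = false;
    try {
      for (Cell cell : cells) {
        mw.append(cell); // CellSink API: the subclass decides which output file gets the cell
      }
      List<Path> committed = mw.commitWriters(maxSeqId, majorCompaction);
      succeeded = true;
      return committed;
    } finally {
      if (!succeeded) {
        // abortWriters() never throws; the caller deletes whatever was already written.
        for (Path leftover : mw.abortWriters()) {
          fs.delete(leftover, false);
        }
      }
    }
  }

  private MultiFileWriterLifecycleSketch() {
  }
}

Note that commitWriters() stamps the same maxSeqId on every output file, per the HBASE-15400 remark in the javadoc above.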
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
new file mode 100644
index 0000000..f0bd444
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+
+/**
+ * Class for a cell sink that separates the provided cells into multiple files for date tiered
+ * compaction.
+ */
+@InterfaceAudience.Private
+public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
+
+ private final NavigableMap<Long, StoreFile.Writer> lowerBoundary2Writer
+ = new TreeMap<Long, StoreFile.Writer>();
+
+ private final boolean needEmptyFile;
+
+ /**
+ * @param needEmptyFile whether we need to create an empty store file if we haven't written out
+ * anything.
+ */
+ public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
+ for (Long lowerBoundary : lowerBoundaries) {
+ lowerBoundary2Writer.put(lowerBoundary, null);
+ }
+ this.needEmptyFile = needEmptyFile;
+ }
+
+ @Override
+ public void append(Cell cell) throws IOException {
+ Map.Entry<Long, StoreFile.Writer> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
+ StoreFile.Writer writer = entry.getValue();
+ if (writer == null) {
+ writer = writerFactory.createWriter();
+ lowerBoundary2Writer.put(entry.getKey(), writer);
+ }
+ writer.append(cell);
+ }
+
+ @Override
+ protected Collection<Writer> writers() {
+ return lowerBoundary2Writer.values();
+ }
+
+ @Override
+ protected void preCommitWriters() throws IOException {
+ if (!needEmptyFile) {
+ return;
+ }
+ for (StoreFile.Writer writer : lowerBoundary2Writer.values()) {
+ if (writer != null) {
+ return;
+ }
+ }
+ // we haven't written out any data, create an empty file to retain metadata
+ lowerBoundary2Writer.put(lowerBoundary2Writer.firstKey(), writerFactory.createWriter());
+ }
+}
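The append() implementation above routes each cell by timestamp: floorEntry() returns the entry for the greatest lower boundary that is less than or equal to the cell's timestamp, and a writer is created lazily for that window. A small JDK-only illustration of that lookup (the boundary values are invented):

import java.util.Arrays;
import java.util.NavigableMap;
import java.util.TreeMap;

public final class FloorEntryWindowingDemo {
  public static void main(String[] args) {
    // Hypothetical lower boundaries of three date-tiered windows (epoch millis).
    NavigableMap<Long, String> lowerBoundaryToWindow = new TreeMap<Long, String>();
    for (long boundary : Arrays.asList(0L, 1000L, 2000L)) {
      lowerBoundaryToWindow.put(boundary, "window@" + boundary);
    }
    // Each timestamp maps to the greatest boundary <= timestamp, mirroring
    // DateTieredMultiFileWriter.append(Cell).
    for (long ts : new long[] { 5L, 999L, 1000L, 2500L }) {
      System.out.println(ts + " -> " + lowerBoundaryToWindow.floorEntry(ts).getKey());
    }
    // Prints: 5 -> 0, 999 -> 0, 1000 -> 1000, 2500 -> 2000
  }
}

As written, append() assumes every timestamp is covered by some boundary; floorEntry() would return null for a timestamp below the smallest lower boundary, so the caller is expected to pass a low enough first boundary.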
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index b6164b2..868bee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -261,6 +261,13 @@ public class StoreFile {
}
/**
+ * Clone a StoreFile for opening a private reader.
+ */
+ public StoreFile cloneForReader() {
+ return new StoreFile(this);
+ }
+
+ /**
* @return the StoreFile object associated to this StoreFile.
* null if the StoreFile is not a reference.
*/
@@ -294,7 +301,7 @@ public class StoreFile {
* @return True if this is HFile.
*/
public boolean isHFile() {
- return this.fileInfo.isHFile(this.fileInfo.getPath());
+ return StoreFileInfo.isHFile(this.fileInfo.getPath());
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index 651b863..1c3f14c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -20,52 +20,36 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
/**
- * Base class for cell sink that separates the provided cells into multiple files.
+ * Base class for a cell sink that separates the provided cells into multiple files for stripe
+ * compaction.
*/
@InterfaceAudience.Private
-public abstract class StripeMultiFileWriter implements Compactor.CellSink {
- private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
+public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
- /** Factory that is used to produce single StoreFile.Writer-s */
- protected WriterFactory writerFactory;
- protected CellComparator comparator;
+ private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
+ protected final CellComparator comparator;
protected List<StoreFile.Writer> existingWriters;
protected List<byte[]> boundaries;
- /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
- protected StoreScanner sourceScanner;
/** Whether to write stripe metadata */
private boolean doWriteStripeMetadata = true;
- public interface WriterFactory {
- public StoreFile.Writer createWriter() throws IOException;
- }
-
- /**
- * Initializes multi-writer before usage.
- * @param sourceScanner Optional store scanner to obtain the information about read progress.
- * @param factory Factory used to produce individual file writers.
- * @param comparator Comparator used to compare rows.
- */
- public void init(StoreScanner sourceScanner, WriterFactory factory, CellComparator comparator)
- throws IOException {
- this.writerFactory = factory;
- this.sourceScanner = sourceScanner;
+ public StripeMultiFileWriter(CellComparator comparator) {
this.comparator = comparator;
}
@@ -73,41 +57,35 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
this.doWriteStripeMetadata = false;
}
- public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
+ @Override
+ protected Collection<Writer> writers() {
+ return existingWriters;
+ }
+
+ protected abstract void preCommitWritersInternal() throws IOException;
+
+ @Override
+ protected final void preCommitWriters() throws IOException {
+ // do some sanity check here.
assert this.existingWriters != null;
- commitWritersInternal();
+ preCommitWritersInternal();
assert this.boundaries.size() == (this.existingWriters.size() + 1);
- LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
- + "riting out metadata for " + this.existingWriters.size() + " writers");
- List<Path> paths = new ArrayList<Path>();
- for (int i = 0; i < this.existingWriters.size(); ++i) {
- StoreFile.Writer writer = this.existingWriters.get(i);
- if (writer == null) continue; // writer was skipped due to 0 KVs
- if (doWriteStripeMetadata) {
- writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
- writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
- }
- writer.appendMetadata(maxSeqId, isMajor);
- paths.add(writer.getPath());
- writer.close();
- }
- this.existingWriters = null;
- return paths;
}
- public List<Path> abortWriters() {
- assert this.existingWriters != null;
- List<Path> paths = new ArrayList<Path>();
- for (StoreFile.Writer writer : this.existingWriters) {
- try {
- paths.add(writer.getPath());
- writer.close();
- } catch (Exception ex) {
- LOG.error("Failed to close the writer after an unfinished compaction.", ex);
+ @Override
+ protected void preCloseWriter(Writer writer) throws IOException {
+ if (doWriteStripeMetadata) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Write stripe metadata for " + writer.getPath().toString());
+ }
+ int index = existingWriters.indexOf(writer);
+ writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index));
+ writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString());
}
}
- this.existingWriters = null;
- return paths;
}
/**
@@ -115,13 +93,12 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
* @param left The left boundary of the writer.
* @param cell The cell whose row has to be checked.
*/
- protected void sanityCheckLeft(
- byte[] left, Cell cell) throws IOException {
- if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, left) &&
- comparator.compareRows(cell, left, 0, left.length) < 0) {
- String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
- + "]: [" + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
- + "]";
+ protected void sanityCheckLeft(byte[] left, Cell cell) throws IOException {
+ if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, left)
+ && comparator.compareRows(cell, left, 0, left.length) < 0) {
+ String error =
+ "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: ["
+ + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]";
LOG.error(error);
throw new IOException(error);
}
@@ -131,28 +108,22 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
* Subclasses can call this method to make sure the last KV is within multi-writer range.
* @param right The right boundary of the writer.
*/
- protected void sanityCheckRight(
- byte[] right, Cell cell) throws IOException {
- if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, right) &&
- comparator.compareRows(cell, right, 0, right.length) >= 0) {
- String error = "The last row is higher or equal than the right boundary of ["
- + Bytes.toString(right) + "]: ["
- + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]";
+ protected void sanityCheckRight(byte[] right, Cell cell) throws IOException {
+ if (!Arrays.equals(StripeStoreFileManager.OPEN_KEY, right)
+ && comparator.compareRows(cell, right, 0, right.length) >= 0) {
+ String error =
+ "The last row is higher or equal than the right boundary of [" + Bytes.toString(right)
+ + "]: ["
+ + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + "]";
LOG.error(error);
throw new IOException(error);
}
}
/**
- * Subclasses override this method to be called at the end of a successful sequence of
- * append; all appends are processed before this method is called.
- */
- protected abstract void commitWritersInternal() throws IOException;
-
- /**
- * MultiWriter that separates the cells based on fixed row-key boundaries.
- * All the KVs between each pair of neighboring boundaries from the list supplied to ctor
- * will end up in one file, and separate from all other such pairs.
+ * MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between
+ * each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and
+ * separate from all other such pairs.
*/
public static class BoundaryMultiWriter extends StripeMultiFileWriter {
private StoreFile.Writer currentWriter;
@@ -165,31 +136,28 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
/**
* @param targetBoundaries The boundaries on which writers/files are separated.
- * @param majorRangeFrom Major range is the range for which at least one file should be
- * written (because all files are included in compaction).
- * majorRangeFrom is the left boundary.
+ * @param majorRangeFrom Major range is the range for which at least one file should be written
+ * (because all files are included in compaction). majorRangeFrom is the left boundary.
* @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
*/
- public BoundaryMultiWriter(List<byte[]> targetBoundaries,
+ public BoundaryMultiWriter(CellComparator comparator, List<byte[]> targetBoundaries,
byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
- super();
+ super(comparator);
this.boundaries = targetBoundaries;
this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
// "major" range (range for which all files are included) boundaries, if any,
// must match some target boundaries, let's find them.
- assert (majorRangeFrom == null) == (majorRangeTo == null);
+ assert (majorRangeFrom == null) == (majorRangeTo == null);
if (majorRangeFrom != null) {
- majorRangeFromIndex = Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY)
- ? 0
- : Collections.binarySearch(boundaries, majorRangeFrom,
- Bytes.BYTES_COMPARATOR);
- majorRangeToIndex = Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY)
- ? boundaries.size()
- : Collections.binarySearch(boundaries, majorRangeTo,
- Bytes.BYTES_COMPARATOR);
+ majorRangeFromIndex =
+ Arrays.equals(majorRangeFrom, StripeStoreFileManager.OPEN_KEY) ? 0 : Collections
+ .binarySearch(boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
+ majorRangeToIndex =
+ Arrays.equals(majorRangeTo, StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
+ : Collections.binarySearch(boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
- throw new IOException("Major range does not match writer boundaries: [" +
- Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
+ throw new IOException("Major range does not match writer boundaries: ["
+ + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
+ majorRangeFromIndex + " to " + majorRangeToIndex);
}
}
@@ -199,8 +167,7 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
public void append(Cell cell) throws IOException {
if (currentWriter == null && existingWriters.isEmpty()) {
// First append ever, do a sanity check.
- sanityCheckLeft(this.boundaries.get(0),
- cell);
+ sanityCheckLeft(this.boundaries.get(0), cell);
}
prepareWriterFor(cell);
currentWriter.append(cell);
@@ -209,19 +176,18 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
}
private boolean isCellAfterCurrentWriter(Cell cell) {
- return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY) &&
- (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0);
+ return !Arrays.equals(currentWriterEndKey, StripeStoreFileManager.OPEN_KEY)
+ && (comparator.compareRows(cell, currentWriterEndKey, 0, currentWriterEndKey.length) >= 0);
}
@Override
- protected void commitWritersInternal() throws IOException {
+ protected void preCommitWritersInternal() throws IOException {
stopUsingCurrentWriter();
while (existingWriters.size() < boundaries.size() - 1) {
createEmptyWriter();
}
if (lastCell != null) {
- sanityCheckRight(boundaries.get(boundaries.size() - 1),
- lastCell);
+ sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell);
}
}
@@ -241,14 +207,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
}
/**
- * Called if there are no cells for some stripe.
- * We need to have something in the writer list for this stripe, so that writer-boundary
- * list indices correspond to each other. We can insert null in the writer list for that
- * purpose, except in the following cases where we actually need a file:
- * 1) If we are in range for which we are compacting all the files, we need to create an
- * empty file to preserve stripe metadata.
- * 2) If we have not produced any file at all for this compactions, and this is the
- * last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
+ * Called if there are no cells for some stripe. We need to have something in the writer list
+ * for this stripe, so that writer-boundary list indices correspond to each other. We can insert
+ * null in the writer list for that purpose, except in the following cases where we actually
+ * need a file: 1) If we are in range for which we are compacting all the files, we need to
+ * create an empty file to preserve stripe metadata. 2) If we have not produced any file at all
+ * for this compaction, and this is the last chance (the last stripe), we need to preserve last
+ * seqNum (see also HBASE-6059).
*/
private void createEmptyWriter() throws IOException {
int index = existingWriters.size();
@@ -258,12 +223,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
boolean needEmptyFile = isInMajorRange || isLastWriter;
existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
hasAnyWriter |= needEmptyFile;
- currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
- ? null : boundaries.get(existingWriters.size() + 1);
+ currentWriterEndKey =
+ (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
+ .size() + 1);
}
private void checkCanCreateWriter() throws IOException {
- int maxWriterCount = boundaries.size() - 1;
+ int maxWriterCount = boundaries.size() - 1;
assert existingWriters.size() <= maxWriterCount;
if (existingWriters.size() >= maxWriterCount) {
throw new IOException("Cannot create any more writers (created " + existingWriters.size()
@@ -280,16 +246,16 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
cellsInCurrentWriter = 0;
}
currentWriter = null;
- currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
- ? null : boundaries.get(existingWriters.size() + 1);
+ currentWriterEndKey =
+ (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
+ .size() + 1);
}
}
/**
- * MultiWriter that separates the cells based on target cell number per file and file count.
- * New file is started every time the target number of KVs is reached, unless the fixed
- * count of writers has already been created (in that case all the remaining KVs go into
- * the last writer).
+ * MultiWriter that separates the cells based on target cell number per file and file count. New
+ * file is started every time the target number of KVs is reached, unless the fixed count of
+ * writers has already been created (in that case all the remaining KVs go into the last writer).
*/
public static class SizeMultiWriter extends StripeMultiFileWriter {
private int targetCount;
@@ -310,8 +276,9 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
* @param left The left boundary of the first writer.
* @param right The right boundary of the last writer.
*/
- public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
- super();
+ public SizeMultiWriter(CellComparator comparator, int targetCount, long targetKvs, byte[] left,
+ byte[] right) {
+ super(comparator);
this.targetCount = targetCount;
this.targetCells = targetKvs;
this.left = left;
@@ -331,11 +298,11 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
sanityCheckLeft(left, cell);
doCreateWriter = true;
} else if (lastRowInCurrentWriter != null
- && !CellUtil.matchingRow(cell,
- lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
+ && !CellUtil.matchingRow(cell, lastRowInCurrentWriter, 0,
+ lastRowInCurrentWriter.length)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
- + "] row; wrote out " + cellsInCurrentWriter + " kvs");
+ + "] row; wrote out " + cellsInCurrentWriter + " kvs");
}
lastRowInCurrentWriter = null;
cellsInCurrentWriter = 0;
@@ -343,7 +310,8 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
doCreateWriter = true;
}
if (doCreateWriter) {
- byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell); // make a copy
+ // make a copy
+ byte[] boundary = existingWriters.isEmpty() ? left : CellUtil.cloneRow(cell);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
}
@@ -357,34 +325,35 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
++cellsInCurrentWriter;
cellsSeen = cellsInCurrentWriter;
if (this.sourceScanner != null) {
- cellsSeen = Math.max(cellsSeen,
- this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious);
+ cellsSeen =
+ Math.max(cellsSeen, this.sourceScanner.getEstimatedNumberOfKvsScanned()
+ - cellsSeenInPrevious);
}
// If we are not already waiting for opportunity to close, start waiting if we can
// create any more writers and if the current one is too big.
- if (lastRowInCurrentWriter == null
- && existingWriters.size() < targetCount
+ if (lastRowInCurrentWriter == null && existingWriters.size() < targetCount
&& cellsSeen >= targetCells) {
lastRowInCurrentWriter = CellUtil.cloneRow(cell); // make a copy
if (LOG.isDebugEnabled()) {
- LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
- lastRowInCurrentWriter) + "] row; observed " + cellsSeen + " kvs and wrote out "
- + cellsInCurrentWriter + " kvs");
+ LOG.debug("Preparing to start a new writer after ["
+ + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen
+ + " kvs and wrote out " + cellsInCurrentWriter + " kvs");
}
}
}
@Override
- protected void commitWritersInternal() throws IOException {
+ protected void preCommitWritersInternal() throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping with " + cellsInCurrentWriter + " kvs in last writer" +
- ((this.sourceScanner == null) ? "" : ("; observed estimated "
+ LOG.debug("Stopping with "
+ + cellsInCurrentWriter
+ + " kvs in last writer"
+ + ((this.sourceScanner == null) ? "" : ("; observed estimated "
+ this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
}
if (lastCell != null) {
- sanityCheckRight(
- right, lastCell);
+ sanityCheckRight(right, lastCell);
}
// When expired stripes were going to be merged into one, and if no writer was created during
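To summarize the constructor changes in this file: both writer flavours now take the store's CellComparator up front instead of receiving it through init(). A hypothetical construction sketch (the boundary values and counts are invented for illustration, not taken from the patch):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.util.Bytes;

public final class StripeWriterConstructionSketch {

  /** One output file per pair of neighbouring boundaries: [a,m) and [m,z). */
  static StripeMultiFileWriter boundaryBased(CellComparator comparator) throws IOException {
    List<byte[]> boundaries =
        Arrays.asList(Bytes.toBytes("a"), Bytes.toBytes("m"), Bytes.toBytes("z"));
    return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, boundaries, null, null);
  }

  /** At most 5 output files, starting a new one roughly every 1,000,000 cells. */
  static StripeMultiFileWriter sizeBased(CellComparator comparator) {
    return new StripeMultiFileWriter.SizeMultiWriter(comparator, 5, 1000000L,
        Bytes.toBytes("a"), Bytes.toBytes("z"));
  }

  private StripeWriterConstructionSketch() {
  }
}

Moving the comparator into the constructor lets init() shrink to the two-argument form defined on AbstractMultiFileWriter, which is what allows the stripe and date-tiered writers to share one base class.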
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 9a06a88..34e8497 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -69,7 +70,8 @@ public class StripeStoreFlusher extends StoreFlusher {
}
// Let policy select flush method.
- StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
+ StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
+ cellsCount);
boolean success = false;
StripeMultiFileWriter mw = null;
@@ -78,7 +80,7 @@ public class StripeStoreFlusher extends StoreFlusher {
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
snapshot.getTimeRangeTracker(), cellsCount);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
- mw.init(storeScanner, factory, store.getComparator());
+ mw.init(storeScanner, factory);
synchronized (flushLock) {
performFlush(scanner, mw, smallestReadPoint, throughputController);
@@ -123,10 +125,17 @@ public class StripeStoreFlusher extends StoreFlusher {
/** Stripe flush request wrapper that writes a non-striped file. */
public static class StripeFlushRequest {
+
+ protected final CellComparator comparator;
+
+ public StripeFlushRequest(CellComparator comparator) {
+ this.comparator = comparator;
+ }
+
@VisibleForTesting
public StripeMultiFileWriter createWriter() throws IOException {
- StripeMultiFileWriter writer =
- new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
+ StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
+ Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
writer.setNoStripeMetadata();
return writer;
}
@@ -137,13 +146,15 @@ public class StripeStoreFlusher extends StoreFlusher {
private final List<byte[]> targetBoundaries;
/** @param targetBoundaries New files should be written with these boundaries. */
- public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
+ public BoundaryStripeFlushRequest(CellComparator comparator, List<byte[]> targetBoundaries) {
+ super(comparator);
this.targetBoundaries = targetBoundaries;
}
@Override
public StripeMultiFileWriter createWriter() throws IOException {
- return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
+ return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
+ null);
}
}
@@ -157,15 +168,16 @@ public class StripeStoreFlusher extends StoreFlusher {
* @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
* total number of kvs, all the overflow data goes into the last stripe.
*/
- public SizeStripeFlushRequest(int targetCount, long targetKvs) {
+ public SizeStripeFlushRequest(CellComparator comparator, int targetCount, long targetKvs) {
+ super(comparator);
this.targetCount = targetCount;
this.targetKvs = targetKvs;
}
@Override
public StripeMultiFileWriter createWriter() throws IOException {
- return new StripeMultiFileWriter.SizeMultiWriter(
- this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
+ return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
+ OPEN_KEY, OPEN_KEY);
}
}
}
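With the comparator now carried by the flush request itself, a stripe flush roughly becomes: ask the policy for a request, build the writer from it, then init() the writer without a comparator argument. A compressed sketch of that flow, assuming the policy, stripe metadata, scanner and writer factory are supplied by StripeStoreFlusher as shown above (the helper class name is illustrative, and StripeInformationProvider is assumed to be the nested interface of StripeCompactionPolicy):

import java.io.IOException;

import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;

public final class StripeFlushSketch {

  static StripeMultiFileWriter prepareFlushWriter(StripeCompactionPolicy policy,
      CellComparator comparator, StripeCompactionPolicy.StripeInformationProvider stripes,
      int cellsCount, StoreScanner sourceScanner,
      AbstractMultiFileWriter.WriterFactory factory) throws IOException {
    // The policy picks a plain, boundary-based or size-based request; the comparator
    // travels with the request instead of being passed to init() afterwards.
    StripeStoreFlusher.StripeFlushRequest request =
        policy.selectFlush(comparator, stripes, cellsCount);
    StripeMultiFileWriter writer = request.createWriter();
    writer.init(sourceScanner, factory);
    return writer;
  }

  private StripeFlushSketch() {
  }
}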
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
new file mode 100644
index 0000000..29d8561
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Base class for implementing a Compactor which will generate multiple output files after
+ * compaction.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
+ extends Compactor {
+
+ private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class);
+
+ public AbstractMultiOutputCompactor(Configuration conf, Store store) {
+ super(conf, store);
+ }
+
+ protected interface InternalScannerFactory {
+
+ ScanType getScanType(CompactionRequest request);
+
+ InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+ FileDetails fd, long smallestReadPoint) throws IOException;
+ }
+
+ protected List<Path> compact(T writer, final CompactionRequest request,
+ InternalScannerFactory scannerFactory, ThroughputController throughputController, User user)
+ throws IOException {
+ final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
+ this.progress = new CompactionProgress(fd.maxKeyCount);
+
+ // Find the smallest read point across all the Scanners.
+ long smallestReadPoint = getSmallestReadPoint();
+
+ List<StoreFileScanner> scanners;
+ Collection<StoreFile> readersToClose;
+ if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
+ // clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
+ // HFiles, and their readers
+ readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
+ for (StoreFile f : request.getFiles()) {
+ readersToClose.add(f.cloneForReader());
+ }
+ scanners = createFileScanners(readersToClose, smallestReadPoint,
+ store.throttleCompaction(request.getSize()));
+ } else {
+ readersToClose = Collections.emptyList();
+ scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+ store.throttleCompaction(request.getSize()));
+ }
+ InternalScanner scanner = null;
+ boolean finished = false;
+ try {
+ /* Include deletes, unless we are doing a major compaction */
+ ScanType scanType = scannerFactory.getScanType(request);
+ scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
+ if (scanner == null) {
+ scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
+ }
+ scanner = postCreateCoprocScanner(request, scanType, scanner, user);
+ if (scanner == null) {
+ // NULL scanner returned from coprocessor hooks means skip normal processing.
+ return new ArrayList<Path>();
+ }
+ boolean cleanSeqId = false;
+ if (fd.minSeqIdToKeep > 0) {
+ smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
+ cleanSeqId = true;
+ }
+ // Create the writer factory for compactions.
+ final boolean needMvcc = fd.maxMVCCReadpoint >= 0;
+ WriterFactory writerFactory = new WriterFactory() {
+ @Override
+ public Writer createWriter() throws IOException {
+ return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc,
+ fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
+ }
+ };
+ // Prepare multi-writer, and perform the compaction using scanner and writer.
+ // It is ok here if storeScanner is null.
+ StoreScanner storeScanner
+ = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
+ writer.init(storeScanner, writerFactory);
+ finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+ throughputController, request.isAllFiles());
+ if (!finished) {
+ throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
+ }
+ } finally {
+ Closeables.close(scanner, true);
+ for (StoreFile f : readersToClose) {
+ try {
+ f.closeReader(true);
+ } catch (IOException e) {
+ LOG.warn("Exception closing " + f, e);
+ }
+ }
+ if (!finished) {
+ FileSystem fs = store.getFileSystem();
+ for (Path leftoverFile : writer.abortWriters()) {
+ try {
+ fs.delete(leftoverFile, false);
+ } catch (IOException e) {
+ LOG.error("Failed to delete the leftover file " + leftoverFile
+ + " after an unfinished compaction.",
+ e);
+ }
+ }
+ }
+ }
+ assert finished : "We should have exited the method on all error paths";
+ return commitMultiWriter(writer, fd, request);
+ }
+
+ protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
+ CompactionRequest request) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 0e6ab05..9125684 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -64,14 +64,14 @@ public abstract class Compactor {
private static final Log LOG = LogFactory.getLog(Compactor.class);
private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected CompactionProgress progress;
- protected Configuration conf;
- protected Store store;
+ protected final Configuration conf;
+ protected final Store store;
- protected int compactionKVMax;
- protected Compression.Algorithm compactionCompression;
+ protected final int compactionKVMax;
+ protected final Compression.Algorithm compactionCompression;
/** specify how many days to keep MVCC values during major compaction **/
- protected int keepSeqIdPeriod;
+ protected final int keepSeqIdPeriod;
//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(final Configuration conf, final Store store) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
new file mode 100644
index 0000000..413b29c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+
+/**
+ * This compactor will generate StoreFile for different time ranges.
+ */
+@InterfaceAudience.Private
+public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {
+
+ private static final Log LOG = LogFactory.getLog(DateTieredCompactor.class);
+
+ public DateTieredCompactor(Configuration conf, Store store) {
+ super(conf, store);
+ }
+
+ private boolean needEmptyFile(CompactionRequest request) {
+ // if we are going to compact the last N files, then we need to emit an empty file to retain the
+ // maxSeqId if we haven't written out anything.
+ return StoreFile.getMaxSequenceIdInList(request.getFiles()) == store.getMaxSequenceId();
+ }
+
+ public List<Path> compact(final CompactionRequest request, List<Long> lowerBoundaries,
+ ThroughputController throughputController, User user) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing compaction with " + lowerBoundaries.size()
+ + "windows, lower boundaries: " + lowerBoundaries);
+ }
+
+ DateTieredMultiFileWriter writer =
+ new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request));
+ return compact(writer, request, new InternalScannerFactory() {
+
+ @Override
+ public ScanType getScanType(CompactionRequest request) {
+ return request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES
+ : ScanType.COMPACT_DROP_DELETES;
+ }
+
+ @Override
+ public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+ FileDetails fd, long smallestReadPoint) throws IOException {
+ return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+ fd.earliestPutTs);
+ }
+ }, throughputController, user);
+ }
+
+ @Override
+ protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd,
+ CompactionRequest request) throws IOException {
+ return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
+ }
+}
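A hedged caller-side sketch of the new compactor: the compaction policy (not part of this patch) is expected to compute the window lower boundaries and the compaction request, and the helper below only illustrates the call shape.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;

public final class DateTieredCompactionSketch {

  /**
   * Runs a date-tiered compaction: roughly one output file per window that received cells,
   * plus possibly an empty file to retain maxSeqId (see needEmptyFile above).
   */
  static List<Path> run(Configuration conf, Store store, CompactionRequest request,
      List<Long> lowerBoundaries, ThroughputController controller, User user)
      throws IOException {
    DateTieredCompactor compactor = new DateTieredCompactor(conf, store);
    // lowerBoundaries must cover every timestamp in the selected files
    // (see DateTieredMultiFileWriter.append earlier in this commit).
    return compactor.compact(request, lowerBoundaries, controller, user);
  }

  private DateTieredCompactionSketch() {
  }
}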
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index e7e0cca..22a45b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -65,10 +65,10 @@ public class DefaultCompactor extends Compactor {
Collection<StoreFile> readersToClose;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
- // HFileFiles, and their readers
+ // HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
- readersToClose.add(new StoreFile(f));
+ readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index 2bb8fc8..e8a4340 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -27,9 +27,10 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
@@ -84,18 +85,20 @@ public class StripeCompactionPolicy extends CompactionPolicy {
request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
}
- public StripeStoreFlusher.StripeFlushRequest selectFlush(
+ public StripeStoreFlusher.StripeFlushRequest selectFlush(CellComparator comparator,
StripeInformationProvider si, int kvCount) {
if (this.config.isUsingL0Flush()) {
- return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
+ // L0 is used, return dumb request.
+ return new StripeStoreFlusher.StripeFlushRequest(comparator);
}
if (si.getStripeCount() == 0) {
// No stripes - start with the requisite count, derive KVs per stripe.
int initialCount = this.config.getInitialCount();
- return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
+ return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
+ kvCount / initialCount);
}
// There are stripes - do according to the boundaries.
- return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
+ return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
}
public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index fd0e2b2..1364ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -18,50 +18,65 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
-import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
/**
- * This is the placeholder for stripe compactor. The implementation,
- * as well as the proper javadoc, will be added in HBASE-7967.
+ * This is the placeholder for stripe compactor. The implementation, as well as the proper javadoc,
+ * will be added in HBASE-7967.
*/
@InterfaceAudience.Private
-public class StripeCompactor extends Compactor {
+public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter> {
private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
+
public StripeCompactor(Configuration conf, Store store) {
super(conf, store);
}
- public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
- byte[] majorRangeFromRow, byte[] majorRangeToRow,
- ThroughputController throughputController) throws IOException {
- return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
- throughputController, null);
+ private final class StripeInternalScannerFactory implements InternalScannerFactory {
+
+ private final byte[] majorRangeFromRow;
+
+ private final byte[] majorRangeToRow;
+
+ public StripeInternalScannerFactory(byte[] majorRangeFromRow, byte[] majorRangeToRow) {
+ this.majorRangeFromRow = majorRangeFromRow;
+ this.majorRangeToRow = majorRangeToRow;
+ }
+
+ @Override
+ public ScanType getScanType(CompactionRequest request) {
+ // If majorRangeFromRow and majorRangeToRow are not null, then we will not use the return
+ // value to create InternalScanner. See the createScanner method below. The return value is
+ // also used when calling coprocessor hooks.
+ return ScanType.COMPACT_RETAIN_DELETES;
+ }
+
+ @Override
+ public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+ FileDetails fd, long smallestReadPoint) throws IOException {
+ return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners,
+ scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store,
+ scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
+ }
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
- byte[] majorRangeFromRow, byte[] majorRangeToRow,
- ThroughputController throughputController, User user) throws IOException {
+ byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController,
+ User user) throws IOException {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -70,116 +85,32 @@ public class StripeCompactor extends Compactor {
}
LOG.debug(sb.toString());
}
- StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
- targetBoundaries, majorRangeFromRow, majorRangeToRow);
- return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
- throughputController, user);
- }
-
- public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
- byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
- ThroughputController throughputController) throws IOException {
- return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
- majorRangeToRow, throughputController, null);
+ StripeMultiFileWriter writer =
+ new StripeMultiFileWriter.BoundaryMultiWriter(store.getComparator(), targetBoundaries,
+ majorRangeFromRow, majorRangeToRow);
+ return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
+ majorRangeToRow), throughputController, user);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Executing compaction with " + targetSize
- + " target file size, no more than " + targetCount + " files, in ["
- + Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
+ LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
+ + targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
+ + "] range");
}
- StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
- targetCount, targetSize, left, right);
- return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
- throughputController, user);
+ StripeMultiFileWriter writer =
+ new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
+ left, right);
+ return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
+ majorRangeToRow), throughputController, user);
}
- private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
- byte[] majorRangeFromRow, byte[] majorRangeToRow,
- ThroughputController throughputController, User user) throws IOException {
- final Collection<StoreFile> filesToCompact = request.getFiles();
- final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
- this.progress = new CompactionProgress(fd.maxKeyCount);
-
- long smallestReadPoint = getSmallestReadPoint();
- List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
- smallestReadPoint, store.throttleCompaction(request.getSize()));
-
- boolean finished = false;
- InternalScanner scanner = null;
- boolean cleanSeqId = false;
- try {
- // Get scanner to use.
- ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
- scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
- if (scanner == null) {
- scanner = (majorRangeFromRow == null)
- ? createScanner(store, scanners,
- ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
- : createScanner(store, scanners,
- smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
- }
- scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
- if (scanner == null) {
- // NULL scanner returned from coprocessor hooks means skip normal processing.
- return new ArrayList<Path>();
- }
-
- // Create the writer factory for compactions.
- if(fd.minSeqIdToKeep > 0) {
- smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
- cleanSeqId = true;
- }
-
- final boolean needMvcc = fd.maxMVCCReadpoint > 0;
-
- final Compression.Algorithm compression = store.getFamily().getCompactionCompressionType();
- StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
- @Override
- public Writer createWriter() throws IOException {
- return store.createWriterInTmp(
- fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
- store.throttleCompaction(request.getSize()));
- }
- };
-
- // Prepare multi-writer, and perform the compaction using scanner and writer.
- // It is ok here if storeScanner is null.
- StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
- mw.init(storeScanner, factory, store.getComparator());
- finished =
- performCompaction(fd, scanner, mw, smallestReadPoint, cleanSeqId, throughputController,
- request.isMajor());
- if (!finished) {
- throw new InterruptedIOException( "Aborting compaction of store " + store +
- " in region " + store.getRegionInfo().getRegionNameAsString() +
- " because it was interrupted.");
- }
- } finally {
- if (scanner != null) {
- try {
- scanner.close();
- } catch (Throwable t) {
- // Don't fail the compaction if this fails.
- LOG.error("Failed to close scanner after compaction.", t);
- }
- }
- if (!finished) {
- for (Path leftoverFile : mw.abortWriters()) {
- try {
- store.getFileSystem().delete(leftoverFile, false);
- } catch (Exception ex) {
- LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
- }
- }
- }
- }
-
- assert finished : "We should have exited the method on all error paths";
- List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
+ @Override
+ protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
+ CompactionRequest request) throws IOException {
+ List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/11d11d3f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
deleted file mode 100644
index cb586f3..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
-import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
-import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-
-@Category({RegionServerTests.class, SmallTests.class})
-public class TestStripeCompactor {
- private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
- private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
-
- private static final byte[] KEY_B = Bytes.toBytes("bbb");
- private static final byte[] KEY_C = Bytes.toBytes("ccc");
- private static final byte[] KEY_D = Bytes.toBytes("ddd");
-
- private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
- private static final KeyValue KV_B = kvAfter(KEY_B);
- private static final KeyValue KV_C = kvAfter(KEY_C);
- private static final KeyValue KV_D = kvAfter(KEY_D);
-
- private static KeyValue kvAfter(byte[] key) {
- return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
- }
-
- private static <T> T[] a(T... a) {
- return a;
- }
-
- private static KeyValue[] e() {
- return TestStripeCompactor.<KeyValue>a();
- }
-
- @Test
- public void testBoundaryCompactions() throws Exception {
- // General verification
- verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
- a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
- verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
- verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
- }
-
- @Test
- public void testBoundaryCompactionEmptyFiles() throws Exception {
- // No empty file if there are already files.
- verifyBoundaryCompaction(
- a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
- verifyBoundaryCompaction(a(KV_A, KV_C),
- a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
- // But one should be created if there are no files.
- verifyBoundaryCompaction(
- e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
- // The empty file goes in the major range if there is one.
- verifyBoundaryCompaction(
- e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
- verifyBoundaryCompaction(
- e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
- // Major range should have files regardless of KVs.
- verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
- a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
- verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
- a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
-
- }
-
- public static void verifyBoundaryCompaction(
- KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
- verifyBoundaryCompaction(input, boundaries, output, null, null, true);
- }
-
- public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
- KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
- throws Exception {
- StoreFileWritersCapture writers = new StoreFileWritersCapture();
- StripeCompactor sc = createCompactor(writers, input);
- List<Path> paths =
- sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
- NoLimitThroughputController.INSTANCE);
- writers.verifyKvs(output, allFiles, true);
- if (allFiles) {
- assertEquals(output.length, paths.size());
- writers.verifyBoundaries(boundaries);
- }
- }
-
- @Test
- public void testSizeCompactions() throws Exception {
- // General verification with different sizes.
- verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
- a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
- verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
- a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
- verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
- // Verify row boundaries are preserved.
- verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
- a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
- verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
- a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
- // Too much data, count limits the number of files.
- verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
- a(a(KV_A), a(KV_B, KV_C, KV_D)));
- verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
- new KeyValue[][] { a(KV_A, KV_B, KV_C) });
- // Too little data/large count, no extra files.
- verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
- a(a(KV_A, KV_B), a(KV_C, KV_D)));
- }
-
- public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
- byte[] left, byte[] right, KeyValue[][] output) throws Exception {
- StoreFileWritersCapture writers = new StoreFileWritersCapture();
- StripeCompactor sc = createCompactor(writers, input);
- List<Path> paths =
- sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
- NoLimitThroughputController.INSTANCE);
- assertEquals(output.length, paths.size());
- writers.verifyKvs(output, true, true);
- List<byte[]> boundaries = new ArrayList<byte[]>();
- boundaries.add(left);
- for (int i = 1; i < output.length; ++i) {
- boundaries.add(CellUtil.cloneRow(output[i][0]));
- }
- boundaries.add(right);
- writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
- }
-
- private static StripeCompactor createCompactor(
- StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- final Scanner scanner = new Scanner(input);
-
- // Create store mock that is satisfactory for compactor.
- HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
- ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, CellComparator.COMPARATOR);
- Store store = mock(Store.class);
- when(store.getFamily()).thenReturn(col);
- when(store.getScanInfo()).thenReturn(si);
- when(store.areWritesEnabled()).thenReturn(true);
- when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
- when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
- when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
- anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
- when(store.getComparator()).thenReturn(CellComparator.COMPARATOR);
-
- return new StripeCompactor(conf, store) {
- @Override
- protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
- long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
- byte[] dropDeletesToRow) throws IOException {
- return scanner;
- }
-
- @Override
- protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
- ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
- return scanner;
- }
- };
- }
-
- private static CompactionRequest createDummyRequest() throws Exception {
- // "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
- // But compaction depends on everything under the sun, so stub everything with dummies.
- StoreFile sf = mock(StoreFile.class);
- StoreFile.Reader r = mock(StoreFile.Reader.class);
- when(r.length()).thenReturn(1L);
- when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
- when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
- when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
- .thenReturn(mock(StoreFileScanner.class));
- when(sf.getReader()).thenReturn(r);
- when(sf.createReader()).thenReturn(r);
- when(sf.createReader(anyBoolean())).thenReturn(r);
- return new CompactionRequest(Arrays.asList(sf));
- }
-
- private static class Scanner implements InternalScanner {
- private final ArrayList<KeyValue> kvs;
- public Scanner(KeyValue... kvs) {
- this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
- }
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- if (kvs.isEmpty()) return false;
- results.add(kvs.remove(0));
- return !kvs.isEmpty();
- }
-
- @Override
- public boolean next(List<Cell> result, ScannerContext scannerContext)
- throws IOException {
- return next(result);
- }
-
- @Override
- public void close() throws IOException {}
- }
-
- // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
- public static class StoreFileWritersCapture implements
- Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
- public static class Writer {
- public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
- public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
- }
-
- private List<Writer> writers = new ArrayList<Writer>();
-
- @Override
- public StoreFile.Writer createWriter() throws IOException {
- final Writer realWriter = new Writer();
- writers.add(realWriter);
- StoreFile.Writer writer = mock(StoreFile.Writer.class);
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation) {
- return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
- }}).when(writer).append(any(KeyValue.class));
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation) {
- Object[] args = invocation.getArguments();
- return realWriter.data.put((byte[])args[0], (byte[])args[1]);
- }}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
- return writer;
- }
-
- @Override
- public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
- return createWriter();
- }
-
- public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
- if (allFiles) {
- assertEquals(kvss.length, writers.size());
- }
- int skippedWriters = 0;
- for (int i = 0; i < kvss.length; ++i) {
- KeyValue[] kvs = kvss[i];
- if (kvs != null) {
- Writer w = writers.get(i - skippedWriters);
- if (requireMetadata) {
- assertNotNull(w.data.get(STRIPE_START_KEY));
- assertNotNull(w.data.get(STRIPE_END_KEY));
- } else {
- assertNull(w.data.get(STRIPE_START_KEY));
- assertNull(w.data.get(STRIPE_END_KEY));
- }
- assertEquals(kvs.length, w.kvs.size());
- for (int j = 0; j < kvs.length; ++j) {
- assertEquals(kvs[j], w.kvs.get(j));
- }
- } else {
- assertFalse(allFiles);
- ++skippedWriters;
- }
- }
- }
-
- public void verifyBoundaries(byte[][] boundaries) {
- assertEquals(boundaries.length - 1, writers.size());
- for (int i = 0; i < writers.size(); ++i) {
- assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
- assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
- }
- }
- }
-}