You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by es...@apache.org on 2017/11/01 07:29:35 UTC
[1/2] hbase git commit: HBASE-16417: In-memory MemStore Policy for
Flattening and Compactions
Repository: hbase
Updated Branches:
refs/heads/master d7cf88947 -> 17e7aff37
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 64ee8dc..4a414d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -1177,7 +1178,8 @@ public class TestHStore {
}
@Test
- public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
+ public void testFlushBeforeCompletingScanWithFilterHint() throws IOException,
+ InterruptedException {
final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() {
@@ -1364,7 +1366,8 @@ public class TestHStore {
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
long snapshotId = id++;
// push older data into snapshot -- phase (1/4)
- StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
+ StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker
+ .DUMMY);
storeFlushCtx.prepare();
// insert current data into active -- phase (2/4)
@@ -1464,7 +1467,7 @@ public class TestHStore {
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
// Set the lower threshold to invoke the "MERGE" policy
- conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
+ conf.set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
byte[] value = Bytes.toBytes("thisisavarylargevalue");
@@ -1551,8 +1554,8 @@ public class TestHStore {
private class MyStore extends HStore {
private final MyStoreHook hook;
- MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration confParam,
- MyStoreHook hook, boolean switchToPread) throws IOException {
+ MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
+ confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam);
this.hook = hook;
}
@@ -1669,7 +1672,8 @@ public class TestHStore {
private static class MyMemStoreCompactor extends MemStoreCompactor {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
- public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) {
+ public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy
+ compactionPolicy) throws IllegalArgumentIOException {
super(compactingMemStore, compactionPolicy);
}
@@ -1697,7 +1701,8 @@ public class TestHStore {
}
@Override
- protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
+ protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
+ throws IllegalArgumentIOException {
return new MyMemStoreCompactor(this, compactionPolicy);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index e559b48..2c9a437 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -141,7 +141,7 @@ public class TestRecoveredEdits {
// Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if
// we flush at 1MB, that there are at least 3 flushed files that are there because of the
// replay of edits.
- if(policy == MemoryCompactionPolicy.EAGER) {
+ if(policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
assertTrue("Files count=" + storeFiles.size(), storeFiles.size() >= 1);
} else {
assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
index a012d09..6a64796 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java
@@ -775,7 +775,7 @@ public class TestWalAndCompactingMemStoreFlush {
conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(MemoryCompactionPolicy.BASIC));
// length of pipeline that requires merge
- conf.setInt(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
+ conf.setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 1);
// Intialize the HRegion
HRegion region = initHRegion("testSelectiveFlushWithBasicAndMerge", conf);
[2/2] hbase git commit: HBASE-16417: In-memory MemStore Policy for
Flattening and Compactions
Posted by es...@apache.org.
HBASE-16417: In-memory MemStore Policy for Flattening and Compactions
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/17e7aff3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/17e7aff3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/17e7aff3
Branch: refs/heads/master
Commit: 17e7aff37e6b69cae0fb6d15ebeb2037d1ca6acc
Parents: d7cf889
Author: eshcar <es...@yahoo-inc.com>
Authored: Wed Nov 1 09:24:36 2017 +0200
Committer: eshcar <es...@yahoo-inc.com>
Committed: Wed Nov 1 09:26:00 2017 +0200
----------------------------------------------------------------------
.../hadoop/hbase/MemoryCompactionPolicy.java | 8 +-
.../java/org/apache/hadoop/hbase/CellUtil.java | 29 ++++
.../org/apache/hadoop/hbase/util/ClassSize.java | 3 +-
.../AdaptiveMemStoreCompactionStrategy.java | 112 ++++++++++++++
.../BasicMemStoreCompactionStrategy.java | 43 ++++++
.../regionserver/CellArrayImmutableSegment.java | 57 ++++++-
.../regionserver/CellChunkImmutableSegment.java | 58 +++++--
.../hadoop/hbase/regionserver/CellChunkMap.java | 1 -
.../hadoop/hbase/regionserver/CellSet.java | 20 ++-
.../hbase/regionserver/CompactingMemStore.java | 37 ++---
.../hbase/regionserver/CompactionPipeline.java | 12 +-
.../EagerMemStoreCompactionStrategy.java | 36 +++++
.../hadoop/hbase/regionserver/HStore.java | 24 ++-
.../hbase/regionserver/ImmutableSegment.java | 12 +-
.../MemStoreCompactionStrategy.java | 114 ++++++++++++++
.../hbase/regionserver/MemStoreCompactor.java | 153 +++++++------------
.../hbase/regionserver/SegmentFactory.java | 31 ++--
.../regionserver/VersionedSegmentsList.java | 27 ++++
.../org/apache/hadoop/hbase/TestIOFencing.java | 2 +-
.../regionserver/TestCompactingMemStore.java | 120 +++++++++++++--
.../TestCompactingToCellFlatMapMemStore.java | 23 +--
.../hadoop/hbase/regionserver/TestHStore.java | 19 ++-
.../hbase/regionserver/TestRecoveredEdits.java | 2 +-
.../TestWalAndCompactingMemStoreFlush.java | 2 +-
24 files changed, 728 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
index 654e7ab..099ea40 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MemoryCompactionPolicy.java
@@ -42,5 +42,11 @@ public enum MemoryCompactionPolicy {
* on-disk compaction does after the data is flushed to disk). This policy is most useful for
* applications with high data churn or small working sets.
*/
- EAGER
+ EAGER,
+ /**
+ * Adaptive compaction adapts to the workload. It applies either index compaction or data
+ * compaction based on the ratio of duplicate cells in the data.
+ */
+ ADAPTIVE
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 20c217f..8a5bb2c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -1372,6 +1372,35 @@ public final class CellUtil {
return matchingColumn(left, right);
}
+ public static boolean matchingRowColumnBytes(final Cell left, final Cell right) {
+ int lrowlength = left.getRowLength();
+ int rrowlength = right.getRowLength();
+ int lfamlength = left.getFamilyLength();
+ int rfamlength = right.getFamilyLength();
+ int lqlength = left.getQualifierLength();
+ int rqlength = right.getQualifierLength();
+ // match length
+ if ((lrowlength + lfamlength + lqlength) !=
+ (rrowlength + rfamlength + rqlength)) {
+ return false;
+ }
+
+ // match row
+ if (!Bytes.equals(left.getRowArray(), left.getRowOffset(), lrowlength, right.getRowArray(),
+ right.getRowOffset(), rrowlength)) {
+ return false;
+ }
+ //match family
+ if (!Bytes.equals(left.getFamilyArray(), left.getFamilyOffset(), lfamlength,
+ right.getFamilyArray(), right.getFamilyOffset(), rfamlength)) {
+ return false;
+ }
+ //match qualifier
+ return Bytes.equals(left.getQualifierArray(), left.getQualifierOffset(),
+ lqlength, right.getQualifierArray(), right.getQualifierOffset(),
+ rqlength);
+ }
+
/**
* Compares the cell's qualifier with the given byte[]
* @param left the cell for which the qualifier has to be compared
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index d9ea761..efcf8d0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -330,9 +330,10 @@ public class ClassSize {
TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * REFERENCE);
+
NON_SYNC_TIMERANGE_TRACKER = align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG);
- CELL_SET = align(OBJECT + REFERENCE);
+ CELL_SET = align(OBJECT + REFERENCE + Bytes.SIZEOF_INT);
STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
new file mode 100644
index 0000000..232ffe3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AdaptiveMemStoreCompactionStrategy.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Adaptive is a heuristic that chooses whether to apply data compaction or not based on the
+ * level of redundancy in the data. Adaptive triggers redundancy elimination only for those
+ * stores where positive impact is expected.
+ *
+ * Adaptive uses two parameters to determine whether to perform redundancy elimination.
+ * The first parameter, u, estimates the ratio of unique keys in the memory store based on the
+ * fraction of unique keys encountered during the previous merge of segment indices.
+ * The second is the perceived probability (compactionProbability) that the store can benefit from
+ * redundancy elimination. Initially, compactionProbability=0.5; it then grows exponentially by
+ * 2% whenever a compaction is successful and decreased by 2% whenever a compaction did not meet
+ * the expectation. It is reset back to the default value (namely 0.5) upon disk flush.
+ *
+ * Adaptive triggers redundancy elimination with probability compactionProbability if the
+ * fraction of redundant keys 1-u exceeds a parameter threshold compactionThreshold.
+ */
+@InterfaceAudience.Private
+public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
+
+ private static final String name = "ADAPTIVE";
+ public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
+ "hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
+ private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
+ public static final String ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY =
+ "hbase.hregion.compacting.memstore.adaptive.compaction.probability";
+ private static final double ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT = 0.5;
+ private static final double ADAPTIVE_PROBABILITY_FACTOR = 1.02;
+
+ private double compactionThreshold;
+ private double initialCompactionProbability;
+ private double compactionProbability;
+ private Random rand = new Random();
+ private double numCellsInVersionedList = 0;
+ private boolean compacted = false;
+
+ public AdaptiveMemStoreCompactionStrategy(Configuration conf, String cfName) {
+ super(conf, cfName);
+ compactionThreshold = conf.getDouble(ADAPTIVE_COMPACTION_THRESHOLD_KEY,
+ ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT);
+ initialCompactionProbability = conf.getDouble(ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_KEY,
+ ADAPTIVE_INITIAL_COMPACTION_PROBABILITY_DEFAULT);
+ resetStats();
+ }
+
+ @Override public Action getAction(VersionedSegmentsList versionedList) {
+ if (versionedList.getEstimatedUniquesFrac() < 1.0 - compactionThreshold) {
+ double r = rand.nextDouble();
+ if(r < compactionProbability) {
+ numCellsInVersionedList = versionedList.getNumOfCells();
+ compacted = true;
+ return compact(versionedList, name+" (compaction probability="+compactionProbability+")");
+ }
+ }
+ compacted = false;
+ return simpleMergeOrFlatten(versionedList,
+ name+" (compaction probability="+compactionProbability+")");
+ }
+
+ @Override
+ public void updateStats(Segment replacement) {
+ if(compacted) {
+ if (replacement.getCellsCount() / numCellsInVersionedList < 1.0 - compactionThreshold) {
+ // compaction was a good decision - increase probability
+ compactionProbability *= ADAPTIVE_PROBABILITY_FACTOR;
+ if(compactionProbability > 1.0) {
+ compactionProbability = 1.0;
+ }
+ } else {
+ // compaction was NOT a good decision - decrease probability
+ compactionProbability /= ADAPTIVE_PROBABILITY_FACTOR;
+ }
+ }
+ }
+
+ @Override
+ public void resetStats() {
+ compactionProbability = initialCompactionProbability;
+ }
+ protected Action getMergingAction() {
+ return Action.MERGE_COUNT_UNIQUE_KEYS;
+ }
+
+ protected Action getFlattenAction() {
+ return Action.FLATTEN;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
new file mode 100644
index 0000000..d816fc1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BasicMemStoreCompactionStrategy.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Basic strategy chooses between two actions: flattening a segment or merging indices of all
+ * segments in the pipeline.
+ * If number of segments in pipeline exceed the limit defined in MemStoreCompactionStrategy then
+ * apply merge, otherwise flatten some segment.
+ */
+@InterfaceAudience.Private
+public class BasicMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
+
+ private static final String name = "BASIC";
+
+ public BasicMemStoreCompactionStrategy(Configuration conf, String cfName) {
+ super(conf, cfName);
+ }
+
+ @Override
+ public Action getAction(VersionedSegmentsList versionedList) {
+ return simpleMergeOrFlatten(versionedList, name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
index 8cd5f2a..0e80b1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellArrayImmutableSegment.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -42,7 +43,7 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
* The given iterator returns the Cells that "survived" the compaction.
*/
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
- MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+ MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
incSize(0, DEEP_OVERHEAD_CAM);
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
@@ -54,12 +55,14 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
* of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction.
*/
- protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) {
+ protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing,
+ MemStoreCompactionStrategy.Action action) {
super(segment); // initiailize the upper class
incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap and update the CellSet of this Segment
- reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+ reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
+ action);
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
@@ -81,14 +84,18 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
/*------------------------------------------------------------------------*/
// Create CellSet based on CellArrayMap from compacting iterator
private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
- MemStoreCompactor.Action action) {
+ MemStoreCompactionStrategy.Action action) {
+ boolean merge = (action == MemStoreCompactionStrategy.Action.MERGE ||
+ action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
int i = 0;
+ int numUniqueKeys=0;
+ Cell prev = null;
while (iterator.hasNext()) {
Cell c = iterator.next();
// The scanner behind the iterator is doing all the elimination logic
- if (action == MemStoreCompactor.Action.MERGE) {
+ if (merge) {
// if this is merge we just move the Cell object without copying MSLAB
// the sizes still need to be updated in the new segment
cells[i] = c;
@@ -99,11 +106,27 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, null); // updates the size per cell
+ if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+ //counting number of unique keys
+ if (prev != null) {
+ if (!CellUtil.matchingRowColumnBytes(prev, c)) {
+ numUniqueKeys++;
+ }
+ } else {
+ numUniqueKeys++;
+ }
+ }
+ prev = c;
i++;
}
+ if(action == MemStoreCompactionStrategy.Action.COMPACT) {
+ numUniqueKeys = numOfCells;
+ } else if(action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+ numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
+ }
// build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
- this.setCellSet(null, new CellSet(cam)); // update the CellSet of this Segment
+ this.setCellSet(null, new CellSet(cam, numUniqueKeys)); // update the CellSet of this Segment
}
/*------------------------------------------------------------------------*/
@@ -111,22 +134,40 @@ public class CellArrayImmutableSegment extends ImmutableSegment {
// (without compacting iterator)
// We do not consider cells bigger than chunks!
private void reinitializeCellSet(
- int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+ int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
+ MemStoreCompactionStrategy.Action action) {
Cell[] cells = new Cell[numOfCells]; // build the Cell Array
Cell curCell;
int idx = 0;
+ int numUniqueKeys=0;
+ Cell prev = null;
try {
while ((curCell = segmentScanner.next()) != null) {
cells[idx++] = curCell;
+ if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+ //counting number of unique keys
+ if (prev != null) {
+ if (!CellUtil.matchingRowColumn(prev, curCell)) {
+ numUniqueKeys++;
+ }
+ } else {
+ numUniqueKeys++;
+ }
+ }
+ prev = curCell;
}
} catch (IOException ie) {
throw new IllegalStateException(ie);
} finally {
segmentScanner.close();
}
+ if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+ numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
+ }
// build the immutable CellSet
CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
- this.setCellSet(oldCellSet, new CellSet(cam)); // update the CellSet of this Segment
+ // update the CellSet of this Segment
+ this.setCellSet(oldCellSet, new CellSet(cam, numUniqueKeys));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
index 4ef0657..7db00a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
@@ -48,7 +49,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
* The given iterator returns the Cells that "survived" the compaction.
*/
protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
- MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactor.Action action) {
+ MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
super(null, comparator, memStoreLAB); // initialize the CellSet with NULL
incSize(0, DEEP_OVERHEAD_CCM); // initiate the heapSize with the size of the segment metadata
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
@@ -61,12 +62,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
* The given iterator returns the Cells that "survived" the compaction.
*/
protected CellChunkImmutableSegment(CSLMImmutableSegment segment,
- MemStoreSizing memstoreSizing) {
+ MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
super(segment); // initiailize the upper class
incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
int numOfCells = segment.getCellsCount();
// build the new CellSet based on CellChunkMap
- reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet());
+ reinitializeCellSet(numOfCells, segment.getScanner(Long.MAX_VALUE), segment.getCellSet(),
+ action);
// arrange the meta-data size, decrease all meta-data sizes related to SkipList;
// add sizes of CellChunkMap entry, decrease also Cell object sizes
// (reinitializeCellSet doesn't take the care for the sizes)
@@ -90,15 +92,17 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
/*------------------------------------------------------------------------*/
// Create CellSet based on CellChunkMap from compacting iterator
private void initializeCellSet(int numOfCells, MemStoreSegmentsIterator iterator,
- MemStoreCompactor.Action action) {
+ MemStoreCompactionStrategy.Action action) {
// calculate how many chunks we will need for index
int chunkSize = ChunkCreator.getInstance().getChunkSize();
int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
- int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk);
+ int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
int numOfCellsAfterCompaction = 0;
int currentChunkIdx = 0;
int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+ int numUniqueKeys=0;
+ Cell prev = null;
// all index Chunks are allocated from ChunkCreator
Chunk[] chunks = new Chunk[numberOfChunks];
for (int i=0; i < numberOfChunks; i++) {
@@ -112,7 +116,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
currentChunkIdx++; // continue to the next index chunk
offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
}
- if (action == MemStoreCompactor.Action.COMPACT) {
+ if (action == MemStoreCompactionStrategy.Action.COMPACT) {
c = maybeCloneWithAllocator(c); // for compaction copy cell to the new segment (MSLAB copy)
}
offsetInCurentChunk = // add the Cell reference to the index chunk
@@ -122,11 +126,27 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// second parameter true, because in compaction/merge the addition of the cell to new segment
// is always successful
updateMetaInfo(c, true, null); // updates the size per cell
+ if(action == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+ //counting number of unique keys
+ if (prev != null) {
+ if (!CellUtil.matchingRowColumnBytes(prev, c)) {
+ numUniqueKeys++;
+ }
+ } else {
+ numUniqueKeys++;
+ }
+ }
+ prev = c;
+ }
+ if(action == MemStoreCompactionStrategy.Action.COMPACT) {
+ numUniqueKeys = numOfCells;
+ } else if(action != MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS) {
+ numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
}
// build the immutable CellSet
CellChunkMap ccm =
new CellChunkMap(getComparator(), chunks, 0, numOfCellsAfterCompaction, false);
- this.setCellSet(null, new CellSet(ccm)); // update the CellSet of this Segment
+ this.setCellSet(null, new CellSet(ccm, numUniqueKeys)); // update the CellSet of this Segment
}
/*------------------------------------------------------------------------*/
@@ -135,12 +155,13 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
// This is a service for not-flat immutable segments
// Assumption: cells do not exceed chunk size!
private void reinitializeCellSet(
- int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet) {
+ int numOfCells, KeyValueScanner segmentScanner, CellSet oldCellSet,
+ MemStoreCompactionStrategy.Action action) {
Cell curCell;
// calculate how many chunks we will need for metadata
int chunkSize = ChunkCreator.getInstance().getChunkSize();
int numOfCellsInChunk = CellChunkMap.NUM_OF_CELL_REPS_IN_CHUNK;
- int numberOfChunks = calculateNumberOfChunks(numOfCells,numOfCellsInChunk);
+ int numberOfChunks = calculateNumberOfChunks(numOfCells, numOfCellsInChunk);
// all index Chunks are allocated from ChunkCreator
Chunk[] chunks = new Chunk[numberOfChunks];
for (int i=0; i < numberOfChunks; i++) {
@@ -150,6 +171,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
int currentChunkIdx = 0;
int offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
+ int numUniqueKeys=0;
+ Cell prev = null;
try {
while ((curCell = segmentScanner.next()) != null) {
assert (curCell instanceof ByteBufferKeyValue); // shouldn't get here anything but ByteBufferKeyValue
@@ -161,6 +184,20 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
offsetInCurentChunk =
createCellReference((ByteBufferKeyValue) curCell, chunks[currentChunkIdx].getData(),
offsetInCurentChunk);
+ if(action == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+ //counting number of unique keys
+ if (prev != null) {
+ if (!CellUtil.matchingRowColumn(prev, curCell)) {
+ numUniqueKeys++;
+ }
+ } else {
+ numUniqueKeys++;
+ }
+ }
+ prev = curCell;
+ }
+ if(action != MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+ numUniqueKeys = CellSet.UNKNOWN_NUM_UNIQUES;
}
} catch (IOException ie) {
throw new IllegalStateException(ie);
@@ -169,7 +206,8 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
}
CellChunkMap ccm = new CellChunkMap(getComparator(), chunks, 0, numOfCells, false);
- this.setCellSet(oldCellSet, new CellSet(ccm)); // update the CellSet of this Segment
+ // update the CellSet of this Segment
+ this.setCellSet(oldCellSet, new CellSet(ccm, numUniqueKeys));
}
/*------------------------------------------------------------------------*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
index 3b11baa..3fbd46d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.Cell;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
index e16d961..56717ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -41,6 +41,8 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public class CellSet implements NavigableSet<Cell> {
+
+ public static final int UNKNOWN_NUM_UNIQUES = -1;
// Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap}
// Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it
// is not already present.", this implementation "Adds the specified element to this set EVEN
@@ -48,12 +50,22 @@ public class CellSet implements NavigableSet<Cell> {
// Otherwise, has same attributes as ConcurrentSkipListSet
private final NavigableMap<Cell, Cell> delegatee; ///
+ private final int numUniqueKeys;
+
CellSet(final CellComparator c) {
this.delegatee = new ConcurrentSkipListMap<>(c);
+ this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
+ }
+
+ CellSet(final NavigableMap<Cell, Cell> m, int numUniqueKeys) {
+ this.delegatee = m;
+ this.numUniqueKeys = numUniqueKeys;
}
+ @VisibleForTesting
CellSet(final NavigableMap<Cell, Cell> m) {
this.delegatee = m;
+ this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
}
@VisibleForTesting
@@ -83,7 +95,7 @@ public class CellSet implements NavigableSet<Cell> {
public NavigableSet<Cell> headSet(final Cell toElement,
boolean inclusive) {
- return new CellSet(this.delegatee.headMap(toElement, inclusive));
+ return new CellSet(this.delegatee.headMap(toElement, inclusive), UNKNOWN_NUM_UNIQUES);
}
public Cell higher(Cell e) {
@@ -120,7 +132,7 @@ public class CellSet implements NavigableSet<Cell> {
}
public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
- return new CellSet(this.delegatee.tailMap(fromElement, inclusive));
+ return new CellSet(this.delegatee.tailMap(fromElement, inclusive), UNKNOWN_NUM_UNIQUES);
}
public Comparator<? super Cell> comparator() {
@@ -187,4 +199,8 @@ public class CellSet implements NavigableSet<Cell> {
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
}
+
+ public int getNumUniqueKeys() {
+ return numUniqueKeys;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index e6f9451..d250252 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
@@ -66,13 +67,13 @@ public class CompactingMemStore extends AbstractMemStore {
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor";
- private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.25;
+ private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.02;
private static final Log LOG = LogFactory.getLog(CompactingMemStore.class);
private HStore store;
private RegionServicesForStores regionServices;
private CompactionPipeline pipeline;
- private MemStoreCompactor compactor;
+ protected MemStoreCompactor compactor;
private long inmemoryFlushSize; // the threshold on active size for in-memory flush
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
@@ -81,7 +82,7 @@ public class CompactingMemStore extends AbstractMemStore {
private boolean inWalReplay = false;
@VisibleForTesting
- private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+ protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
private boolean compositeSnapshot = true;
/**
@@ -119,7 +120,8 @@ public class CompactingMemStore extends AbstractMemStore {
}
@VisibleForTesting
- protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
+ protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
+ throws IllegalArgumentIOException {
return new MemStoreCompactor(this, compactionPolicy);
}
@@ -205,6 +207,7 @@ public class CompactingMemStore extends AbstractMemStore {
} else {
pushTailToSnapshot();
}
+ compactor.resetStats();
}
return new MemStoreSnapshot(snapshotId, this.snapshot);
}
@@ -298,10 +301,6 @@ public class CompactingMemStore extends AbstractMemStore {
this.compositeSnapshot = useCompositeSnapshot;
}
- public boolean isCompositeSnapshot() {
- return this.compositeSnapshot;
- }
-
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) {
// last true stands for updating the region size
@@ -313,8 +312,8 @@ public class CompactingMemStore extends AbstractMemStore {
* with version taken earlier. This version must be passed as a parameter here.
* The flattening happens only if versions match.
*/
- public void flattenOneSegment(long requesterVersion) {
- pipeline.flattenOneSegment(requesterVersion, indexType);
+ public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) {
+ pipeline.flattenOneSegment(requesterVersion, indexType, action);
}
// setter is used only for testability
@@ -543,29 +542,11 @@ public class CompactingMemStore extends AbstractMemStore {
}
}
- //----------------------------------------------------------------------
- //methods for tests
- //----------------------------------------------------------------------
@VisibleForTesting
boolean isMemStoreFlushingInMemory() {
return inMemoryFlushInProgress.get();
}
- @VisibleForTesting
- void disableCompaction() {
- allowCompaction.set(false);
- }
-
- @VisibleForTesting
- void enableCompaction() {
- allowCompaction.set(true);
- }
-
- @VisibleForTesting
- void initiateType(MemoryCompactionPolicy compactionType) {
- compactor.initiateAction(compactionType);
- }
-
/**
* @param cell Find the row that comes after this one. If null, we return the
* first.
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e44cb45..2f479e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -126,14 +126,10 @@ public class CompactionPipeline {
}
suffix = versionedList.getStoreSegments();
if (LOG.isDebugEnabled()) {
- int count = 0;
- if(segment != null) {
- count = segment.getCellsCount();
- }
LOG.debug("Swapping pipeline suffix. "
+ "Just before the swap the number of segments in pipeline is:"
+ versionedList.getStoreSegments().size()
- + ", and the number of cells in new segment is:" + count);
+ + ", and the new segment is:" + segment);
}
swapSuffix(suffix, segment, closeSuffix);
readOnlyCopy = new LinkedList<>(pipeline);
@@ -183,7 +179,9 @@ public class CompactionPipeline {
*
* @return true iff a segment was successfully flattened
*/
- public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) {
+ public boolean flattenOneSegment(long requesterVersion,
+ CompactingMemStore.IndexType idxType,
+ MemStoreCompactionStrategy.Action action) {
if(requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
@@ -201,7 +199,7 @@ public class CompactionPipeline {
if ( s.canBeFlattened() ) {
MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
- (CSLMImmutableSegment)s,idxType,newMemstoreAccounting);
+ (CSLMImmutableSegment)s,idxType,newMemstoreAccounting,action);
replaceAtIndex(i,newS);
if(region != null) {
// update the global memstore size counter
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java
new file mode 100644
index 0000000..90d0756
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/EagerMemStoreCompactionStrategy.java
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class EagerMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
+
+ private static final String name = "EAGER";
+ public EagerMemStoreCompactionStrategy(Configuration conf, String cfName) {
+ super(conf, cfName);
+ }
+
+ @Override
+ public Action getAction(VersionedSegmentsList versionedList) {
+ return compact(versionedList, name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4b83b23..db900a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -278,19 +278,17 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
String className;
switch (inMemoryCompaction) {
- case BASIC:
- case EAGER:
- Class<? extends CompactingMemStore> clz =
- conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);
- className = clz.getName();
- this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
- this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
- break;
- case NONE:
- default:
- className = DefaultMemStore.class.getName();
- this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class,
- new Object[] { conf, this.comparator });
+ case NONE:
+ className = DefaultMemStore.class.getName();
+ this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class,
+ new Object[] { conf, this.comparator });
+ break;
+ default:
+ Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
+ CompactingMemStore.class, CompactingMemStore.class);
+ className = clz.getName();
+ this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
+ this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
}
LOG.info("Memstore class name is " + className);
this.offPeakHours = OffPeakHours.getInstance(conf);
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index c1244ff..02a05c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -40,6 +40,10 @@ public abstract class ImmutableSegment extends Segment {
// each sub-type of immutable segment knows whether it is flat or not
protected abstract boolean canBeFlattened();
+ public int getNumUniqueKeys() {
+ return getCellSet().getNumUniqueKeys();
+ }
+
///////////////////// CONSTRUCTORS /////////////////////
/**------------------------------------------------------------------------
* Empty C-tor to be used only for CompositeImmutableSegment
@@ -64,7 +68,6 @@ public abstract class ImmutableSegment extends Segment {
super(segment);
}
-
///////////////////// PUBLIC METHODS /////////////////////
public int getNumOfSegments() {
@@ -75,4 +78,11 @@ public abstract class ImmutableSegment extends Segment {
List<Segment> res = new ArrayList<>(Arrays.asList(this));
return res;
}
+
+ @Override
+ public String toString() {
+ String res = super.toString();
+ res += "Num uniques "+getNumUniqueKeys()+"; ";
+ return res;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
new file mode 100644
index 0000000..b262328
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactionStrategy.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * MemStoreCompactionStrategy is the root of a class hierarchy which defines the strategy for
+ * choosing the next action to apply in an (in-memory) memstore compaction.
+ * Possible action are:
+ * - No-op - do nothing
+ * - Flatten - to change the segment's index from CSLM to a flat representation
+ * - Merge - to merge the indices of the segments in the pipeline
+ * - Compact - to merge the indices while removing data redundancies
+ *
+ * In addition while applying flat/merge actions it is possible to count the number of unique
+ * keys in the result segment.
+ */
+@InterfaceAudience.Private
+public abstract class MemStoreCompactionStrategy {
+
+ protected static final Log LOG = LogFactory.getLog(MemStoreCompactionStrategy.class);
+ // The upper bound for the number of segments we store in the pipeline prior to merging.
+ public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
+ "hbase.hregion.compacting.pipeline.segments.limit";
+ public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 4;
+
+ /**
+ * Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
+ * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
+ * that the youngest segment is going to be flatten anyway.
+ */
+ public enum Action {
+ NOOP,
+ FLATTEN, // flatten a segment in the pipeline
+ FLATTEN_COUNT_UNIQUE_KEYS, // flatten a segment in the pipeline and count its unique keys
+ MERGE, // merge all the segments in the pipeline into one
+ MERGE_COUNT_UNIQUE_KEYS, // merge all pipeline segments into one and count its unique keys
+ COMPACT // compact the data of all pipeline segments
+ }
+
+ protected final String cfName;
+ // The limit on the number of the segments in the pipeline
+ protected final int pipelineThreshold;
+
+
+ public MemStoreCompactionStrategy(Configuration conf, String cfName) {
+ this.cfName = cfName;
+ if(conf == null) {
+ pipelineThreshold = COMPACTING_MEMSTORE_THRESHOLD_DEFAULT;
+ } else {
+ pipelineThreshold = // get the limit on the number of the segments in the pipeline
+ conf.getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY, COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
+ }
+ }
+
+ // get next compaction action to apply on compaction pipeline
+ public abstract Action getAction(VersionedSegmentsList versionedList);
+ // update policy stats based on the segment that replaced previous versioned list (in
+ // compaction pipeline)
+ public void updateStats(Segment replacement) {}
+ // resets policy stats
+ public void resetStats() {}
+
+ protected Action simpleMergeOrFlatten(VersionedSegmentsList versionedList, String strategy) {
+ int numOfSegments = versionedList.getNumOfSegments();
+ if (numOfSegments > pipelineThreshold) {
+ // to avoid too many segments, merge now
+ LOG.debug(strategy+" memory compaction for store " + cfName
+ + " merging " + numOfSegments + " segments");
+ return getMergingAction();
+ }
+
+ // just flatten a segment
+ LOG.debug(strategy+" memory compaction for store " + cfName
+ + " flattening a segment in the pipeline");
+ return getFlattenAction();
+ }
+
+ protected Action getMergingAction() {
+ return Action.MERGE;
+ }
+
+ protected Action getFlattenAction() {
+ return Action.FLATTEN;
+ }
+
+ protected Action compact(VersionedSegmentsList versionedList, String strategyInfo) {
+ int numOfSegments = versionedList.getNumOfSegments();
+ LOG.debug(strategyInfo+" memory compaction for store " + cfName
+ + " compacting " + numOfSegments + " segments");
+ return Action.COMPACT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 8af33b6..fea9f17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -18,9 +18,11 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.yetus.audience.InterfaceAudience;
@@ -44,26 +46,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
@InterfaceAudience.Private
public class MemStoreCompactor {
- // The upper bound for the number of segments we store in the pipeline prior to merging.
- // This constant is subject to further experimentation.
- // The external setting of the compacting MemStore behaviour
- public static final String COMPACTING_MEMSTORE_THRESHOLD_KEY =
- "hbase.hregion.compacting.pipeline.segments.limit";
- // remaining with the same ("infinity") but configurable default for now
- public static final int COMPACTING_MEMSTORE_THRESHOLD_DEFAULT = 1;
-
public static final long DEEP_OVERHEAD = ClassSize
- .align(ClassSize.OBJECT
- + 4 * ClassSize.REFERENCE
- // compactingMemStore, versionedList, action, isInterrupted (the reference)
+ .align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
+ // compactingMemStore, versionedList, isInterrupted, strategy (the reference)
// "action" is an enum and thus it is a class with static final constants,
// so counting only the size of the reference to it and not the size of the internals
- + 2 * Bytes.SIZEOF_INT // compactionKVMax, pipelineThreshold
+ + Bytes.SIZEOF_INT // compactionKVMax
+ ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
);
private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
- private final int pipelineThreshold; // the limit on the number of the segments in the pipeline
private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline
@@ -75,29 +67,15 @@ public class MemStoreCompactor {
// the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
private final int compactionKVMax;
- /**
- * Types of actions to be done on the pipeline upon MemStoreCompaction invocation.
- * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
- * that the youngest segment is going to be flatten anyway.
- */
- public enum Action {
- NOOP,
- FLATTEN, // flatten the youngest segment in the pipeline
- MERGE, // merge all the segments in the pipeline into one
- COMPACT // copy-compact the data of all the segments in the pipeline
- }
-
- private Action action = Action.FLATTEN;
+ private MemStoreCompactionStrategy strategy;
public MemStoreCompactor(CompactingMemStore compactingMemStore,
- MemoryCompactionPolicy compactionPolicy) {
+ MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
this.compactingMemStore = compactingMemStore;
this.compactionKVMax = compactingMemStore.getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
- initiateAction(compactionPolicy);
- pipelineThreshold = // get the limit on the number of the segments in the pipeline
- compactingMemStore.getConfiguration().getInt(COMPACTING_MEMSTORE_THRESHOLD_KEY,
- COMPACTING_MEMSTORE_THRESHOLD_DEFAULT);
+ initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
+ compactingMemStore.getFamilyName());
}
/**----------------------------------------------------------------------
@@ -132,11 +110,9 @@ public class MemStoreCompactor {
isInterrupted.compareAndSet(false, true);
}
- /**----------------------------------------------------------------------
- * The interface to check whether user requested the index-compaction
- */
- public boolean isIndexCompaction() {
- return (action == Action.MERGE);
+
+ public void resetStats() {
+ strategy.resetStats();
}
/**----------------------------------------------------------------------
@@ -149,40 +125,6 @@ public class MemStoreCompactor {
}
/**----------------------------------------------------------------------
- * Decide what to do with the new and old segments in the compaction pipeline.
- * Implements basic in-memory compaction policy.
- */
- private Action policy() {
-
- if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
- return Action.NOOP; // the compaction also doesn't start when interrupted
- }
-
- if (action == Action.COMPACT) { // compact according to the user request
- LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
- + " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of"
- + " cells before compaction is " + versionedList.getNumOfCells());
- return Action.COMPACT;
- }
-
- // compaction shouldn't happen or doesn't worth it
- // limit the number of the segments in the pipeline
- int numOfSegments = versionedList.getNumOfSegments();
- if (numOfSegments > pipelineThreshold) {
- LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
- + " is going to be merged to the " + compactingMemStore.getIndexType()
- + ", as there are " + numOfSegments + " segments");
- return Action.MERGE; // to avoid too many segments, merge now
- }
-
- // if nothing of the above, then just flatten the newly joined segment
- LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
- + compactingMemStore.getFamilyName() + " is going to be flattened to the "
- + compactingMemStore.getIndexType());
- return Action.FLATTEN;
- }
-
- /**----------------------------------------------------------------------
* The worker thread performs the compaction asynchronously.
* The solo (per compactor) thread only reads the compaction pipeline.
* There is at most one thread per memstore instance.
@@ -190,29 +132,37 @@ public class MemStoreCompactor {
private void doCompaction() {
ImmutableSegment result = null;
boolean resultSwapped = false;
- Action nextStep = null;
- try {
- nextStep = policy();
+ if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
+ return; // the compaction also doesn't start when interrupted
+ }
- if (nextStep == Action.NOOP) {
+ MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
+ boolean merge =
+ (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
+ nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
+ try {
+ if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
return;
}
- if (nextStep == Action.FLATTEN) {
- // Youngest Segment in the pipeline is with SkipList index, make it flat
- compactingMemStore.flattenOneSegment(versionedList.getVersion());
+ if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN ||
+ nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
+ // some Segment in the pipeline is with SkipList index, make it flat
+ compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
return;
}
// Create one segment representing all segments in the compaction pipeline,
// either by compaction or by merge
if (!isInterrupted.get()) {
- result = createSubstitution();
+ result = createSubstitution(nextStep);
}
// Substitute the pipeline with one segment
if (!isInterrupted.get()) {
if (resultSwapped = compactingMemStore.swapCompactedSegments(
- versionedList, result, (action==Action.MERGE))) {
+ versionedList, result, merge)) {
+ // update compaction strategy
+ strategy.updateStats(result);
// update the wal so it can be truncated and not get too long
compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
}
@@ -226,10 +176,8 @@ public class MemStoreCompactor {
// we DON'T need to close the result segment (meaning its MSLAB)!
// Because closing the result segment means closing the chunks of all segments
// in the compaction pipeline, which still have ongoing scans.
- if (nextStep != Action.MERGE) {
- if ((result != null) && (!resultSwapped)) {
- result.close();
- }
+ if (!merge && (result != null) && !resultSwapped) {
+ result.close();
}
releaseResources();
}
@@ -240,7 +188,8 @@ public class MemStoreCompactor {
* Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
* pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
*/
- private ImmutableSegment createSubstitution() throws IOException {
+ private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
+ IOException {
ImmutableSegment result = null;
MemStoreSegmentsIterator iterator = null;
@@ -248,21 +197,24 @@ public class MemStoreCompactor {
switch (action) {
case COMPACT:
iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(),
- compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore());
+ compactingMemStore.getComparator(),
+ compactionKVMax, compactingMemStore.getStore());
result = SegmentFactory.instance().createImmutableSegmentByCompaction(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
- versionedList.getNumOfCells(), compactingMemStore.getIndexType());
+ versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
iterator.close();
break;
case MERGE:
- iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
+ case MERGE_COUNT_UNIQUE_KEYS:
+ iterator =
+ new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
compactingMemStore.getComparator(), compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), versionedList.getStoreSegments(),
- compactingMemStore.getIndexType());
+ compactingMemStore.getIndexType(), action);
iterator.close();
break;
default:
@@ -272,21 +224,22 @@ public class MemStoreCompactor {
return result;
}
- /**----------------------------------------------------------------------
- * Initiate the action according to user config, after its default is Action.MERGE
- */
@VisibleForTesting
- void initiateAction(MemoryCompactionPolicy compType) {
+ void initiateCompactionStrategy(MemoryCompactionPolicy compType,
+ Configuration configuration, String cfName) throws IllegalArgumentIOException {
+
+ assert (compType !=MemoryCompactionPolicy.NONE);
switch (compType){
- case NONE: action = Action.NOOP;
- break;
- case BASIC: action = Action.MERGE;
- break;
- case EAGER: action = Action.COMPACT;
- break;
- default:
- throw new RuntimeException("Unknown memstore type " + compType); // sanity check
+ case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
+ break;
+ case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
+ break;
+ case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
+ break;
+ default:
+ // sanity check
+ throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 43836f4..db0b319 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -51,13 +51,13 @@ public final class SegmentFactory {
// for compaction
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
- CompactingMemStore.IndexType idxType)
+ CompactingMemStore.IndexType idxType, MemStoreCompactionStrategy.Action action)
throws IOException {
MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return
createImmutableSegment(
- conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType);
+ conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
}
// create empty immutable segment
@@ -82,13 +82,14 @@ public final class SegmentFactory {
// for merge
public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
- List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType)
+ List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType,
+ MemStoreCompactionStrategy.Action action)
throws IOException {
MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
return
createImmutableSegment(
- conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType);
+ conf,comparator,iterator,memStoreLAB,numOfCells,action,idxType);
}
@@ -96,18 +97,18 @@ public final class SegmentFactory {
// for flattening
public ImmutableSegment createImmutableSegmentByFlattening(
CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType,
- MemStoreSizing memstoreSizing) {
+ MemStoreSizing memstoreSizing, MemStoreCompactionStrategy.Action action) {
ImmutableSegment res = null;
switch (idxType) {
- case CHUNK_MAP:
- res = new CellChunkImmutableSegment(segment, memstoreSizing);
- break;
- case CSLM_MAP:
- assert false; // non-flat segment can not be the result of flattening
- break;
- case ARRAY_MAP:
- res = new CellArrayImmutableSegment(segment, memstoreSizing);
- break;
+ case CHUNK_MAP:
+ res = new CellChunkImmutableSegment(segment, memstoreSizing, action);
+ break;
+ case CSLM_MAP:
+ assert false; // non-flat segment can not be the result of flattening
+ break;
+ case ARRAY_MAP:
+ res = new CellArrayImmutableSegment(segment, memstoreSizing, action);
+ break;
}
return res;
}
@@ -116,7 +117,7 @@ public final class SegmentFactory {
//****** private methods to instantiate concrete store segments **********//
private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells,
- MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) {
+ MemStoreCompactionStrategy.Action action, CompactingMemStore.IndexType idxType) {
ImmutableSegment res = null;
switch (idxType) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 4269863..a697912 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
@@ -62,4 +64,29 @@ public class VersionedSegmentsList {
public int getNumOfSegments() {
return storeSegments.size();
}
+
+ // Estimates fraction of unique keys
+ @VisibleForTesting
+ double getEstimatedUniquesFrac() {
+ int segmentCells = 0;
+ int maxCells = 0;
+ double est = 0;
+
+ for (ImmutableSegment s : storeSegments) {
+ double segmentUniques = s.getNumUniqueKeys();
+ if(segmentUniques != CellSet.UNKNOWN_NUM_UNIQUES) {
+ segmentCells = s.getCellsCount();
+ if(segmentCells > maxCells) {
+ maxCells = segmentCells;
+ est = segmentUniques / segmentCells;
+ }
+ }
+ // else ignore this segment specifically since if the unique number is unknown counting
+ // cells can be expensive
+ }
+ if(maxCells == 0) {
+ return 1.0;
+ }
+ return est;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index b759261..2ef200f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -368,7 +368,7 @@ public class TestIOFencing {
Thread.sleep(1000);
}
}
- if (policy == MemoryCompactionPolicy.EAGER) {
+ if (policy == MemoryCompactionPolicy.EAGER || policy == MemoryCompactionPolicy.ADAPTIVE) {
assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
} else {
assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index b44fd34..0d5290d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -26,8 +26,23 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -73,7 +88,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@Before
public void setUp() throws Exception {
compactingSetUp();
- this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl.COMPARATOR,
+ this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(), CellComparatorImpl
+ .COMPARATOR,
store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
}
@@ -482,7 +498,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
- ((CompactingMemStore)memstore).initiateType(compactionType);
+ ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily");
@@ -497,7 +513,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
// Creating a pipeline
- ((CompactingMemStore)memstore).disableCompaction();
+ ((MyCompactingMemStore)memstore).disableCompaction();
((CompactingMemStore)memstore).flushInMemory();
// Adding value to "new" memstore
@@ -512,7 +528,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
- ((CompactingMemStore)memstore).enableCompaction();
+ ((MyCompactingMemStore)memstore).enableCompaction();
// trigger compaction
((CompactingMemStore)memstore).flushInMemory();
@@ -574,7 +590,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration()
.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
- ((CompactingMemStore)memstore).initiateType(compactionType);
+ ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
@@ -613,7 +629,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
- ((CompactingMemStore)memstore).initiateType(compactionType);
+ memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
+ String.valueOf(1));
+ ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" };
@@ -669,7 +687,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
- ((CompactingMemStore)memstore).initiateType(compactionType);
+ ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
String[] keys1 = { "A", "A", "B", "C" };
String[] keys2 = { "A", "B", "D" };
String[] keys3 = { "D", "B", "B" };
@@ -698,7 +716,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
- ((CompactingMemStore) memstore).disableCompaction();
+ ((MyCompactingMemStore) memstore).disableCompaction();
MemStoreSize size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -714,7 +732,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
+ 3 * oneCellOnCSLMHeapSize;
assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
- ((CompactingMemStore)memstore).enableCompaction();
+ ((MyCompactingMemStore)memstore).enableCompaction();
size = memstore.getFlushableSize();
((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -738,6 +756,67 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.clearSnapshot(snapshot.getId());
}
+ @Test
+ public void testMagicCompaction3Buckets() throws IOException {
+
+ MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
+ memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+ String.valueOf(compactionType));
+ memstore.getConfiguration().setDouble(
+ AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
+ memstore.getConfiguration().setInt(
+ AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
+ memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
+ ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
+
+ String[] keys1 = { "A", "B", "D" };
+ String[] keys2 = { "A" };
+ String[] keys3 = { "A", "A", "B", "C" };
+ String[] keys4 = { "D", "B", "B" };
+
+ int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
+ int oneCellOnCSLMHeapSize = 120;
+ assertEquals(totalCellsLen1, region.getMemStoreSize());
+ long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
+ assertEquals(totalHeapSize, memstore.heapSize());
+
+ ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline - flatten
+ assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+ assertEquals(1.0,
+ ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+ assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+ addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
+ ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+ assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+ assertEquals(1.0,
+ ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+ assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+ addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
+ ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+ assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
+ assertEquals((4.0 / 8.0),
+ ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
+ assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+ addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
+ ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+ int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
+ assertTrue(4 == numCells || 11 == numCells);
+ assertEquals(0, memstore.getSnapshot().getCellsCount());
+
+ MemStoreSize size = memstore.getFlushableSize();
+ MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+ region.decrMemStoreSize(size); // simulate flusher
+ ImmutableSegment s = memstore.getSnapshot();
+ numCells = s.getCellsCount();
+ assertTrue(4 == numCells || 11 == numCells);
+ assertEquals(0, regionServicesForStores.getMemStoreSize());
+
+ memstore.clearSnapshot(snapshot.getId());
+ }
+
protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");
@@ -771,4 +850,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
}
+ static protected class MyCompactingMemStore extends CompactingMemStore {
+
+ public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
+ RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
+ throws IOException {
+ super(conf, c, store, regionServices, compactionPolicy);
+ }
+
+ void disableCompaction() {
+ allowCompaction.set(false);
+ }
+
+ void enableCompaction() {
+ allowCompaction.set(true);
+ }
+ void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
+ throws IllegalArgumentIOException {
+ compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/17e7aff3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index 1c58723..b4cf4ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -21,7 +21,11 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -76,7 +80,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
String.valueOf(MemoryCompactionPolicy.EAGER));
this.memstore =
- new CompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
+ new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
regionServicesForStores, MemoryCompactionPolicy.EAGER);
}
@@ -229,7 +233,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
- ((CompactingMemStore) memstore).disableCompaction();
+ ((MyCompactingMemStore) memstore).disableCompaction();
size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
@@ -244,7 +248,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
((CompactingMemStore) memstore).heapSize());
- ((CompactingMemStore) memstore).enableCompaction();
+ ((MyCompactingMemStore) memstore).enableCompaction();
size = memstore.getFlushableSize();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
@@ -293,7 +297,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
- ((CompactingMemStore)memstore).initiateType(compactionType);
+ ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
addRowsByKeysDataSize(memstore, keys1);
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
@@ -311,7 +315,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
}
assertEquals(12, counter2);
- ((CompactingMemStore) memstore).disableCompaction();
+ ((MyCompactingMemStore) memstore).disableCompaction();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -330,7 +334,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
}
assertEquals(16, counter4);
- ((CompactingMemStore) memstore).enableCompaction();
+ ((MyCompactingMemStore) memstore).enableCompaction();
((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
@@ -372,7 +376,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
- ((CompactingMemStore)memstore).initiateType(compactionType);
+ ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
testTimeRange(false);
}
@@ -603,7 +607,6 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
assertTrue(chunkCount > 0);
}
-
@Test
public void testFlatteningToCellChunkMap() throws IOException {
@@ -611,7 +614,7 @@ public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore
MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(compactionType));
- ((CompactingMemStore)memstore).initiateType(compactionType);
+ ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
((CompactingMemStore)memstore).setIndexType();