You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/05 12:38:42 UTC
hbase git commit: HBASE-19095 Add CP hooks in RegionObserver for in
memory compaction
Repository: hbase
Updated Branches:
refs/heads/master 3a0f59d03 -> 28cdf4afb
HBASE-19095 Add CP hooks in RegionObserver for in memory compaction
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28cdf4af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28cdf4af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28cdf4af
Branch: refs/heads/master
Commit: 28cdf4afb8a864c0cfccf278960a6597708f7dd6
Parents: 3a0f59d
Author: zhangduo <zh...@apache.org>
Authored: Sat Nov 4 21:45:39 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Nov 5 20:28:05 2017 +0800
----------------------------------------------------------------------
.../example/WriteHeavyIncrementObserver.java | 14 ++
.../TestWriteHeavyIncrementObserver.java | 75 +----------
...IncrementObserverWithMemStoreCompaction.java | 59 +++++++++
.../WriteHeavyIncrementObserverTestBase.java | 105 +++++++++++++++
.../hbase/coprocessor/RegionObserver.java | 55 +++++++-
.../hbase/regionserver/MemStoreCompactor.java | 14 +-
.../MemStoreCompactorSegmentsIterator.java | 129 +++++++++++--------
.../regionserver/RegionCoprocessorHost.java | 55 ++++++++
.../hadoop/hbase/regionserver/StoreScanner.java | 4 +-
9 files changed, 377 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
index e9b590d..55d9ac3 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
@@ -163,6 +163,20 @@ public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObs
}
@Override
+ public void preMemStoreCompactionCompactScannerOpen(
+ ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+ throws IOException {
+ options.readAllVersions();
+ }
+
+ @Override
+ public InternalScanner preMemStoreCompactionCompact(
+ ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
+ throws IOException {
+ return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
+ }
+
+ @Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
throws IOException {
Scan scan =
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
index 1881c85..18e819f 100644
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
@@ -20,58 +20,25 @@ package org.apache.hadoop.hbase.coprocessor.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ CoprocessorTests.class, MediumTests.class })
-public class TestWriteHeavyIncrementObserver {
-
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static TableName NAME = TableName.valueOf("TestCP");
-
- private static byte[] FAMILY = Bytes.toBytes("cf");
-
- private static byte[] ROW = Bytes.toBytes("row");
-
- private static byte[] CQ1 = Bytes.toBytes("cq1");
-
- private static byte[] CQ2 = Bytes.toBytes("cq2");
-
- private static Table TABLE;
-
- private static long UPPER = 1000;
-
- private static int THREADS = 10;
+public class TestWriteHeavyIncrementObserver extends WriteHeavyIncrementObserverTestBase {
@BeforeClass
public static void setUp() throws Exception {
- UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
- UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
- UTIL.startMiniCluster(3);
+ WriteHeavyIncrementObserverTestBase.setUp();
UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(NAME)
.addCoprocessor(WriteHeavyIncrementObserver.class.getName())
@@ -79,45 +46,9 @@ public class TestWriteHeavyIncrementObserver {
TABLE = UTIL.getConnection().getTable(NAME);
}
- @AfterClass
- public static void tearDown() throws Exception {
- if (TABLE != null) {
- TABLE.close();
- }
- UTIL.shutdownMiniCluster();
- }
-
- private static void increment() throws IOException {
- for (long i = 1; i <= UPPER; i++) {
- TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
- try {
- Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10));
- } catch (InterruptedException e) {
- }
- }
- }
-
- private void assertSum() throws IOException {
- Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
- assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
- assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
- }
-
@Test
public void test() throws Exception {
- Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
- try {
- increment();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }, "increment-" + i)).toArray(Thread[]::new);
- for (Thread thread : threads) {
- thread.start();
- }
- for (Thread thread : threads) {
- thread.join();
- }
+ doIncrement(0);
assertSum();
// we do not hack scan operation so using scan we could get the original values added into the
// table.
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java
new file mode 100644
index 0000000..eeb1fa8
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestWriteHeavyIncrementObserverWithMemStoreCompaction
+ extends WriteHeavyIncrementObserverTestBase {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ WriteHeavyIncrementObserverTestBase.setUp();
+ UTIL.getAdmin()
+ .createTable(TableDescriptorBuilder.newBuilder(NAME)
+ .addCoprocessor(WriteHeavyIncrementObserver.class.getName())
+ .setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+ MemoryCompactionPolicy.EAGER.name())
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+ TABLE = UTIL.getConnection().getTable(NAME);
+ }
+
+ @Test
+ public void test() throws Exception {
+ // sleep every 10 loops to give memstore compaction enough time to finish before reaching the
+ // flush size.
+ doIncrement(10);
+ assertSum();
+ HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY);
+ // should have no store files created as we have done aggregating all in memory
+ assertEquals(0, store.getStorefilesCount());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java
new file mode 100644
index 0000000..3583230
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class WriteHeavyIncrementObserverTestBase {
+
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ protected static TableName NAME = TableName.valueOf("TestCP");
+
+ protected static byte[] FAMILY = Bytes.toBytes("cf");
+
+ protected static byte[] ROW = Bytes.toBytes("row");
+
+ protected static byte[] CQ1 = Bytes.toBytes("cq1");
+
+ protected static byte[] CQ2 = Bytes.toBytes("cq2");
+
+ protected static Table TABLE;
+
+ protected static long UPPER = 1000;
+
+ protected static int THREADS = 10;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
+ UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
+ UTIL.startMiniCluster(3);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (TABLE != null) {
+ TABLE.close();
+ }
+ UTIL.shutdownMiniCluster();
+ }
+
+ private static void increment(int sleepSteps) throws IOException {
+ for (long i = 1; i <= UPPER; i++) {
+ TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
+ if (sleepSteps > 0 && i % sleepSteps == 0) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ protected final void assertSum() throws IOException {
+ Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
+ assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
+ assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
+ }
+
+ protected final void doIncrement(int sleepSteps) throws InterruptedException {
+ Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
+ try {
+ increment(sleepSteps);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }, "increment-" + i)).toArray(Thread[]::new);
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 9546116..83cd358 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.coprocessor;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -144,10 +146,10 @@ public interface RegionObserver {
* Called before a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
* @param store the store where flush is being requested
- * @param scanner the scanner over existing data used in the store file
+ * @param scanner the scanner over existing data used in the memstore
* @param tracker tracker used to track the life cycle of a flush
- * @return the scanner to use during compaction. Should not be {@code null}
- * unless the implementation is writing new store files on its own.
+ * @return the scanner to use during flush. Should not be {@code null} unless the implementation
+ * is writing new store files on its own.
*/
default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
@@ -174,6 +176,51 @@ public interface RegionObserver {
StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {}
/**
+ * Called before in memory compaction starts.
+ * @param c the environment provided by the region server
+ * @param store the store where in memory compaction is being requested
+ */
+ default void preMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
+ throws IOException {}
+
+ /**
+ * Called before we open store scanner for in memory compaction. You can use the {@code options}
+ * to change max versions and TTL for the scanner being opened. Notice that this method will only
+ * be called when you use {@code eager} mode. For {@code basic} mode we will not drop any cells
+ * thus we do not open a store scanner.
+ * @param c the environment provided by the region server
+ * @param store the store where in memory compaction is being requested
+ * @param options used to change max versions and TTL for the scanner being opened
+ */
+ default void preMemStoreCompactionCompactScannerOpen(
+ ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
+ throws IOException {}
+
+ /**
+ * Called before we do in memory compaction. Notice that this method will only be called when you
+ * use {@code eager} mode. For {@code basic} mode we will not drop any cells thus there is no
+ * {@link InternalScanner}.
+ * @param c the environment provided by the region server
+ * @param store the store where in memory compaction is being executed
+ * @param scanner the scanner over existing data used in the memstore segments being compacted
+ * @return the scanner to use during in memory compaction. Must be non-null.
+ */
+ @NonNull
+ default InternalScanner preMemStoreCompactionCompact(
+ ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
+ throws IOException {
+ return scanner;
+ }
+
+ /**
+ * Called after the in memory compaction is finished.
+ * @param c the environment provided by the region server
+ * @param store the store where in memory compaction is being executed
+ */
+ default void postMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
+ throws IOException {}
+
+ /**
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
* available candidates. To alter the files used for compaction, you may mutate the passed in list
* of candidates. If you remove all the candidates then the compaction will be canceled.
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index fea9f17..4d97411 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -96,8 +96,18 @@ public class MemStoreCompactor {
LOG.debug("Starting the In-Memory Compaction for store "
+ compactingMemStore.getStore().getColumnFamilyName());
}
-
- doCompaction();
+ HStore store = compactingMemStore.getStore();
+ RegionCoprocessorHost cpHost = store.getCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.preMemStoreCompaction(store);
+ }
+ try {
+ doCompaction();
+ } finally {
+ if (cpHost != null) {
+ cpHost.postMemStoreCompaction(store);
+ }
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
index 7ab2fe3..0f96936 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -23,12 +23,18 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables;
+
/**
* The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
* and performs the scan for compaction operation meaning it is based on SQM
@@ -36,12 +42,14 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
- private List<Cell> kvs = new ArrayList<>();
- private boolean hasMore;
+ private static final Log LOG = LogFactory.getLog(MemStoreCompactorSegmentsIterator.class);
+
+ private final List<Cell> kvs = new ArrayList<>();
+ private boolean hasMore = true;
private Iterator<Cell> kvsIterator;
// scanner on top of pipeline scanner that uses ScanQueryMatcher
- private StoreScanner compactingScanner;
+ private InternalScanner compactingScanner;
// C-tor
public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
@@ -56,44 +64,34 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
// build the scanner based on Query Matcher
// reinitialize the compacting scanner for each instance of iterator
compactingScanner = createScanner(store, scanners);
-
- hasMore = compactingScanner.next(kvs, scannerContext);
-
- if (!kvs.isEmpty()) {
- kvsIterator = kvs.iterator();
- }
-
+ refillKVS();
}
@Override
public boolean hasNext() {
- if (kvsIterator == null) { // for the case when the result is empty
+ if (kvsIterator == null) { // for the case when the result is empty
return false;
}
- if (!kvsIterator.hasNext()) {
- // refillKVS() method should be invoked only if !kvsIterator.hasNext()
- if (!refillKVS()) {
- return false;
- }
- }
- return kvsIterator.hasNext();
+ // return true if either we have cells in the buffer or we can get more.
+ return kvsIterator.hasNext() || refillKVS();
}
@Override
- public Cell next() {
- if (kvsIterator == null) { // for the case when the result is empty
- return null;
+ public Cell next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
}
- if (!kvsIterator.hasNext()) {
- // refillKVS() method should be invoked only if !kvsIterator.hasNext()
- if (!refillKVS()) return null;
- }
- return (!hasMore) ? null : kvsIterator.next();
+ return kvsIterator.next();
}
public void close() {
- compactingScanner.close();
+ try {
+ compactingScanner.close();
+ } catch (IOException e) {
+ LOG.warn("close store scanner failed", e);
+ }
compactingScanner = null;
+ kvs.clear();
}
@Override
@@ -105,39 +103,64 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
* Creates the scanner for compacting the pipeline.
* @return the scanner
*/
- private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
+ private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners)
throws IOException {
- // FIXME: This is the old comment 'Get all available versions'
- // But actually if we really reset the ScanInfo to get all available versions then lots of UTs
- // will fail
- return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES,
- store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+ InternalScanner scanner = null;
+ boolean success = false;
+ try {
+ RegionCoprocessorHost cpHost = store.getCoprocessorHost();
+ ScanInfo scanInfo;
+ if (cpHost != null) {
+ scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store);
+ } else {
+ scanInfo = store.getScanInfo();
+ }
+ scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES,
+ store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+ if (cpHost != null) {
+ InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner);
+ if (scannerFromCp == null) {
+ throw new CoprocessorException("Got a null InternalScanner when calling" +
+ " preMemStoreCompactionCompact which is not acceptable");
+ }
+ success = true;
+ return scannerFromCp;
+ } else {
+ success = true;
+ return scanner;
+ }
+ } finally {
+ if (!success) {
+ Closeables.close(scanner, true);
+ scanners.forEach(KeyValueScanner::close);
+ }
+ }
}
- /* Refill kev-value set (should be invoked only when KVS is empty)
- * Returns true if KVS is non-empty */
+ /*
+ * Refill key-value set (should be invoked only when KVS is empty). Returns true if KVS is
+ * non-empty.
+ */
private boolean refillKVS() {
- kvs.clear(); // clear previous KVS, first initiated in the constructor
- if (!hasMore) { // if there is nothing expected next in compactingScanner
+ // if there is nothing expected next in compactingScanner
+ if (!hasMore) {
return false;
}
-
- try { // try to get next KVS
- hasMore = compactingScanner.next(kvs, scannerContext);
- } catch (IOException ie) {
- throw new IllegalStateException(ie);
- }
-
- if (!kvs.isEmpty() ) {// is the new KVS empty ?
- kvsIterator = kvs.iterator();
- return true;
- } else {
- // KVS is empty, but hasMore still true?
- if (hasMore) { // try to move to next row
- return refillKVS();
+ // clear previous KVS, first initiated in the constructor
+ kvs.clear();
+ for (;;) {
+ try {
+ hasMore = compactingScanner.next(kvs, scannerContext);
+ } catch (IOException e) {
+ // should not happen as all data are in memory
+ throw new IllegalStateException(e);
+ }
+ if (!kvs.isEmpty()) {
+ kvsIterator = kvs.iterator();
+ return true;
+ } else if (!hasMore) {
+ return false;
}
-
}
- return hasMore;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 43a01ba..7c3bf85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -777,6 +777,61 @@ public class RegionCoprocessorHost
}
/**
+ * Invoked before in memory compaction.
+ */
+ public void preMemStoreCompaction(HStore store) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.preMemStoreCompaction(this, store);
+ }
+ });
+ }
+
+ /**
+ * Invoked before creating the StoreScanner for in memory compaction.
+ */
+ public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
+ CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
+ }
+ });
+ return builder.build();
+ }
+
+ /**
+ * Invoked before compacting memstore.
+ */
+ public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
+ throws IOException {
+ if (coprocEnvironments.isEmpty()) {
+ return scanner;
+ }
+ return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
+ regionObserverGetter, scanner) {
+ @Override
+ public InternalScanner call(RegionObserver observer) throws IOException {
+ return observer.preMemStoreCompactionCompact(this, store, getResult());
+ }
+ });
+ }
+
+ /**
+ * Invoked after in memory compaction.
+ */
+ public void postMemStoreCompaction(HStore store) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.postMemStoreCompaction(this, store);
+ }
+ });
+ }
+
+ /**
* Invoked after a memstore flush
* @throws IOException
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/28cdf4af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 09be65c..6abca13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -268,9 +268,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private static final Scan SCAN_FOR_COMPACTION = new Scan();
/**
- * Used for compactions.
+ * Used for store file compaction and memstore compaction.
* <p>
- * Opens a scanner across specified StoreFiles.
+ * Opens a scanner across specified StoreFiles/MemStoreSegments.
* @param store who we scan
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking versions