You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/08/09 21:46:13 UTC
[hbase] branch branch-1 updated: HBASE-22623 - Add RegionObserver
coprocessor hook for preWALAppend (#470)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 9888217 HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#470)
9888217 is described below
commit 98882171778af0250dd9f5eabe0aeb72cb06db37
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Fri Aug 9 14:46:08 2019 -0700
HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#470)
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../hbase/coprocessor/BaseRegionObserver.java | 6 +
.../hadoop/hbase/coprocessor/RegionObserver.java | 12 ++
.../apache/hadoop/hbase/regionserver/HRegion.java | 10 ++
.../hbase/regionserver/RegionCoprocessorHost.java | 13 ++
.../java/org/apache/hadoop/hbase/wal/WALKey.java | 12 ++
.../hbase/coprocessor/SimpleRegionObserver.java | 15 ++
.../coprocessor/TestRegionObserverInterface.java | 166 +++++++++++++++++++++
7 files changed, 234 insertions(+)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 3fb0858..8353a47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -548,4 +548,10 @@ public class BaseRegionObserver implements RegionObserver {
throws IOException {
return delTracker;
}
+
+ /**
+ * No-op implementation of the preWALAppend hook: the WALKey and WALEdit are left untouched.
+ * Subclasses override this to act on an entry before it is appended to the WAL.
+ */
+ @Override
+ public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+ WALEdit edit) throws IOException {
+
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index ad16b97..ea831ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1341,4 +1341,16 @@ public interface RegionObserver extends Coprocessor {
DeleteTracker postInstantiateDeleteTracker(
final ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
throws IOException;
+
+ /**
+ * Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
+ * coprocessors to add extended attributes to the WALKey that then get persisted to the
+ * WAL, and are available to replication endpoints to use in processing WAL Entries.
+ * HRegion does not invoke this hook for edits where {@link WALEdit#isMetaEdit()} is true.
+ * @param ctx the environment provided by the region server
+ * @param key the WALKey associated with a particular append to a WAL
+ * @param edit the WALEdit associated with a particular append to a WAL
+ * @throws IOException if the implementation fails while processing the entry
+ */
+ void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+ WALEdit edit)
+ throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3cb0de5..d784414 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3527,6 +3527,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+ preWALAppend(walKey, walEdit);
txid = this.wal
.append(this.htableDescriptor, this.getRegionInfo(), walKey,
walEdit, true);
@@ -7593,6 +7594,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
processor.getClusterIds(), nonceGroup, nonce, mvcc);
+ preWALAppend(walKey, walEdit);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
walKey, walEdit, true);
}
@@ -7884,6 +7886,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
nonceGroup,
nonce,
mvcc);
+ preWALAppend(walKey, walEdits);
txid =
this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
} else {
@@ -7973,6 +7976,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return mutate.isReturnResults() ? Result.create(allKVs) : null;
}
+ /**
+  * Runs the coprocessor preWALAppend hook for an entry that is about to be appended to the WAL.
+  * Does nothing when this region has no coprocessor host or when the edit is a meta edit.
+  * @param walKey key of the entry about to be appended
+  * @param walEdits edit of the entry about to be appended
+  * @throws IOException if the coprocessor host call fails
+  */
+ private void preWALAppend(WALKey walKey, WALEdit walEdits) throws IOException {
+   if (this.coprocessorHost == null || walEdits.isMetaEdit()) {
+     return;
+   }
+   this.coprocessorHost.preWALAppend(walKey, walEdits);
+ }
+
private static Cell getNewCell(final byte [] row, final long ts, final Cell cell,
final Cell oldCell, final byte [] tagBytes) {
// allocate an empty cell once
@@ -8124,6 +8133,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
getMVCC());
+ preWALAppend(walKey, walEdits);
txid =
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
} else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index bc5af20..c0f357d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1691,6 +1691,19 @@ public class RegionCoprocessorHost
});
}
+ /**
+  * Hands execOperation a RegionOperation that calls
+  * {@link RegionObserver#preWALAppend} with the given key and edit. Returns immediately,
+  * without building the operation, when no coprocessors are loaded.
+  * @param key the WALKey of the entry about to be appended
+  * @param edit the WALEdit of the entry about to be appended
+  * @throws IOException if executing the operation fails
+  */
+ public void preWALAppend(final WALKey key, final WALEdit edit) throws IOException {
+   if (!coprocessors.isEmpty()) {
+     execOperation(new RegionOperation() {
+       @Override
+       public void call(RegionObserver observer,
+           ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
+         observer.preWALAppend(ctx, key, edit);
+       }
+     });
+   }
+ }
+
private static abstract class CoprocessorOperation
extends ObserverContext<RegionCoprocessorEnvironment> {
public CoprocessorOperation() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 6c231ed..b031fdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -465,6 +465,18 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
 /**
+ * Add a named byte[] value to this WALKey to be persisted into the WAL. The attribute map is
+ * created on first use, and adding an existing name replaces its previous value.
+ * @param attributeKey Name of the attribute
+ * @param attributeValue Value of the attribute; the array is stored as given, not copied
+ */
+ public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
+ if (extendedAttributes == null){
+ extendedAttributes = new HashMap<String, byte[]>();
+ }
+ extendedAttributes.put(attributeKey, attributeValue);
+ }
+
+ /**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor
* @param attributeKey The key of a key / value pair
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index dbcf2e9..7b8c323 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -139,8 +139,10 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
+ final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
+ static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
public void setThrowOnPostFlush(Boolean val){
throwOnPostFlush.set(val);
@@ -718,6 +720,15 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return reader;
}
+ /**
+  * Counts the invocation and tags the WALKey with an extended attribute whose name is the
+  * 1-based invocation number and whose value is {@link #WAL_EXTENDED_ATTRIBUTE_BYTES}.
+  */
+ @Override
+ public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
+     WALKey key, WALEdit edit) throws IOException {
+   // Use the value returned by the increment instead of re-reading the counter, so that
+   // concurrent invocations can never end up using the same attribute name.
+   int invocation = ctPreWALAppend.incrementAndGet();
+   // Share the constant the tests compare against rather than re-encoding the literal.
+   key.addExtendedAttribute(Integer.toString(invocation), WAL_EXTENDED_ATTRIBUTE_BYTES);
+ }
+
public boolean hadPreGet() {
return ctPreGet.get() > 0;
}
@@ -975,6 +986,10 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return ctPostWALRestoreDeprecated.get();
}
+ /** Returns how many times preWALAppend has been invoked on this observer. */
+ public int getCtPreWALAppend() {
+ return ctPreWALAppend.get();
+ }
+
public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index dc8040e..75c8bab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -42,6 +43,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
@@ -78,11 +81,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
@Category(MediumTests.class)
public class TestRegionObserverInterface {
@@ -93,6 +99,7 @@ public class TestRegionObserverInterface {
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
public final static byte[] ROW = Bytes.toBytes("testrow");
+ public final static byte[] FAMILY = Bytes.toBytes("f");
private static HBaseTestingUtility util = new HBaseTestingUtility();
private static MiniHBaseCluster cluster = null;
@@ -750,6 +757,100 @@ public class TestRegionObserverInterface {
table.close();
}
+ // Helper for testPreWALAppendIsWrittenToWAL: issues a Put, an Append, an Increment and a
+ // Delete against the table and, after each one, asks verifyMethodResult to check that
+ // getCtPreWALAppend on SimpleRegionObserver reports one more call than before.
+ private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
+   final String[] counterGetter = { "getCtPreWALAppend" };
+   final Object[] expectedCount = new Object[1];
+   int callsSoFar = 0;
+
+   Put put = new Put(ROW);
+   put.addColumn(A, A, A);
+   table.put(put);
+   callsSoFar++;
+   expectedCount[0] = callsSoFar;
+   verifyMethodResult(SimpleRegionObserver.class, counterGetter, tableName, expectedCount);
+
+   Append append = new Append(ROW);
+   append.add(B, B, B);
+   table.append(append);
+   callsSoFar++;
+   expectedCount[0] = callsSoFar;
+   verifyMethodResult(SimpleRegionObserver.class, counterGetter, tableName, expectedCount);
+
+   Increment increment = new Increment(ROW);
+   increment.addColumn(C, C, 1);
+   table.increment(increment);
+   callsSoFar++;
+   expectedCount[0] = callsSoFar;
+   verifyMethodResult(SimpleRegionObserver.class, counterGetter, tableName, expectedCount);
+
+   Delete delete = new Delete(ROW);
+   table.delete(delete);
+   callsSoFar++;
+   expectedCount[0] = callsSoFar;
+   verifyMethodResult(SimpleRegionObserver.class, counterGetter, tableName, expectedCount);
+ }
+
+ /**
+  * Calls SimpleRegionObserver#preWALAppend directly, once, and checks that the call was
+  * counted and that it added exactly one extended attribute, named after the invocation
+  * count, to the WALKey.
+  */
+ @Test
+ public void testPreWALAppend() throws Exception {
+   SimpleRegionObserver sro = new SimpleRegionObserver();
+   // Mockito.mock only accepts the raw class token, so the unchecked conversion is confined
+   // to this one declaration instead of leaving ctx as a raw type.
+   @SuppressWarnings("unchecked")
+   ObserverContext<RegionCoprocessorEnvironment> ctx = Mockito.mock(ObserverContext.class);
+   WALKey key = new WALKey(Bytes.toBytes("region"), TEST_TABLE,
+       EnvironmentEdgeManager.currentTime());
+   WALEdit edit = new WALEdit();
+   sro.preWALAppend(ctx, key, edit);
+   Assert.assertEquals(1, sro.getCtPreWALAppend());
+   Assert.assertEquals(1, key.getExtendedAttributes().size());
+   Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
+       key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
+ }
+
+ /**
+  * Creates a table with SimpleRegionObserver attached, registers a WALActionsListener on the
+  * region's WAL, runs the four mutations of testPreWALAppendHook, and checks that the
+  * listener saw the observer's extended attribute under each of the names "1" to "4".
+  */
+ @Test
+ public void testPreWALAppendIsWrittenToWAL() throws Exception {
+   final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+       ".testPreWALAppendIsWrittenToWAL");
+   HTableDescriptor htd = new HTableDescriptor(tableName);
+   htd.addFamily(new HColumnDescriptor(A));
+   htd.addFamily(new HColumnDescriptor(B));
+   htd.addFamily(new HColumnDescriptor(C));
+   htd.addCoprocessor(SimpleRegionObserver.class.getName());
+   Table table = util.createTable(htd, null);
+   try {
+     PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+     List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+     //should be only one region
+     HRegion region = regions.get(0);
+     region.getWAL().registerWALActionsListener(listener);
+     testPreWALAppendHook(table, tableName);
+     boolean[] expectedResults = {true, true, true, true};
+     Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
+   } finally {
+     // Release the client-side table handle even when an assertion above fails.
+     table.close();
+   }
+ }
+
+ /**
+  * Flushes, compacts and closes a region of a table that has SimpleRegionObserver attached
+  * but has received no user writes, then checks that the observer's preWALAppend counter
+  * is still zero.
+  */
+ @Test
+ public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
+   final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+       ".testPreWALAppendNotCalledOnMetaEdt");
+
+   HTableDescriptor td = new HTableDescriptor(tableName);
+   td.addCoprocessor(SimpleRegionObserver.class.getName());
+   td.addFamily(new HColumnDescriptor(FAMILY));
+   Table table = util.createTable(td, new byte[][] { A, B, C });
+   try {
+     PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+     List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+     // NOTE(review): the table is created with the keys {A, B, C}, presumably split points,
+     // so there is likely more than one region; only the first is exercised - TODO confirm.
+     HRegion region = regions.get(0);
+
+     region.getWAL().registerWALActionsListener(listener);
+     //flushing should write to the WAL
+     region.flush(true);
+     //so should compaction
+     region.compact(false);
+     //and so should closing the region
+     region.close();
+
+     //but we still shouldn't have triggered preWALAppend because no user data was written
+     String[] methods = new String[] {"getCtPreWALAppend"};
+     Object[] expectedResult = new Integer[]{0};
+     verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
+   } finally {
+     // Release the client-side table handle even when the verification above fails.
+     table.close();
+   }
+ }
+
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class<?> c, String methodName[], TableName tableName,
Object value[]) throws IOException {
@@ -800,4 +901,69 @@ public class TestRegionObserverInterface {
writer.close();
}
}
+
+ /**
+  * WALActionsListener that records, for each of the attribute names "1" through "4", whether
+  * some appended WALKey carried SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES under that
+  * name. Every callback other than postAppend is a no-op.
+  */
+ private static class PreWALAppendWALActionsListener implements WALActionsListener {
+   // One flag per expected attribute name; slot k tracks the name Integer.toString(k + 1).
+   boolean[] walKeysCorrect = new boolean[4];
+
+   boolean[] getWalKeysCorrectArray() {
+     return walKeysCorrect;
+   }
+
+   @Override
+   public void postAppend(long entryLen, long elapsedTimeMillis,
+       WALKey logKey, WALEdit logEdit) throws IOException {
+     for (int slot = 0; slot < walKeysCorrect.length; slot++) {
+       if (walKeysCorrect[slot]) {
+         // Already matched by an earlier append; later entries must not clear the flag.
+         continue;
+       }
+       byte[] actual = logKey.getExtendedAttribute(Integer.toString(slot + 1));
+       walKeysCorrect[slot] =
+           Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES, actual);
+     }
+   }
+
+   @Override
+   public void postSync(long timeInNanos, int handlerSyncs) {
+     // no-op
+   }
+
+   @Override
+   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+     // no-op
+   }
+
+   @Override
+   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+     // no-op
+   }
+
+   @Override
+   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+     // no-op
+   }
+
+   @Override
+   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+     // no-op
+   }
+
+   @Override
+   public void logRollRequested(RollRequestReason reason) {
+     // no-op
+   }
+
+   @Override
+   public void logCloseRequested() {
+     // no-op
+   }
+
+   @Override
+   public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
+       WALEdit logEdit) {
+     // no-op
+   }
+
+   @Override
+   public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
+       WALEdit logEdit) throws IOException {
+     // no-op
+   }
+ }
}