You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/08/09 21:44:32 UTC
[hbase] branch branch-2 updated: HBASE-22623 - Add RegionObserver
coprocessor hook for preWALAppend (#390)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 7f0ab84 HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#390)
7f0ab84 is described below
commit 7f0ab84214a28ee9b0a3db118d51da7f73526c84
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Fri Aug 9 14:27:32 2019 -0700
HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#390)
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../hadoop/hbase/coprocessor/RegionObserver.java | 12 ++
.../apache/hadoop/hbase/regionserver/HRegion.java | 5 +
.../hbase/regionserver/RegionCoprocessorHost.java | 12 ++
.../java/org/apache/hadoop/hbase/wal/WALKey.java | 8 +-
.../org/apache/hadoop/hbase/wal/WALKeyImpl.java | 39 +++++++
.../hbase/coprocessor/SimpleRegionObserver.java | 18 +++
.../coprocessor/TestRegionObserverInterface.java | 121 +++++++++++++++++++++
.../hadoop/hbase/regionserver/TestHRegion.java | 12 +-
8 files changed, 225 insertions(+), 2 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index e517405..8761d6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1104,4 +1104,16 @@ public interface RegionObserver {
throws IOException {
return delTracker;
}
+
+ /**
+ * Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
+ * coprocessors to add extended attributes to the WALKey that then get persisted to the
+ * WAL, and are available to replication endpoints to use in processing WAL Entries.
+ * @param ctx the environment provided by the region server
+ * @param key the WALKey associated with a particular append to a WAL
+ * @param edit the WALEdit that will be appended to the WAL with this key
+ */
+ default void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+ WALEdit edit)
+ throws IOException {
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d695fef..1217fab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7910,6 +7910,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (walEdit.isReplay()) {
walKey.setOrigLogSeqNum(origLogSeqNum);
}
+ //don't call the coproc hook for writes to the WAL caused by
+ //system lifecycle events like flushes or compactions
+ if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
+ this.coprocessorHost.preWALAppend(walKey, walEdit);
+ }
WriteEntry writeEntry = null;
try {
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 16fd332..c75e562 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1720,6 +1720,18 @@ public class RegionCoprocessorHost
});
}
+ public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
+ if (this.coprocEnvironments.isEmpty()){
+ return;
+ }
+ execOperation(new RegionObserverOperationWithoutResult() {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.preWALAppend(this, key, edit);
+ }
+ });
+ }
+
public Message preEndpointInvocation(final Service service, final String methodName,
Message request) throws IOException {
if (coprocEnvironments.isEmpty()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index c541cc0..fdbacbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -34,7 +34,6 @@ import java.util.UUID;
/**
* Key for WAL Entry.
- * Read-only. No Setters. For limited audience such as Coprocessors.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC})
@@ -86,6 +85,13 @@ public interface WALKey extends SequenceId, Comparable<WALKey> {
*/
long getOrigLogSeqNum();
+ /**
+ * Add a named byte array attribute to this WALKey to be persisted into the WAL
+ * @param attributeKey Name of the attribute
+ * @param attributeValue Value of the attribute
+ */
+ void addExtendedAttribute(String attributeKey, byte[] attributeValue);
+
/**
* Return a named String value injected into the WALKey during processing, such as by a
* coprocessor
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index 4c20eb4..6164617 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -188,6 +188,37 @@ public class WALKeyImpl implements WALKey {
}
/**
+ * Copy constructor that takes in an existing WALKeyImpl plus some extended attributes.
+ * Intended for coprocessors to add annotations to a system-generated WALKey
+ * for persistence to the WAL.
+ * @param key Key to be copied into this new key
+ * @param extendedAttributes Extra attributes to copy into the new key
+ */
+ public WALKeyImpl(WALKeyImpl key,
+ Map<String, byte[]> extendedAttributes){
+ init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
+ key.getWriteTime(), key.getClusterIds(), key.getNonceGroup(), key.getNonce(),
+ key.getMvcc(), key.getReplicationScopes(), extendedAttributes);
+
+ }
+
+ /**
+ * Copy constructor that takes in an existing WALKey, the extra WALKeyImpl fields that the
+ * parent interface is missing, plus some extended attributes. Intended
+ * for coprocessors to add annotations to a system-generated WALKey for
+ * persistence to the WAL.
+ */
+ public WALKeyImpl(WALKey key,
+ List<UUID> clusterIds,
+ MultiVersionConcurrencyControl mvcc,
+ final NavigableMap<byte[], Integer> replicationScopes,
+ Map<String, byte[]> extendedAttributes){
+ init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
+ key.getWriteTime(), clusterIds, key.getNonceGroup(), key.getNonce(),
+ mvcc, replicationScopes, extendedAttributes);
+
+ }
+ /**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
@@ -474,6 +505,14 @@ public class WALKeyImpl implements WALKey {
}
@Override
+ public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
+ if (extendedAttributes == null){
+ extendedAttributes = new HashMap<String, byte[]>();
+ }
+ extendedAttributes.put(attributeKey, attributeValue);
+ }
+
+ @Override
public byte[] getExtendedAttribute(String attributeKey){
return extendedAttributes != null ? extendedAttributes.get(attributeKey) : null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 62623b0..caf0abb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -124,7 +125,11 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
+ final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
+
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
+ Map<String, byte[]> extendedAttributes = new HashMap<String,byte[]>();
+ static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
public void setThrowOnPostFlush(Boolean val){
throwOnPostFlush.set(val);
@@ -631,6 +636,15 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return reader;
}
+ @Override
+ public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ WALKey key, WALEdit edit) throws IOException {
+ ctPreWALAppend.incrementAndGet();
+
+ key.addExtendedAttribute(Integer.toString(ctPreWALAppend.get()),
+ Bytes.toBytes("foo"));
+ }
+
public boolean hadPreGet() {
return ctPreGet.get() > 0;
}
@@ -864,6 +878,10 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPostWALRestore.get();
}
+ public int getCtPreWALAppend() {
+ return ctPreWALAppend.get();
+ }
+
public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 6934c98..e130080 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -70,6 +74,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@@ -77,13 +82,18 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,6 +109,7 @@ public class TestRegionObserverInterface {
private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);
public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
+ public static final byte[] FAMILY = Bytes.toBytes("f");
public final static byte[] A = Bytes.toBytes("a");
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
@@ -663,6 +674,97 @@ public class TestRegionObserverInterface {
table.close();
}
+ //called from testPreWALAppendIsWrittenToWAL
+ private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
+ int expectedCalls = 0;
+ String [] methodArray = new String[1];
+ methodArray[0] = "getCtPreWALAppend";
+ Object[] resultArray = new Object[1];
+
+ Put p = new Put(ROW);
+ p.addColumn(A, A, A);
+ table.put(p);
+ resultArray[0] = ++expectedCalls;
+ verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+
+ Append a = new Append(ROW);
+ a.addColumn(B, B, B);
+ table.append(a);
+ resultArray[0] = ++expectedCalls;
+ verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+
+ Increment i = new Increment(ROW);
+ i.addColumn(C, C, 1);
+ table.increment(i);
+ resultArray[0] = ++expectedCalls;
+ verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+
+ Delete d = new Delete(ROW);
+ table.delete(d);
+ resultArray[0] = ++expectedCalls;
+ verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+ }
+
+ @Test
+ public void testPreWALAppend() throws Exception {
+ SimpleRegionObserver sro = new SimpleRegionObserver();
+ ObserverContext ctx = Mockito.mock(ObserverContext.class);
+ WALKey key = new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE,
+ EnvironmentEdgeManager.currentTime());
+ WALEdit edit = new WALEdit();
+ sro.preWALAppend(ctx, key, edit);
+ Assert.assertEquals(1, key.getExtendedAttributes().size());
+ Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
+ key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
+ }
+
+ @Test
+ public void testPreWALAppendIsWrittenToWAL() throws Exception {
+ final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+ "." + name.getMethodName());
+ Table table = util.createTable(tableName, new byte[][] { A, B, C });
+
+ PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+ List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+ //should be only one region
+ HRegion region = regions.get(0);
+ region.getWAL().registerWALActionsListener(listener);
+ testPreWALAppendHook(table, tableName);
+ boolean[] expectedResults = {true, true, true, true};
+ Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
+
+ }
+
+ @Test
+ public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
+ final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+ "." + name.getMethodName());
+ TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+ tdBuilder.setColumnFamily(cfBuilder.build());
+ tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
+ TableDescriptor td = tdBuilder.build();
+ Table table = util.createTable(td, new byte[][] { A, B, C });
+
+ PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+ List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+ //should be only one region
+ HRegion region = regions.get(0);
+
+ region.getWAL().registerWALActionsListener(listener);
+ //flushing should write to the WAL
+ region.flush(true);
+ //so should compaction
+ region.compact(false);
+ //and so should closing the region
+ region.close();
+
+ //but we still shouldn't have triggered preWALAppend because no user data was written
+ String[] methods = new String[] {"getCtPreWALAppend"};
+ Object[] expectedResult = new Integer[]{0};
+ verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
+ }
+
// check each region whether the coprocessor upcalls are called or not.
private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
Object value[]) throws IOException {
@@ -711,4 +813,23 @@ public class TestRegionObserverInterface {
writer.close();
}
}
+
+ private static class PreWALAppendWALActionsListener implements WALActionsListener {
+ boolean[] walKeysCorrect = {false, false, false, false};
+
+ @Override
+ public void postAppend(long entryLen, long elapsedTimeMillis,
+ WALKey logKey, WALEdit logEdit) throws IOException {
+ for (int k = 0; k < 4; k++) {
+ if (!walKeysCorrect[k]) {
+ walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
+ logKey.getExtendedAttribute(Integer.toString(k + 1)));
+ }
+ }
+ }
+
+ boolean[] getWalKeysCorrectArray() {
+ return walKeysCorrect;
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 7cb09a6..cc96349 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
@@ -166,7 +167,6 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
-import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.metrics2.MetricsExecutor;
import org.junit.After;
import org.junit.Assert;
@@ -404,6 +404,7 @@ public class TestHRegion {
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
FileSystem fs = FileSystem.get(CONF);
Path rootDir = new Path(dir + testName);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
hLog.init();
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
@@ -2439,7 +2440,16 @@ public class TestHRegion {
return null;
}
}).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
+ ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
+ newBuilder(COLUMN_FAMILY_BYTES);
+ ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
+ Long.MAX_VALUE, region.getCellComparator());
+ Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
+ Mockito.any())).thenReturn(info);
+ Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
+ Mockito.any())).thenAnswer(i -> i.getArgument(1));
region.setCoprocessorHost(mockedCPHost);
+
region.put(originalPut);
region.setCoprocessorHost(normalCPHost);
final long finalSize = region.getDataInMemoryWithoutWAL();