You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/08/09 21:44:32 UTC

[hbase] branch branch-2 updated: HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#390)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 7f0ab84  HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#390)
7f0ab84 is described below

commit 7f0ab84214a28ee9b0a3db118d51da7f73526c84
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Fri Aug 9 14:27:32 2019 -0700

    HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#390)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../hadoop/hbase/coprocessor/RegionObserver.java   |  12 ++
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   5 +
 .../hbase/regionserver/RegionCoprocessorHost.java  |  12 ++
 .../java/org/apache/hadoop/hbase/wal/WALKey.java   |   8 +-
 .../org/apache/hadoop/hbase/wal/WALKeyImpl.java    |  39 +++++++
 .../hbase/coprocessor/SimpleRegionObserver.java    |  18 +++
 .../coprocessor/TestRegionObserverInterface.java   | 121 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/TestHRegion.java     |  12 +-
 8 files changed, 225 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index e517405..8761d6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1104,4 +1104,16 @@ public interface RegionObserver {
       throws IOException {
     return delTracker;
   }
+
+  /**
+   * Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
+   * coprocessors to add extended attributes to the WALKey that then get persisted to the
+   * WAL, and are available to replication endpoints to use in processing WAL Entries.
+   * @param ctx the environment provided by the region server
+   * @param key the WALKey associated with a particular append to a WAL
+   */
+  default void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+                            WALEdit edit)
+    throws IOException {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d695fef..1217fab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7910,6 +7910,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (walEdit.isReplay()) {
       walKey.setOrigLogSeqNum(origLogSeqNum);
     }
+    //don't call the coproc hook for writes to the WAL caused by
+    //system lifecycle events like flushes or compactions
+    if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
+      this.coprocessorHost.preWALAppend(walKey, walEdit);
+    }
     WriteEntry writeEntry = null;
     try {
       long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 16fd332..c75e562 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1720,6 +1720,18 @@ public class RegionCoprocessorHost
         });
   }
 
+  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
+    if (this.coprocEnvironments.isEmpty()){
+      return;
+    }
+    execOperation(new RegionObserverOperationWithoutResult() {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.preWALAppend(this, key, edit);
+      }
+    });
+  }
+
   public Message preEndpointInvocation(final Service service, final String methodName,
       Message request) throws IOException {
     if (coprocEnvironments.isEmpty()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index c541cc0..fdbacbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -34,7 +34,6 @@ import java.util.UUID;
 
 /**
  * Key for WAL Entry.
- * Read-only. No Setters. For limited audience such as Coprocessors.
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION,
     HBaseInterfaceAudience.COPROC})
@@ -86,6 +85,13 @@ public interface WALKey extends SequenceId, Comparable<WALKey> {
    */
   long getOrigLogSeqNum();
 
+  /**
+   * Add a named String value to this WALKey to be persisted into the WAL
+   * @param attributeKey Name of the attribute
+   * @param attributeValue Value of the attribute
+   */
+  void addExtendedAttribute(String attributeKey, byte[] attributeValue);
+
     /**
      * Return a named String value injected into the WALKey during processing, such as by a
      * coprocessor
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index 4c20eb4..6164617 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -188,6 +188,37 @@ public class WALKeyImpl implements WALKey {
   }
 
   /**
+   * Copy constructor that takes in an existing WALKeyImpl plus some extended attributes.
+   * Intended for coprocessors to add annotations to a system-generated WALKey
+   * for persistence to the WAL.
+   * @param key Key to be copied into this new key
+   * @param extendedAttributes Extra attributes to copy into the new key
+   */
+  public WALKeyImpl(WALKeyImpl key,
+                    Map<String, byte[]> extendedAttributes){
+    init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
+        key.getWriteTime(), key.getClusterIds(), key.getNonceGroup(), key.getNonce(),
+        key.getMvcc(), key.getReplicationScopes(), extendedAttributes);
+
+  }
+
+  /**
+   * Copy constructor that takes in an existing WALKey, the extra WALKeyImpl fields that the
+   * parent interface is missing, plus some extended attributes. Intended
+   * for coprocessors to add annotations to a system-generated WALKey for
+   * persistence to the WAL.
+   */
+  public WALKeyImpl(WALKey key,
+                    List<UUID> clusterIds,
+                    MultiVersionConcurrencyControl mvcc,
+                    final NavigableMap<byte[], Integer> replicationScopes,
+                    Map<String, byte[]> extendedAttributes){
+    init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
+        key.getWriteTime(), clusterIds, key.getNonceGroup(), key.getNonce(),
+        mvcc, replicationScopes, extendedAttributes);
+
+  }
+  /**
    * Create the log key for writing to somewhere.
    * We maintain the tablename mainly for debugging purposes.
    * A regionName is always a sub-table object.
@@ -474,6 +505,14 @@ public class WALKeyImpl implements WALKey {
   }
 
   @Override
+  public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
+    if (extendedAttributes == null){
+      extendedAttributes = new HashMap<String, byte[]>();
+    }
+    extendedAttributes.put(attributeKey, attributeValue);
+  }
+
+  @Override
   public byte[] getExtendedAttribute(String attributeKey){
     return extendedAttributes != null ? extendedAttributes.get(attributeKey) : null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index 62623b0..caf0abb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -124,7 +125,11 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
   final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
   final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
   final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
+  final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
+
   static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
+  Map<String, byte[]> extendedAttributes = new HashMap<String,byte[]>();
+  static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
 
   public void setThrowOnPostFlush(Boolean val){
     throwOnPostFlush.set(val);
@@ -631,6 +636,15 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
     return reader;
   }
 
+  @Override
+  public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                                 WALKey key, WALEdit edit) throws IOException {
+    ctPreWALAppend.incrementAndGet();
+
+    key.addExtendedAttribute(Integer.toString(ctPreWALAppend.get()),
+        Bytes.toBytes("foo"));
+  }
+
   public boolean hadPreGet() {
     return ctPreGet.get() > 0;
   }
@@ -864,6 +878,10 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
     return ctPostWALRestore.get();
   }
 
+  public int getCtPreWALAppend() {
+    return ctPreWALAppend.get();
+  }
+
   public boolean wasStoreFileReaderOpenCalled() {
     return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 6934c98..e130080 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -55,6 +57,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.filter.FilterAllFilter;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -70,6 +74,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@@ -77,13 +82,18 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,6 +109,7 @@ public class TestRegionObserverInterface {
   private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);
 
   public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
+  public static final byte[] FAMILY = Bytes.toBytes("f");
   public final static byte[] A = Bytes.toBytes("a");
   public final static byte[] B = Bytes.toBytes("b");
   public final static byte[] C = Bytes.toBytes("c");
@@ -663,6 +674,97 @@ public class TestRegionObserverInterface {
     table.close();
   }
 
+  //called from testPreWALAppendIsWrittenToWAL
+  private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
+    int expectedCalls = 0;
+    String [] methodArray = new String[1];
+    methodArray[0] = "getCtPreWALAppend";
+    Object[] resultArray = new Object[1];
+
+    Put p = new Put(ROW);
+    p.addColumn(A, A, A);
+    table.put(p);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+
+    Append a = new Append(ROW);
+    a.addColumn(B, B, B);
+    table.append(a);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+
+    Increment i = new Increment(ROW);
+    i.addColumn(C, C, 1);
+    table.increment(i);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+
+    Delete d = new Delete(ROW);
+    table.delete(d);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+  }
+
+  @Test
+  public void testPreWALAppend() throws Exception {
+    SimpleRegionObserver sro = new SimpleRegionObserver();
+    ObserverContext ctx = Mockito.mock(ObserverContext.class);
+    WALKey key = new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE,
+        EnvironmentEdgeManager.currentTime());
+    WALEdit edit = new WALEdit();
+    sro.preWALAppend(ctx, key, edit);
+    Assert.assertEquals(1, key.getExtendedAttributes().size());
+    Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
+        key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
+  }
+
+  @Test
+  public void testPreWALAppendIsWrittenToWAL() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+        "." + name.getMethodName());
+    Table table = util.createTable(tableName, new byte[][] { A, B, C });
+
+    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+    //should be only one region
+    HRegion region = regions.get(0);
+    region.getWAL().registerWALActionsListener(listener);
+    testPreWALAppendHook(table, tableName);
+    boolean[] expectedResults = {true, true, true, true};
+    Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
+
+  }
+
+  @Test
+  public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+        "." + name.getMethodName());
+    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+    tdBuilder.setColumnFamily(cfBuilder.build());
+    tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
+    TableDescriptor td = tdBuilder.build();
+    Table table = util.createTable(td, new byte[][] { A, B, C });
+
+    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+    //should be only one region
+    HRegion region = regions.get(0);
+
+    region.getWAL().registerWALActionsListener(listener);
+    //flushing should write to the WAL
+    region.flush(true);
+    //so should compaction
+    region.compact(false);
+    //and so should closing the region
+    region.close();
+
+    //but we still shouldn't have triggered preWALAppend because no user data was written
+    String[] methods = new String[] {"getCtPreWALAppend"};
+    Object[] expectedResult = new Integer[]{0};
+    verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
+  }
+
   // check each region whether the coprocessor upcalls are called or not.
   private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
       Object value[]) throws IOException {
@@ -711,4 +813,23 @@ public class TestRegionObserverInterface {
       writer.close();
     }
   }
+
+  private static class PreWALAppendWALActionsListener implements WALActionsListener {
+    boolean[] walKeysCorrect = {false, false, false, false};
+
+    @Override
+    public void postAppend(long entryLen, long elapsedTimeMillis,
+                           WALKey logKey, WALEdit logEdit) throws IOException {
+      for (int k = 0; k < 4; k++) {
+        if (!walKeysCorrect[k]) {
+          walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
+              logKey.getExtendedAttribute(Integer.toString(k + 1)));
+        }
+      }
+    }
+
+    boolean[] getWalKeysCorrectArray() {
+      return walKeysCorrect;
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 7cb09a6..cc96349 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -166,7 +167,6 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.metrics2.MetricsExecutor;
 import org.junit.After;
 import org.junit.Assert;
@@ -404,6 +404,7 @@ public class TestHRegion {
     String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + testName);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
     hLog.init();
     HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
@@ -2439,7 +2440,16 @@ public class TestHRegion {
         return null;
       }
     }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
+    ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
+        newBuilder(COLUMN_FAMILY_BYTES);
+    ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
+        Long.MAX_VALUE, region.getCellComparator());
+    Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
+        Mockito.any())).thenReturn(info);
+    Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
+        Mockito.any())).thenAnswer(i -> i.getArgument(1));
     region.setCoprocessorHost(mockedCPHost);
+
     region.put(originalPut);
     region.setCoprocessorHost(normalCPHost);
     final long finalSize = region.getDataInMemoryWithoutWAL();