You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/08/09 21:46:13 UTC

[hbase] branch branch-1 updated: HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#470)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 9888217  HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#470)
9888217 is described below

commit 98882171778af0250dd9f5eabe0aeb72cb06db37
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Fri Aug 9 14:46:08 2019 -0700

    HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#470)
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../hbase/coprocessor/BaseRegionObserver.java      |   6 +
 .../hadoop/hbase/coprocessor/RegionObserver.java   |  12 ++
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  10 ++
 .../hbase/regionserver/RegionCoprocessorHost.java  |  13 ++
 .../java/org/apache/hadoop/hbase/wal/WALKey.java   |  12 ++
 .../hbase/coprocessor/SimpleRegionObserver.java    |  15 ++
 .../coprocessor/TestRegionObserverInterface.java   | 166 +++++++++++++++++++++
 7 files changed, 234 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 3fb0858..8353a47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -548,4 +548,10 @@ public class BaseRegionObserver implements RegionObserver {
       throws IOException {
     return delTracker;
   }
+
+  @Override
+  public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+                           WALEdit edit) throws IOException {
+    // Default implementation does nothing; subclasses override to act on the WALKey/WALEdit.
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index ad16b97..ea831ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1341,4 +1341,16 @@ public interface RegionObserver extends Coprocessor {
   DeleteTracker postInstantiateDeleteTracker(
       final ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
       throws IOException;
+
+  /**
+   * Called just before a WAL entry is appended to the WAL (HRegion skips meta edits). This hook
+   * lets coprocessors add extended attributes to the WALKey, which are then persisted to the
+   * WAL and are available to replication endpoints when they process WAL entries.
+   * @param ctx the environment provided by the region server
+   * @param key the WALKey associated with a particular append to a WAL
+   * @param edit the WALEdit associated with a particular append to a WAL
+   * @throws IOException if an implementation signals a failure while handling the hook
+   */
+  void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
+      WALEdit edit) throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3cb0de5..d784414 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3527,6 +3527,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
               mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+          preWALAppend(walKey, walEdit);
           txid = this.wal
               .append(this.htableDescriptor, this.getRegionInfo(), walKey,
                   walEdit, true);
@@ -7593,6 +7594,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
               processor.getClusterIds(), nonceGroup, nonce, mvcc);
+            preWALAppend(walKey, walEdit);
             txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
                 walKey, walEdit, true);
           }
@@ -7884,6 +7886,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                   nonceGroup,
                   nonce,
                   mvcc);
+              preWALAppend(walKey, walEdits);
               txid =
                 this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
             } else {
@@ -7973,6 +7976,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return mutate.isReturnResults() ? Result.create(allKVs) : null;
   }
 
+  private void preWALAppend(WALKey walKey, WALEdit walEdits) throws IOException {
+    if (this.coprocessorHost != null && !walEdits.isMetaEdit()) {
+      this.coprocessorHost.preWALAppend(walKey, walEdits);
+    }
+  }
+
   private static Cell getNewCell(final byte [] row, final long ts, final Cell cell,
       final Cell oldCell, final byte [] tagBytes) {
     // allocate an empty cell once
@@ -8124,6 +8133,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
               this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
               getMVCC());
+            preWALAppend(walKey, walEdits);
             txid =
               this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
           } else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index bc5af20..c0f357d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1691,6 +1691,19 @@ public class RegionCoprocessorHost
     });
   }
 
+  public void preWALAppend(final WALKey key, final WALEdit edit) throws IOException {
+    // Only dispatch when at least one coprocessor is loaded; otherwise there is nobody to call.
+    if (!coprocessors.isEmpty()) {
+      execOperation(new RegionOperation() {
+        @Override
+        public void call(RegionObserver observer,
+            ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
+          observer.preWALAppend(ctx, key, edit);
+        }
+      });
+    }
+  }
+
   private static abstract class CoprocessorOperation
       extends ObserverContext<RegionCoprocessorEnvironment> {
     public CoprocessorOperation() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 6c231ed..b031fdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -465,6 +465,18 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   }
 
   /**
+   * Add a named byte-array attribute to this WALKey to be persisted into the WAL
+   * @param attributeKey Name of the attribute
+   * @param attributeValue Value of the attribute; overwrites any value already stored for the name
+   */
+  public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
+    if (extendedAttributes == null){
+      extendedAttributes = new HashMap<String, byte[]>();
+    }
+    extendedAttributes.put(attributeKey, attributeValue);
+  }
+
+  /**
    * Return a named String value injected into the WALKey during processing, such as by a
    * coprocessor
    * @param attributeKey The key of a key / value pair
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index dbcf2e9..7b8c323 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -139,8 +139,10 @@ public class SimpleRegionObserver extends BaseRegionObserver {
   final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
   final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
   final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
+  final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
   final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
   static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
+  static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
 
   public void setThrowOnPostFlush(Boolean val){
     throwOnPostFlush.set(val);
@@ -718,6 +720,15 @@ public class SimpleRegionObserver extends BaseRegionObserver {
     return reader;
   }
 
+  @Override
+  public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
+                           WALKey key, WALEdit edit) throws IOException {
+    // Name the attribute after the value returned by the atomic increment itself: re-reading
+    // the counter afterwards could observe another caller's increment and reuse or skip a name.
+    key.addExtendedAttribute(Integer.toString(ctPreWALAppend.incrementAndGet()),
+        WAL_EXTENDED_ATTRIBUTE_BYTES);
+  }
+
   public boolean hadPreGet() {
     return ctPreGet.get() > 0;
   }
@@ -975,6 +986,10 @@ public class SimpleRegionObserver extends BaseRegionObserver {
     return ctPostWALRestoreDeprecated.get();
   }
 
+  public int getCtPreWALAppend() {
+    return ctPreWALAppend.get();
+  }
+
   public boolean wasStoreFileReaderOpenCalled() {
     return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index dc8040e..75c8bab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +43,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
@@ -78,11 +81,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 @Category(MediumTests.class)
 public class TestRegionObserverInterface {
@@ -93,6 +99,7 @@ public class TestRegionObserverInterface {
   public final static byte[] B = Bytes.toBytes("b");
   public final static byte[] C = Bytes.toBytes("c");
   public final static byte[] ROW = Bytes.toBytes("testrow");
+  public final static byte[] FAMILY = Bytes.toBytes("f");
 
   private static HBaseTestingUtility util = new HBaseTestingUtility();
   private static MiniHBaseCluster cluster = null;
@@ -750,6 +757,100 @@ public class TestRegionObserverInterface {
     table.close();
   }
 
+  // Called from testPreWALAppendIsWrittenToWAL; checks the hook count after each mutation type.
+  private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
+    int expectedCalls = 0;
+    String [] methodArray = new String[1];
+    methodArray[0] = "getCtPreWALAppend";
+    Object[] resultArray = new Object[1];
+    // Put: getCtPreWALAppend is expected to report 1 afterwards
+    Put p = new Put(ROW);
+    p.addColumn(A, A, A);
+    table.put(p);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+    // Append: the expected count rises to 2
+    Append a = new Append(ROW);
+    a.add(B, B, B);
+    table.append(a);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+    // Increment: the expected count rises to 3
+    Increment i = new Increment(ROW);
+    i.addColumn(C, C, 1);
+    table.increment(i);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+    // Delete: the expected count rises to 4
+    Delete d = new Delete(ROW);
+    table.delete(d);
+    resultArray[0] = ++expectedCalls;
+    verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
+  }
+
+  @Test
+  public void testPreWALAppend() throws Exception {
+    SimpleRegionObserver sro = new SimpleRegionObserver();
+    ObserverContext ctx = Mockito.mock(ObserverContext.class);
+    WALKey key = new WALKey(Bytes.toBytes("region"), TEST_TABLE,
+        EnvironmentEdgeManager.currentTime());
+    WALEdit edit = new WALEdit();
+    sro.preWALAppend(ctx, key, edit);
+    Assert.assertEquals(1, key.getExtendedAttributes().size());
+    Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
+        key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
+  }
+
+  @Test
+  public void testPreWALAppendIsWrittenToWAL() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+        ".testPreWALAppendIsWrittenToWAL");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(A));
+    htd.addFamily(new HColumnDescriptor(B));
+    htd.addFamily(new HColumnDescriptor(C));
+    htd.addCoprocessor(SimpleRegionObserver.class.getName());
+    Table table = util.createTable(htd, null);
+    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+    // the listener is attached to the WAL of the first (expected to be the only) region
+    HRegion region = regions.get(0);
+    region.getWAL().registerWALActionsListener(listener);
+    testPreWALAppendHook(table, tableName);
+    boolean[] expectedResults = {true, true, true, true};
+    Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
+    table.close();
+  }
+
+  @Test
+  public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
+        ".testPreWALAppendNotCalledOnMetaEdt");
+    // NOTE(review): the suffix above is spelled "MetaEdt", unlike the method name; likely a typo
+    HTableDescriptor td = new HTableDescriptor(tableName);
+    td.addCoprocessor(SimpleRegionObserver.class.getName());
+    td.addFamily(new HColumnDescriptor(FAMILY));
+    Table table = util.createTable(td, new byte[][] { A, B, C });
+
+    PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
+    List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
+    // only the first region is exercised. NOTE(review): createTable above is passed {A, B, C};
+    // if those are split keys the table has more than one region - confirm
+    HRegion region = regions.get(0);
+    region.getWAL().registerWALActionsListener(listener);
+    //flushing should write to the WAL
+    region.flush(true);
+    //so should compaction
+    region.compact(false);
+    //and so should closing the region
+    region.close();
+
+    //but we still shouldn't have triggered preWALAppend because no user data was written
+    String[] methods = new String[] {"getCtPreWALAppend"};
+    Object[] expectedResult = new Integer[]{0};
+    verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
+  }
+
   // check each region whether the coprocessor upcalls are called or not.
   private void verifyMethodResult(Class<?> c, String methodName[], TableName tableName,
                                   Object value[]) throws IOException {
@@ -800,4 +901,69 @@ public class TestRegionObserverInterface {
       writer.close();
     }
   }
+
+  /**
+   * WAL listener that records whether appended WALKeys carry the extended attributes that
+   * SimpleRegionObserver adds in preWALAppend. Slot k of the result becomes true once an
+   * appended key holds an attribute named Integer.toString(k + 1) whose value equals
+   * SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES.
+   */
+  private static class PreWALAppendWALActionsListener implements WALActionsListener {
+    // One flag per expected append; a flag is never reset once it has been set.
+    private final boolean[] walKeysCorrect = {false, false, false, false};
+
+    boolean[] getWalKeysCorrectArray() {
+      return walKeysCorrect;
+    }
+
+    @Override
+    public void postAppend(long entryLen, long elapsedTimeMillis,
+                           WALKey logKey, WALEdit logEdit) throws IOException {
+      for (int k = 0; k < walKeysCorrect.length; k++) {
+        if (!walKeysCorrect[k]) {
+          walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
+              logKey.getExtendedAttribute(Integer.toString(k + 1)));
+        }
+      }
+    }
+
+    // The remaining listener callbacks are not needed here and intentionally do nothing.
+    @Override
+    public void postSync(long timeInNanos, int handlerSyncs) {
+    }
+
+    @Override
+    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+    }
+
+    @Override
+    public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+    }
+
+    @Override
+    public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+    }
+
+    @Override
+    public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+    }
+
+    @Override
+    public void logRollRequested(RollRequestReason reason) {
+    }
+
+    @Override
+    public void logCloseRequested() {
+    }
+
+    @Override
+    public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
+                                         WALEdit logEdit) {
+    }
+
+    @Override
+    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
+                                         WALEdit logEdit) throws IOException {
+    }
+  }
 }