Posted to commits@hbase.apache.org by an...@apache.org on 2016/11/11 05:40:19 UTC

hbase git commit: HBASE-16962: Add readPoint to preCompactScannerOpen() and preFlushScannerOpen() API

Repository: hbase
Updated Branches:
  refs/heads/branch-1 18b31fdd3 -> 44ab659b9


HBASE-16962: Add readPoint to preCompactScannerOpen() and preFlushScannerOpen() API

Signed-off-by: anoopsamjohn <an...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44ab659b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44ab659b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44ab659b

Branch: refs/heads/branch-1
Commit: 44ab659b933afed2df323b031181fbcb52c85b61
Parents: 18b31fd
Author: thiruvel <th...@yahoo-inc.com>
Authored: Wed Nov 9 11:02:41 2016 -0800
Committer: anoopsamjohn <an...@gmail.com>
Committed: Fri Nov 11 11:09:55 2016 +0530

----------------------------------------------------------------------
 .../hbase/coprocessor/BaseRegionObserver.java   | 14 +++++
 .../hbase/coprocessor/RegionObserver.java       | 60 ++++++++++++++++++--
 .../regionserver/RegionCoprocessorHost.java     | 13 +++--
 .../hadoop/hbase/regionserver/StoreFlusher.java |  3 +-
 .../regionserver/compactions/Compactor.java     | 10 ++--
 5 files changed, 85 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index eb2fc28..a0a91bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -103,6 +103,13 @@ public class BaseRegionObserver implements RegionObserver {
   }
 
   @Override
+  public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s,
+      final long readPoint) throws IOException {
+    return preFlushScannerOpen(c, store, memstoreScanner, s);
+  }
+
+  @Override
   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
   }
 
@@ -212,6 +219,13 @@ public class BaseRegionObserver implements RegionObserver {
   }
 
   @Override
+  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+    return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request);
+  }
+
+  @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
       final StoreFile resultFile) throws IOException {
   }
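
For downstream coprocessor authors, a minimal sketch of how a BaseRegionObserver
subclass can pick up the new readPoint-aware overloads. The class name and the
logging are illustrative only (not part of this commit); the hooks record the
read point and return the chained scanner unchanged, so the default flush and
compaction scanners are still used.

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

/** Hypothetical observer: logs the read point handed to the new hooks. */
public class ReadPointLoggingObserver extends BaseRegionObserver {
  private static final Log LOG = LogFactory.getLog(ReadPointLoggingObserver.class);

  @Override
  public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, KeyValueScanner memstoreScanner, InternalScanner s, long readPoint)
      throws IOException {
    LOG.info("Flushing " + store + " at read point " + readPoint);
    // Returning the chained scanner (possibly null) keeps the default flush scanner.
    return s;
  }

  @Override
  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
      long earliestPutTs, InternalScanner s, CompactionRequest request, long readPoint)
      throws IOException {
    LOG.info("Compacting " + store + " at read point " + readPoint);
    // Returning the chained scanner (possibly null) keeps the default compaction scanner.
    return s;
  }
}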

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 42d5cdb..7cd1be5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -112,12 +112,35 @@ public interface RegionObserver extends Coprocessor {
    * @return the scanner to use during the flush.  {@code null} if the default implementation
    * is to be used.
    * @throws IOException if an error occurred on the coprocessor
+   * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner,
+   *             InternalScanner, long)}
    */
+  @Deprecated
   InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
       throws IOException;
 
   /**
+   * Called before a memstore is flushed to disk and prior to creating the scanner to read from
+   * the memstore.  To override or modify how a memstore is flushed,
+   * implementing classes can return a new scanner to provide the KeyValues to be
+   * stored into the new {@code StoreFile} or null to perform the default processing.
+   * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+   * effect in this hook.
+   * @param c the environment provided by the region server
+   * @param store the store being flushed
+   * @param memstoreScanner the scanner for the memstore that is flushed
+   * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+   * @param readPoint the readpoint to create scanner
+   * @return the scanner to use during the flush.  {@code null} if the default implementation
+   * is to be used.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s,
+      final long readPoint) throws IOException;
+
+  /**
    * Called before the memstore is flushed to disk.
    * @param c the environment provided by the region server
    * @throws IOException if an error occurred on the coprocessor
@@ -283,7 +306,10 @@ public interface RegionObserver extends Coprocessor {
    * @return the scanner to use during compaction. {@code null} if the default implementation is to
    *         be used.
    * @throws IOException if an error occurred on the coprocessor
+   * @deprecated Use {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
+   *             InternalScanner, CompactionRequest, long)} instead.
    */
+  @Deprecated
   InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
       final long earliestPutTs, final InternalScanner s, CompactionRequest request)
@@ -304,12 +330,38 @@ public interface RegionObserver extends Coprocessor {
    * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
    *          files
    * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+   * @param request compaction request
+   * @param readPoint the readpoint to create scanner
+   * @return the scanner to use during compaction. {@code null} if the default implementation is to
+   *          be used.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
+      final long earliestPutTs, final InternalScanner s, final CompactionRequest request,
+      final long readPoint) throws IOException;
+
+  /**
+   * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+   * {@code StoreFile} and prior to creating the scanner used to read the input files. To override
+   * or modify the compaction process, implementing classes can return a new scanner to provide the
+   * KeyValues to be stored into the new {@code StoreFile} or null to perform the default
+   * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+   * effect in this hook.
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param scanners the list {@link org.apache.hadoop.hbase.regionserver.StoreFileScanner}s
+   *  to be read from
+   * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
+   * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
+   *          files
+   * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
    * @return the scanner to use during compaction. {@code null} if the default implementation is to
    *         be used.
    * @throws IOException if an error occurred on the coprocessor
    * @deprecated Use
    *             {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
-   *             InternalScanner, CompactionRequest)} instead.
+   *             InternalScanner, CompactionRequest, long)} instead.
    */
   @Deprecated
   InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -983,9 +1035,9 @@ public interface RegionObserver extends Coprocessor {
    * Called before a store opens a new scanner.
    * This hook is called when a "user" scanner is opened.
    * <p>
-   * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)}
-   * and {@link #preCompactScannerOpen(ObserverContext,
-   *  Store, List, ScanType, long, InternalScanner)}
+   * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner,
+   * long)} and {@link #preCompactScannerOpen(ObserverContext,
+   *  Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
    * to override scanners created for flushes or compactions, resp.
    * <p>
    * Call CoprocessorEnvironment#complete to skip any subsequent chained
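
A sketch of a hook that actually consumes readPoint: it rebuilds the flush
scanner over the memstore snapshot the same way the default flush path does,
but with the read point supplied by the region server instead of a guessed
value. The class name is illustrative, and it assumes the public StoreScanner
constructor used by the default flusher, (Store, ScanInfo, Scan, scanners,
ScanType, smallestReadPoint, earliestPutTs), is available to coprocessors on
this branch.

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;

public class CustomFlushScannerObserver extends BaseRegionObserver {
  @Override
  public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, KeyValueScanner memstoreScanner, InternalScanner s, long readPoint)
      throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    // Build the flush scanner with the read point provided by the region server.
    return new StoreScanner(store, store.getScanInfo(), scan,
        Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
        readPoint, HConstants.OLDEST_TIMESTAMP);
  }
}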

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 28d2129..c564091 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -520,18 +520,19 @@ public class RegionCoprocessorHost
 
   /**
    * See
-   * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
+   * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
+   *   InternalScanner, CompactionRequest, long)}
    */
   public InternalScanner preCompactScannerOpen(final Store store,
       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
-      final CompactionRequest request) throws IOException {
+      final CompactionRequest request, final long readPoint) throws IOException {
     return execOperationWithResult(null,
         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
-          earliestPutTs, getResult(), request));
+          earliestPutTs, getResult(), request, readPoint));
       }
     });
   }
@@ -649,16 +650,16 @@ public class RegionCoprocessorHost
   /**
    * See
    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
-   *    Store, KeyValueScanner, InternalScanner)}
+   *    Store, KeyValueScanner, InternalScanner, long)}
    */
   public InternalScanner preFlushScannerOpen(final Store store,
-      final KeyValueScanner memstoreScanner) throws IOException {
+      final KeyValueScanner memstoreScanner, final long readPoint) throws IOException {
     return execOperationWithResult(null,
         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
       @Override
       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
           throws IOException {
-        setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
+        setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint));
       }
     });
   }
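
The host threads each observer's result into the next one as 's' (see the
getResult() calls above), so a well-behaved hook should wrap the chained
scanner rather than discard it. A sketch, with illustrative names
(ChainFriendlyObserver, CountingScanner) and assuming the two-argument
next(List<Cell>, ScannerContext) overload present in branch-1's InternalScanner:

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class ChainFriendlyObserver extends BaseRegionObserver {
  private final AtomicLong cellsCompacted = new AtomicLong();

  /** Delegates to the wrapped scanner and counts the cells it emits. */
  private final class CountingScanner implements InternalScanner {
    private final InternalScanner delegate;
    CountingScanner(InternalScanner delegate) { this.delegate = delegate; }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      boolean more = delegate.next(results);
      cellsCompacted.addAndGet(results.size());
      return more;
    }

    @Override
    public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
      boolean more = delegate.next(results, scannerContext);
      cellsCompacted.addAndGet(results.size());
      return more;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
      long earliestPutTs, InternalScanner s, CompactionRequest request, long readPoint)
      throws IOException {
    // 's' is the scanner produced by the previous observer, or null if there is none.
    // Returning null asks the region server to build the default compaction scanner.
    return s == null ? null : new CountingScanner(s);
  }
}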

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 9b182a2..019e1cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -83,7 +83,8 @@ abstract class StoreFlusher {
       long smallestReadPoint) throws IOException {
     InternalScanner scanner = null;
     if (store.getCoprocessorHost() != null) {
-      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner);
+      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner,
+          smallestReadPoint);
     }
     if (scanner == null) {
       Scan scan = new Scan();

http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index efb5fb2..d1ab800 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -286,7 +286,8 @@ public abstract class Compactor<T extends CellSink> {
     try {
       /* Include deletes, unless we are doing a major compaction */
       ScanType scanType = scannerFactory.getScanType(request);
-      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
+      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user,
+        smallestReadPoint);
       if (scanner == null) {
         scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
       }
@@ -337,24 +338,25 @@ public abstract class Compactor<T extends CellSink> {
    * @param earliestPutTs Earliest put ts.
    * @param scanners File scanners for compaction files.
    * @param user the User
+   * @param readPoint the read point to help create scanner by Coprocessor if required.
    * @return Scanner override by coprocessor; null if not overriding.
    */
   protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
       final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
-      User user) throws IOException {
+      User user, final long readPoint) throws IOException {
     if (store.getCoprocessorHost() == null) {
       return null;
     }
     if (user == null) {
       return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
-        earliestPutTs, request);
+        earliestPutTs, request, readPoint);
     } else {
       try {
         return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
           @Override
           public InternalScanner run() throws Exception {
             return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
-              scanType, earliestPutTs, request);
+              scanType, earliestPutTs, request, readPoint);
           }
         });
       } catch (InterruptedException ie) {