You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/11/22 16:01:31 UTC

hbase git commit: HBASE-19122 preCompact and preFlush can bypass by returning null scanner; shut it down

Repository: hbase
Updated Branches:
  refs/heads/master 7acf3f9a9 -> 194efe3e5


HBASE-19122 preCompact and preFlush can bypass by returning null scanner; shut it down

Removed the TestHRegion#testMemstoreSizeWithFlushCanceling test.
Coprocessors can no longer cancel a flush, so the test was
failing. (It verified that memory accounting kept working
across a cancelled flush.)

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/194efe3e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/194efe3e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/194efe3e

Branch: refs/heads/master
Commit: 194efe3e5a7d209fccc64d195f5d1f1df41df609
Parents: 7acf3f9
Author: Michael Stack <st...@apache.org>
Authored: Tue Nov 21 13:27:32 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Wed Nov 22 08:01:01 2017 -0800

----------------------------------------------------------------------
 .../hbase/mob/DefaultMobStoreFlusher.java       |   3 -
 .../hbase/regionserver/DefaultStoreFlusher.java |   4 -
 .../regionserver/RegionCoprocessorHost.java     |  18 ++-
 .../hbase/regionserver/StripeStoreFlusher.java  |   3 -
 .../regionserver/compactions/Compactor.java     |   4 -
 ...TestRegionObserverPreFlushAndPreCompact.java | 123 +++++++++++++++++++
 .../hadoop/hbase/regionserver/TestHRegion.java  |  46 -------
 7 files changed, 137 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/194efe3e/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 1bc8068..27809c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -110,9 +110,6 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
     // Use a store scanner to find which rows to flush.
     long smallestReadPoint = store.getSmallestReadPoint();
     InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
-    if (scanner == null) {
-      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
-    }
     StoreFileWriter writer;
     try {
       // TODO: We can fail in the below block before we complete adding this flush to

http://git-wip-us.apache.org/repos/asf/hbase/blob/194efe3e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 06d4752..d666ba9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -54,10 +54,6 @@ public class DefaultStoreFlusher extends StoreFlusher {
     // Use a store scanner to find which rows to flush.
     long smallestReadPoint = store.getSmallestReadPoint();
     InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
-    if (scanner == null) {
-      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
-    }
-
     StoreFileWriter writer;
     try {
       // TODO:  We can fail in the below block before we complete adding this flush to

http://git-wip-us.apache.org/repos/asf/hbase/blob/194efe3e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 347ee84..0e4131e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
 import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
@@ -655,9 +656,9 @@ public class RegionCoprocessorHost
    * @param tracker used to track the life cycle of a compaction
    * @param request the compaction request
    * @param user the user
+   * @return Scanner to use (cannot be null!)
    * @throws IOException
    */
-  // A Coprocessor can return null to cancel Compact. Leaving for now but this is form of 'bypass'.
   public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
       final ScanType scanType, final CompactionLifeCycleTracker tracker,
       final CompactionRequest request, final User user) throws IOException {
@@ -670,7 +671,12 @@ public class RegionCoprocessorHost
             defaultResult, user) {
           @Override
           public InternalScanner call(RegionObserver observer) throws IOException {
-            return observer.preCompact(this, store, getResult(), scanType, tracker, request);
+            InternalScanner scanner =
+                observer.preCompact(this, store, getResult(), scanType, tracker, request);
+            if (scanner == null) {
+              throw new CoprocessorException("Null Scanner return disallowed!");
+            }
+            return scanner;
           }
         });
   }
@@ -715,9 +721,9 @@ public class RegionCoprocessorHost
 
   /**
    * Invoked before a memstore flush
+   * @return Scanner to use (cannot be null!)
    * @throws IOException
    */
-  // A Coprocessor can return null to cancel Flush. Leaving for now but this is a form of 'bypass'.
   public InternalScanner preFlush(HStore store, InternalScanner scanner,
       FlushLifeCycleTracker tracker) throws IOException {
     if (coprocEnvironments.isEmpty()) {
@@ -727,7 +733,11 @@ public class RegionCoprocessorHost
         new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
           @Override
           public InternalScanner call(RegionObserver observer) throws IOException {
-            return observer.preFlush(this, store, getResult(), tracker);
+            InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
+            if (scanner == null) {
+              throw new CoprocessorException("Null Scanner return disallowed!");
+            }
+            return scanner;
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/194efe3e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 259b333..a227979 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -64,9 +64,6 @@ public class StripeStoreFlusher extends StoreFlusher {
 
     long smallestReadPoint = store.getSmallestReadPoint();
     InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
-    if (scanner == null) {
-      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
-    }
 
     // Let policy select flush method.
     StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,

http://git-wip-us.apache.org/repos/asf/hbase/blob/194efe3e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 817ddf8..014d4d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -320,10 +320,6 @@ public abstract class Compactor<T extends CellSink> {
       ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
       scanner = postCompactScannerOpen(request, scanType,
         scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
-      if (scanner == null) {
-        // NULL scanner returned from coprocessor hooks means skip normal processing.
-        return new ArrayList<>();
-      }
       boolean cleanSeqId = false;
       if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
         // For mvcc-sensitive family, we never set mvcc to 0.

http://git-wip-us.apache.org/repos/asf/hbase/blob/194efe3e/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverPreFlushAndPreCompact.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverPreFlushAndPreCompact.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverPreFlushAndPreCompact.java
new file mode 100644
index 0000000..cea0833
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverPreFlushAndPreCompact.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
+
+
+
+/**
+ * Test that we fail if a Coprocessor tries to return a null scanner out of
+ * {@link RegionObserver#preFlush(ObserverContext, Store, InternalScanner, FlushLifeCycleTracker)}
+ * or {@link RegionObserver#preCompact(ObserverContext, Store, InternalScanner, ScanType,
+ * CompactionLifeCycleTracker, CompactionRequest)}
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-19122">HBASE-19122</a>
+ */
+@Category({CoprocessorTests.class, SmallTests.class})
+public class TestRegionObserverPreFlushAndPreCompact {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+  @Rule public TestName name = new TestName();
+
+  /**
+   * Coprocessor that returns null when preCompact or preFlush is called.
+   */
+  public static class TestRegionObserver implements RegionObserver, RegionCoprocessor {
+    @Override
+    public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
+      return null;
+    }
+
+    @Override
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+        CompactionRequest request) throws IOException {
+      return null;
+    }
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+  }
+
+  /**
+   * Ensure we get expected exception when we try to return null from a preFlush call.
+   * @throws IOException We expect it to throw {@link CoprocessorException}
+   */
+  @Test (expected = CoprocessorException.class)
+  public void testPreFlushReturningNull() throws IOException {
+    RegionCoprocessorHost rch = getRegionCoprocessorHost();
+    rch.preFlush(null, null, null);
+  }
+
+  /**
+   * Ensure we get expected exception when we try to return null from a preCompact call.
+   * @throws IOException We expect it to throw {@link CoprocessorException}
+   */
+  @Test (expected = CoprocessorException.class)
+  public void testPreCompactReturningNull() throws IOException {
+    RegionCoprocessorHost rch = getRegionCoprocessorHost();
+    rch.preCompact(null, null, null, null, null, null);
+  }
+
+  private RegionCoprocessorHost getRegionCoprocessorHost() {
+    // Make up an HRegion instance. Use the first hbase:meta region as our RegionInfo. Use the
+    // hbase:meta table name to build the TableDescriptor that our mock returns when asked for
+    // the schema down inside RegionCoprocessorHost. Pass in mocked RegionServerServices too.
+    RegionInfo ri = RegionInfoBuilder.FIRST_META_REGIONINFO;
+    HRegion mockedHRegion = Mockito.mock(HRegion.class);
+    Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(ri);
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(ri.getTable()).build();
+    Mockito.when(mockedHRegion.getTableDescriptor()).thenReturn(td);
+    RegionServerServices mockedServices = Mockito.mock(RegionServerServices.class);
+    Configuration conf = HBaseConfiguration.create();
+    // Load our test coprocessor defined above.
+    conf.set(REGION_COPROCESSOR_CONF_KEY, TestRegionObserver.class.getName());
+    return new RegionCoprocessorHost(mockedHRegion, mockedServices, conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/194efe3e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index d538b15..0f46f69 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -378,52 +378,6 @@ public class TestHRegion {
         .getWAL(tableName.toBytes(), tableName.getNamespace());
   }
 
-  /**
-   * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
-   */
-  @Test
-  public void testMemstoreSizeWithFlushCanceling() throws IOException {
-    FileSystem fs = FileSystem.get(CONF);
-    Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling");
-    FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF);
-    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
-        COLUMN_FAMILY_BYTES);
-    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
-    assertEquals(0, region.getMemStoreSize());
-
-    // Put some value and make sure flush could be completed normally
-    byte [] value = Bytes.toBytes(method);
-    Put put = new Put(value);
-    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
-    region.put(put);
-    long onePutSize = region.getMemStoreSize();
-    assertTrue(onePutSize > 0);
-    region.flush(true);
-    assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize());
-    assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize());
-
-    // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
-    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
-    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
-    when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),
-      Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
-    when(mockedCPHost.preFlushScannerOpen(Mockito.isA(HStore.class),
-      Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(store.getScanInfo());
-    region.setCoprocessorHost(mockedCPHost);
-    region.put(put);
-    region.flush(true);
-    assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemStoreSize());
-    assertEquals("flushable size should NOT be zero", onePutSize,
-        store.getFlushableSize().getDataSize());
-
-    // set normalCPHost and flush again, the snapshot will be flushed
-    region.setCoprocessorHost(normalCPHost);
-    region.flush(true);
-    assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize());
-    assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize());
-    HBaseTestingUtility.closeRegionAndWAL(region);
-  }
-
   @Test
   public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
     String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";