You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2016/09/08 22:35:53 UTC

[4/7] phoenix git commit: PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner

PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce3533de
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce3533de
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce3533de

Branch: refs/heads/4.8-HBase-1.2
Commit: ce3533deb255697141cc790a1c3000b41d6863dd
Parents: 03e1005
Author: Josh Elser <el...@apache.org>
Authored: Mon Jul 18 16:24:22 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Sep 8 17:58:10 2016 -0400

----------------------------------------------------------------------
 .../phoenix/schema/stats/StatisticsScanner.java |  72 ++++++++--
 .../schema/stats/StatisticsScannerTest.java     | 144 +++++++++++++++++++
 2 files changed, 202 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3533de/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 082e833..736efc6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
@@ -49,12 +50,14 @@ public class StatisticsScanner implements InternalScanner {
     private StatisticsCollector tracker;
     private ImmutableBytesPtr family;
     private final Configuration config;
+    private final RegionServerServices regionServerServices;
 
     public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env,
             InternalScanner delegate, ImmutableBytesPtr family) {
         this.tracker = tracker;
         this.statsWriter = stats;
         this.delegate = delegate;
+        this.regionServerServices = env.getRegionServerServices();
         this.region = env.getRegion();
         this.family = family;
         this.config = env.getConfiguration();
@@ -89,9 +92,13 @@ public class StatisticsScanner implements InternalScanner {
 
     @Override
     public void close() throws IOException {
-        boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
-        StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config);
-        StatisticsScannerCallable callable = new StatisticsScannerCallable();
+        boolean async = getConfig().getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
+        StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
+        StatisticsScannerCallable callable = createCallable();
+        if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
+            LOG.debug("Not updating table statistics because the server is stopping/stopped");
+            return;
+        }
         if (!async) {
             callable.call();
         } else {
@@ -99,12 +106,45 @@ public class StatisticsScanner implements InternalScanner {
         }
     }
 
-    private class StatisticsScannerCallable implements Callable<Void> {
+    // VisibleForTesting
+    StatisticsCollectionRunTracker getStatsCollectionRunTracker(Configuration c) {
+        return StatisticsCollectionRunTracker.getInstance(c);
+    }
+
+    Configuration getConfig() {
+        return config;
+    }
+
+    StatisticsWriter getStatisticsWriter() {
+        return statsWriter;
+    }
+
+    RegionServerServices getRegionServerServices() {
+        return regionServerServices;
+    }
+
+    Region getRegion() {
+        return region;
+    }
+
+    StatisticsScannerCallable createCallable() {
+        return new StatisticsScannerCallable();
+    }
+
+    StatisticsCollector getTracker() {
+        return tracker;
+    }
+
+    InternalScanner getDelegate() {
+        return delegate;
+    }
+
+    class StatisticsScannerCallable implements Callable<Void> {
         @Override
         public Void call() throws IOException {
             IOException toThrow = null;
-            StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config);
-            final HRegionInfo regionInfo = region.getRegionInfo();
+            StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
+            final HRegionInfo regionInfo = getRegion().getRegionInfo();
             try {
                 // update the statistics table
                 // Just verify if this if fine
@@ -114,32 +154,36 @@ public class StatisticsScanner implements InternalScanner {
                     LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString()
                             + " as part of major compaction");
                 }
-                statsWriter.deleteStats(region, tracker, family, mutations);
+                getStatisticsWriter().deleteStats(region, tracker, family, mutations);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString()
                             + " as part of major compaction");
                 }
-                statsWriter.addStats(tracker, family, mutations);
+                getStatisticsWriter().addStats(tracker, family, mutations);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString()
                             + " as part of major compaction");
                 }
-                statsWriter.commitStats(mutations, tracker);
+                getStatisticsWriter().commitStats(mutations, tracker);
             } catch (IOException e) {
-                LOG.error("Failed to update statistics table!", e);
-                toThrow = e;
+                if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
+                    LOG.debug("Ignoring error updating statistics because region is closing/closed");
+                } else {
+                    LOG.error("Failed to update statistics table!", e);
+                    toThrow = e;
+                }
             } finally {
                 try {
                     collectionTracker.removeCompactingRegion(regionInfo);
-                    statsWriter.close();// close the writer
-                    tracker.close();// close the tracker
+                    getStatisticsWriter().close();// close the writer
+                    getTracker().close();// close the tracker
                 } catch (IOException e) {
                     if (toThrow == null) toThrow = e;
                     LOG.error("Error while closing the stats table", e);
                 } finally {
                     // close the delegate scanner
                     try {
-                        delegate.close();
+                        getDelegate().close();
                     } catch (IOException e) {
                         if (toThrow == null) toThrow = e;
                         LOG.error("Error while closing the scanner", e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3533de/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
new file mode 100644
index 0000000..888f09a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.stats.StatisticsScanner.StatisticsScannerCallable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to verify that we don't try to update stats when a RS is stopping.
+ */
+public class StatisticsScannerTest {
+
+    private Region region;
+    private RegionServerServices rsServices;
+    private StatisticsWriter statsWriter;
+    private StatisticsScannerCallable callable;
+    private StatisticsCollectionRunTracker runTracker;
+    private StatisticsScanner mockScanner;
+    private StatisticsCollector tracker;
+    private InternalScanner delegate;
+    private HRegionInfo regionInfo;
+
+    private Configuration config;
+
+    @Before
+    public void setupMocks() throws Exception {
+        this.config = new Configuration(false);
+
+        // Create all of the mocks
+        this.region = mock(Region.class);
+        this.rsServices = mock(RegionServerServices.class);
+        this.statsWriter = mock(StatisticsWriter.class);
+        this.callable = mock(StatisticsScannerCallable.class);
+        this.runTracker = mock(StatisticsCollectionRunTracker.class);
+        this.mockScanner = mock(StatisticsScanner.class);
+        this.tracker = mock(StatisticsCollector.class);
+        this.delegate = mock(InternalScanner.class);
+        this.regionInfo = mock(HRegionInfo.class);
+
+        // Wire up the mocks to the mock StatisticsScanner
+        when(mockScanner.getStatisticsWriter()).thenReturn(statsWriter);
+        when(mockScanner.getRegionServerServices()).thenReturn(rsServices);
+        when(mockScanner.createCallable()).thenReturn(callable);
+        when(mockScanner.getStatsCollectionRunTracker(any(Configuration.class))).thenReturn(runTracker);
+        when(mockScanner.getRegion()).thenReturn(region);
+        when(mockScanner.getConfig()).thenReturn(config);
+        when(mockScanner.getTracker()).thenReturn(tracker);
+        when(mockScanner.getDelegate()).thenReturn(delegate);
+
+        // Wire up the HRegionInfo mock to the Region mock
+        when(region.getRegionInfo()).thenReturn(regionInfo);
+
+        // Always call close() on the mock StatisticsScanner
+        doCallRealMethod().when(mockScanner).close();
+    }
+
+    @Test
+    public void testCheckRegionServerStoppingOnClose() throws Exception {
+        when(rsServices.isStopping()).thenReturn(true);
+        when(rsServices.isStopped()).thenReturn(false);
+
+        mockScanner.close();
+
+        verify(rsServices).isStopping();
+        verify(callable, never()).call();
+        verify(runTracker, never()).runTask(callable);
+    }
+
+    @Test
+    public void testCheckRegionServerStoppedOnClose() throws Exception {
+        when(rsServices.isStopping()).thenReturn(false);
+        when(rsServices.isStopped()).thenReturn(true);
+
+        mockScanner.close();
+
+        verify(rsServices).isStopping();
+        verify(rsServices).isStopped();
+        verify(callable, never()).call();
+        verify(runTracker, never()).runTask(callable);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCheckRegionServerStoppingOnException() throws Exception {
+        StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
+        doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class),
+                any(ImmutableBytesPtr.class), any(List.class));
+        when(rsServices.isStopping()).thenReturn(true);
+        when(rsServices.isStopped()).thenReturn(false);
+
+        // Should not throw an exception
+        realCallable.call();
+
+        verify(rsServices).isStopping();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCheckRegionServerStoppedOnException() throws Exception {
+        StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
+        doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class),
+                any(ImmutableBytesPtr.class), any(List.class));
+        when(rsServices.isStopping()).thenReturn(false);
+        when(rsServices.isStopped()).thenReturn(true);
+
+        // Should not throw an exception
+        realCallable.call();
+
+        verify(rsServices).isStopping();
+        verify(rsServices).isStopped();
+    }
+}