You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2016/09/08 22:35:53 UTC
[4/7] phoenix git commit: PHOENIX-3081 Consult RegionServer
stopped/stopping state before logging error in StatisticsScanner
PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce3533de
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce3533de
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce3533de
Branch: refs/heads/4.8-HBase-1.2
Commit: ce3533deb255697141cc790a1c3000b41d6863dd
Parents: 03e1005
Author: Josh Elser <el...@apache.org>
Authored: Mon Jul 18 16:24:22 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Sep 8 17:58:10 2016 -0400
----------------------------------------------------------------------
.../phoenix/schema/stats/StatisticsScanner.java | 72 ++++++++--
.../schema/stats/StatisticsScannerTest.java | 144 +++++++++++++++++++
2 files changed, 202 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3533de/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 082e833..736efc6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -49,12 +50,14 @@ public class StatisticsScanner implements InternalScanner {
private StatisticsCollector tracker;
private ImmutableBytesPtr family;
private final Configuration config;
+ private final RegionServerServices regionServerServices;
public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env,
InternalScanner delegate, ImmutableBytesPtr family) {
this.tracker = tracker;
this.statsWriter = stats;
this.delegate = delegate;
+ this.regionServerServices = env.getRegionServerServices();
this.region = env.getRegion();
this.family = family;
this.config = env.getConfiguration();
@@ -89,9 +92,13 @@ public class StatisticsScanner implements InternalScanner {
@Override
public void close() throws IOException {
- boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
- StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config);
- StatisticsScannerCallable callable = new StatisticsScannerCallable();
+ boolean async = getConfig().getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
+ StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
+ StatisticsScannerCallable callable = createCallable();
+ if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
+ LOG.debug("Not updating table statistics because the server is stopping/stopped");
+ return;
+ }
if (!async) {
callable.call();
} else {
@@ -99,12 +106,45 @@ public class StatisticsScanner implements InternalScanner {
}
}
- private class StatisticsScannerCallable implements Callable<Void> {
+ // VisibleForTesting: looks up the run tracker via StatisticsCollectionRunTracker.getInstance(c); StatisticsScannerTest stubs this method to return a mock tracker.
+ StatisticsCollectionRunTracker getStatsCollectionRunTracker(Configuration c) {
+ return StatisticsCollectionRunTracker.getInstance(c);
+ }
+ // Returns the Configuration stored in the config field (taken from env.getConfiguration() in the constructor); stubbed by StatisticsScannerTest.
+ Configuration getConfig() {
+ return config;
+ }
+ // Returns the StatisticsWriter passed to the constructor; stubbed by StatisticsScannerTest to return a mock writer.
+ StatisticsWriter getStatisticsWriter() {
+ return statsWriter;
+ }
+ // Returns the RegionServerServices obtained from env.getRegionServerServices() in the constructor; its isStopping()/isStopped() state is consulted before updating stats.
+ RegionServerServices getRegionServerServices() {
+ return regionServerServices;
+ }
+ // Returns the Region obtained from env.getRegion() in the constructor; stubbed by StatisticsScannerTest.
+ Region getRegion() {
+ return region;
+ }
+ // Creates a new StatisticsScannerCallable (inner class, bound to this scanner); StatisticsScannerTest stubs this to return a mock callable.
+ StatisticsScannerCallable createCallable() {
+ return new StatisticsScannerCallable();
+ }
+ // Returns the StatisticsCollector passed to the constructor; stubbed by StatisticsScannerTest.
+ StatisticsCollector getTracker() {
+ return tracker;
+ }
+ // Returns the wrapped InternalScanner passed to the constructor; stubbed by StatisticsScannerTest.
+ InternalScanner getDelegate() {
+ return delegate;
+ }
+
+ class StatisticsScannerCallable implements Callable<Void> {
@Override
public Void call() throws IOException {
IOException toThrow = null;
- StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config);
- final HRegionInfo regionInfo = region.getRegionInfo();
+ StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
+ final HRegionInfo regionInfo = getRegion().getRegionInfo();
try {
// update the statistics table
// Just verify if this if fine
@@ -114,32 +154,36 @@ public class StatisticsScanner implements InternalScanner {
LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString()
+ " as part of major compaction");
}
- statsWriter.deleteStats(region, tracker, family, mutations);
+ getStatisticsWriter().deleteStats(region, tracker, family, mutations);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString()
+ " as part of major compaction");
}
- statsWriter.addStats(tracker, family, mutations);
+ getStatisticsWriter().addStats(tracker, family, mutations);
if (LOG.isDebugEnabled()) {
LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString()
+ " as part of major compaction");
}
- statsWriter.commitStats(mutations, tracker);
+ getStatisticsWriter().commitStats(mutations, tracker);
} catch (IOException e) {
- LOG.error("Failed to update statistics table!", e);
- toThrow = e;
+ if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
+ LOG.debug("Ignoring error updating statistics because region is closing/closed");
+ } else {
+ LOG.error("Failed to update statistics table!", e);
+ toThrow = e;
+ }
} finally {
try {
collectionTracker.removeCompactingRegion(regionInfo);
- statsWriter.close();// close the writer
- tracker.close();// close the tracker
+ getStatisticsWriter().close();// close the writer
+ getTracker().close();// close the tracker
} catch (IOException e) {
if (toThrow == null) toThrow = e;
LOG.error("Error while closing the stats table", e);
} finally {
// close the delegate scanner
try {
- delegate.close();
+ getDelegate().close();
} catch (IOException e) {
if (toThrow == null) toThrow = e;
LOG.error("Error while closing the scanner", e);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce3533de/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
new file mode 100644
index 0000000..888f09a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.stats.StatisticsScanner.StatisticsScannerCallable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to verify that we don't try to update stats when a RS is stopping.
+ */
+public class StatisticsScannerTest {
+ // Mocks created in setupMocks(); most are handed out by the mocked StatisticsScanner's accessor methods.
+ private Region region;
+ private RegionServerServices rsServices;
+ private StatisticsWriter statsWriter;
+ private StatisticsScannerCallable callable;
+ private StatisticsCollectionRunTracker runTracker;
+ private StatisticsScanner mockScanner;
+ private StatisticsCollector tracker;
+ private InternalScanner delegate;
+ private HRegionInfo regionInfo;
+ // Real Configuration instance (constructed with `false`), not a mock.
+ private Configuration config;
+ // Rebuilds every mock and wires it into the mock scanner before each test.
+ @Before
+ public void setupMocks() throws Exception {
+ this.config = new Configuration(false);
+
+ // Create all of the mocks
+ this.region = mock(Region.class);
+ this.rsServices = mock(RegionServerServices.class);
+ this.statsWriter = mock(StatisticsWriter.class);
+ this.callable = mock(StatisticsScannerCallable.class);
+ this.runTracker = mock(StatisticsCollectionRunTracker.class);
+ this.mockScanner = mock(StatisticsScanner.class);
+ this.tracker = mock(StatisticsCollector.class);
+ this.delegate = mock(InternalScanner.class);
+ this.regionInfo = mock(HRegionInfo.class);
+
+ // Wire up the mocks to the mock StatisticsScanner (its accessors return the mocks above)
+ when(mockScanner.getStatisticsWriter()).thenReturn(statsWriter);
+ when(mockScanner.getRegionServerServices()).thenReturn(rsServices);
+ when(mockScanner.createCallable()).thenReturn(callable);
+ when(mockScanner.getStatsCollectionRunTracker(any(Configuration.class))).thenReturn(runTracker);
+ when(mockScanner.getRegion()).thenReturn(region);
+ when(mockScanner.getConfig()).thenReturn(config);
+ when(mockScanner.getTracker()).thenReturn(tracker);
+ when(mockScanner.getDelegate()).thenReturn(delegate);
+
+ // Wire up the HRegionInfo mock to the Region mock
+ when(region.getRegionInfo()).thenReturn(regionInfo);
+
+ // Run the real close() implementation on the mock StatisticsScanner instead of a no-op stub
+ doCallRealMethod().when(mockScanner).close();
+ }
+ // close() while isStopping() is true: the callable must neither be called directly nor handed to the run tracker.
+ @Test
+ public void testCheckRegionServerStoppingOnClose() throws Exception {
+ when(rsServices.isStopping()).thenReturn(true);
+ when(rsServices.isStopped()).thenReturn(false);
+
+ mockScanner.close();
+
+ verify(rsServices).isStopping();
+ verify(callable, never()).call();
+ verify(runTracker, never()).runTask(callable);
+ }
+ // close() while isStopped() is true (and isStopping() is false): both flags are consulted and the callable is never run.
+ @Test
+ public void testCheckRegionServerStoppedOnClose() throws Exception {
+ when(rsServices.isStopping()).thenReturn(false);
+ when(rsServices.isStopped()).thenReturn(true);
+
+ mockScanner.close();
+
+ verify(rsServices).isStopping();
+ verify(rsServices).isStopped();
+ verify(callable, never()).call();
+ verify(runTracker, never()).runTask(callable);
+ }
+ // A real callable whose deleteStats() throws IOException while isStopping() is true: call() is expected not to propagate it.
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCheckRegionServerStoppingOnException() throws Exception {
+ StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
+ doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class),
+ any(ImmutableBytesPtr.class), any(List.class));
+ when(rsServices.isStopping()).thenReturn(true);
+ when(rsServices.isStopped()).thenReturn(false);
+
+ // Should not throw an exception: the IOException is expected to be ignored while the server is stopping
+ realCallable.call();
+
+ verify(rsServices).isStopping();
+ }
+ // Same as above, but with isStopped() true and isStopping() false, so both flags must be checked.
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCheckRegionServerStoppedOnException() throws Exception {
+ StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
+ doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class),
+ any(ImmutableBytesPtr.class), any(List.class));
+ when(rsServices.isStopping()).thenReturn(false);
+ when(rsServices.isStopped()).thenReturn(true);
+
+ // Should not throw an exception: the IOException is expected to be ignored once the server is stopped
+ realCallable.call();
+
+ verify(rsServices).isStopping();
+ verify(rsServices).isStopped();
+ }
+}