You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2016/09/08 22:35:52 UTC
[3/7] phoenix git commit: PHOENIX-3081 Consult RegionServer
stopped/stopping state before logging error in StatisticsScanner
PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bbb3e911
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bbb3e911
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bbb3e911
Branch: refs/heads/4.x-HBase-0.98
Commit: bbb3e911900ea5f6b3cfacc700a57ba872bb48d1
Parents: fb976aa
Author: Josh Elser <el...@apache.org>
Authored: Mon Jul 18 16:24:22 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Sep 8 17:58:01 2016 -0400
----------------------------------------------------------------------
.../phoenix/schema/stats/StatisticsScanner.java | 91 +++++++++---
.../schema/stats/StatisticsScannerTest.java | 144 +++++++++++++++++++
2 files changed, 212 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bbb3e911/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index a9ce275..71be072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -48,12 +49,14 @@ public class StatisticsScanner implements InternalScanner {
private StatisticsCollector tracker;
private ImmutableBytesPtr family;
private final Configuration config;
+ private final RegionServerServices regionServerServices;
public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter statsWriter, RegionCoprocessorEnvironment env,
InternalScanner delegate, ImmutableBytesPtr family) {
this.tracker = tracker;
this.statsWriter = statsWriter;
this.delegate = delegate;
+ this.regionServerServices = env.getRegionServerServices();
this.region = env.getRegion();
this.config = env.getConfiguration();
this.family = family;
@@ -86,12 +89,61 @@ public class StatisticsScanner implements InternalScanner {
}
}
- private class StatisticsScannerCallable implements Callable<Void> {
+ @Override
+ public void close() throws IOException {
+ boolean async = getConfig().getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
+ StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
+ StatisticsScannerCallable callable = createCallable();
+ if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
+ LOG.debug("Not updating table statistics because the server is stopping/stopped");
+ return;
+ }
+ if (!async) {
+ callable.call();
+ } else {
+ collectionTracker.runTask(callable);
+ }
+ }
+
+ // VisibleForTesting
+ StatisticsCollectionRunTracker getStatsCollectionRunTracker(Configuration c) {
+ return StatisticsCollectionRunTracker.getInstance(c);
+ }
+
+ Configuration getConfig() {
+ return config;
+ }
+
+ StatisticsWriter getStatisticsWriter() {
+ return statsWriter;
+ }
+
+ RegionServerServices getRegionServerServices() {
+ return regionServerServices;
+ }
+
+ HRegion getRegion() {
+ return region;
+ }
+
+ StatisticsScannerCallable createCallable() {
+ return new StatisticsScannerCallable();
+ }
+
+ StatisticsCollector getTracker() {
+ return tracker;
+ }
+
+ InternalScanner getDelegate() {
+ return delegate;
+ }
+
+ class StatisticsScannerCallable implements Callable<Void> {
@Override
public Void call() throws IOException {
IOException toThrow = null;
- StatisticsCollectionRunTracker statsRunState =
- StatisticsCollectionRunTracker.getInstance(config);
+ StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
+ final HRegion region = getRegion();
try {
// update the statistics table
// Just verify if this if fine
@@ -100,32 +152,36 @@ public class StatisticsScanner implements InternalScanner {
LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString()
+ " as part of major compaction");
}
- statsWriter.deleteStats(region, tracker, family, mutations);
+ getStatisticsWriter().deleteStats(region, tracker, family, mutations);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding new stats for the region " + region.getRegionNameAsString()
+ " as part of major compaction");
}
- statsWriter.addStats(tracker, family, mutations);
+ getStatisticsWriter().addStats(tracker, family, mutations);
if (LOG.isDebugEnabled()) {
LOG.debug("Committing new stats for the region " + region.getRegionNameAsString()
+ " as part of major compaction");
}
- statsWriter.commitStats(mutations, tracker);
+ getStatisticsWriter().commitStats(mutations, tracker);
} catch (IOException e) {
- LOG.error("Failed to update statistics table!", e);
- toThrow = e;
+ if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
+ LOG.debug("Ignoring error updating statistics because region is closing/closed");
+ } else {
+ LOG.error("Failed to update statistics table!", e);
+ toThrow = e;
+ }
} finally {
try {
- statsRunState.removeCompactingRegion(region.getRegionInfo());
- statsWriter.close();
- tracker.close();// close the tracker
+ collectionTracker.removeCompactingRegion(region.getRegionInfo());
+ getStatisticsWriter().close();// close the writer
+ getTracker().close();// close the tracker
} catch (IOException e) {
if (toThrow == null) toThrow = e;
LOG.error("Error while closing the stats table", e);
} finally {
// close the delegate scanner
try {
- delegate.close();
+ getDelegate().close();
} catch (IOException e) {
if (toThrow == null) toThrow = e;
LOG.error("Error while closing the scanner", e);
@@ -137,15 +193,4 @@ public class StatisticsScanner implements InternalScanner {
return null;
}
}
-
- @Override
- public void close() throws IOException {
- boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
- StatisticsScannerCallable callable = new StatisticsScannerCallable();
- if (!async) {
- callable.call();
- } else {
- StatisticsCollectionRunTracker.getInstance(config).runTask(callable);
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bbb3e911/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
new file mode 100644
index 0000000..685b4b6
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.schema.stats.StatisticsScanner.StatisticsScannerCallable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to verify that we don't try to update stats when a RS is stopping.
+ */
+public class StatisticsScannerTest {
+
+ private HRegion region;
+ private RegionServerServices rsServices;
+ private StatisticsWriter statsWriter;
+ private StatisticsScannerCallable callable;
+ private StatisticsCollectionRunTracker runTracker;
+ private StatisticsScanner mockScanner;
+ private StatisticsCollector tracker;
+ private InternalScanner delegate;
+ private HRegionInfo regionInfo;
+
+ private Configuration config;
+
+ @Before
+ public void setupMocks() throws Exception {
+ this.config = new Configuration(false);
+
+ // Create all of the mocks
+ this.region = mock(HRegion.class);
+ this.rsServices = mock(RegionServerServices.class);
+ this.statsWriter = mock(StatisticsWriter.class);
+ this.callable = mock(StatisticsScannerCallable.class);
+ this.runTracker = mock(StatisticsCollectionRunTracker.class);
+ this.mockScanner = mock(StatisticsScanner.class);
+ this.tracker = mock(StatisticsCollector.class);
+ this.delegate = mock(InternalScanner.class);
+ this.regionInfo = mock(HRegionInfo.class);
+
+ // Wire up the mocks to the mock StatisticsScanner
+ when(mockScanner.getStatisticsWriter()).thenReturn(statsWriter);
+ when(mockScanner.getRegionServerServices()).thenReturn(rsServices);
+ when(mockScanner.createCallable()).thenReturn(callable);
+ when(mockScanner.getStatsCollectionRunTracker(any(Configuration.class))).thenReturn(runTracker);
+ when(mockScanner.getRegion()).thenReturn(region);
+ when(mockScanner.getConfig()).thenReturn(config);
+ when(mockScanner.getTracker()).thenReturn(tracker);
+ when(mockScanner.getDelegate()).thenReturn(delegate);
+
+ // Wire up the HRegionInfo mock to the Region mock
+ when(region.getRegionInfo()).thenReturn(regionInfo);
+
+ // Always call close() on the mock StatisticsScanner
+ doCallRealMethod().when(mockScanner).close();
+ }
+
+ @Test
+ public void testCheckRegionServerStoppingOnClose() throws Exception {
+ when(rsServices.isStopping()).thenReturn(true);
+ when(rsServices.isStopped()).thenReturn(false);
+
+ mockScanner.close();
+
+ verify(rsServices).isStopping();
+ verify(callable, never()).call();
+ verify(runTracker, never()).runTask(callable);
+ }
+
+ @Test
+ public void testCheckRegionServerStoppedOnClose() throws Exception {
+ when(rsServices.isStopping()).thenReturn(false);
+ when(rsServices.isStopped()).thenReturn(true);
+
+ mockScanner.close();
+
+ verify(rsServices).isStopping();
+ verify(rsServices).isStopped();
+ verify(callable, never()).call();
+ verify(runTracker, never()).runTask(callable);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCheckRegionServerStoppingOnException() throws Exception {
+ StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
+ doThrow(new IOException()).when(statsWriter).deleteStats(any(HRegion.class), any(StatisticsCollector.class),
+ any(ImmutableBytesPtr.class), any(List.class));
+ when(rsServices.isStopping()).thenReturn(true);
+ when(rsServices.isStopped()).thenReturn(false);
+
+ // Should not throw an exception
+ realCallable.call();
+
+ verify(rsServices).isStopping();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testCheckRegionServerStoppedOnException() throws Exception {
+ StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable();
+ doThrow(new IOException()).when(statsWriter).deleteStats(any(HRegion.class), any(StatisticsCollector.class),
+ any(ImmutableBytesPtr.class), any(List.class));
+ when(rsServices.isStopping()).thenReturn(false);
+ when(rsServices.isStopped()).thenReturn(true);
+
+ // Should not throw an exception
+ realCallable.call();
+
+ verify(rsServices).isStopping();
+ verify(rsServices).isStopped();
+ }
+}