You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gr...@apache.org on 2016/02/19 15:30:19 UTC
phoenix git commit: LP-2692 Config setting for disabling stats
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.0 aa362e744 -> f88b76662
LP-2692 Config setting for disabling stats
Add configuration setting to allow disabling stats collection, for
environments where it is not desired or is causing issues.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f88b7666
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f88b7666
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f88b7666
Branch: refs/heads/4.x-HBase-1.0
Commit: f88b766628d75e25e3d9509a6f40f367b54b8021
Parents: aa362e7
Author: Gabriel Reid <ga...@ngdata.com>
Authored: Thu Feb 18 10:20:36 2016 +0100
Committer: Gabriel Reid <ga...@ngdata.com>
Committed: Fri Feb 19 14:35:03 2016 +0100
----------------------------------------------------------------------
.../end2end/StatsCollectionDisabledIT.java | 70 ++++++
.../UngroupedAggregateRegionObserver.java | 12 +-
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../stats/DefaultStatisticsCollector.java | 222 +++++++++++++++++++
.../schema/stats/NoOpStatisticsCollector.java | 71 ++++++
.../phoenix/schema/stats/PTableStats.java | 2 +-
.../schema/stats/StatisticsCollector.java | 213 +++---------------
.../stats/StatisticsCollectorFactory.java | 63 ++++++
.../phoenix/schema/stats/StatisticsScanner.java | 2 +-
.../phoenix/schema/stats/StatisticsWriter.java | 6 +-
10 files changed, 468 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java
new file mode 100644
index 0000000..a92a665
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Verifies that statistics are not collected if they are disabled via a setting
+ */
+public class StatsCollectionDisabledIT extends StatsCollectorAbstractIT {
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+ props.put(QueryServices.STATS_ENABLED_ATTRIB, Boolean.toString(false));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testStatisticsAreNotWritten() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE T1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+ stmt.execute("UPSERT INTO T1 VALUES (1, 'NAME1')");
+ stmt.execute("UPSERT INTO T1 VALUES (2, 'NAME2')");
+ stmt.execute("UPSERT INTO T1 VALUES (3, 'NAME3')");
+ conn.commit();
+ stmt.execute("UPDATE STATISTICS T1");
+ ResultSet rs = stmt.executeQuery("SELECT * FROM SYSTEM.STATS");
+ assertFalse(rs.next());
+ rs.close();
+ stmt.close();
+ conn.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 942cf01..f7393cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -88,6 +88,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
+import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PChar;
@@ -183,9 +184,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] gp_per_region_bytes =
scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION);
// Let this throw, as this scan is being done for the sole purpose of collecting stats
- StatisticsCollector statsCollector =
- new StatisticsCollector(env, region.getRegionInfo().getTable()
- .getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes);
+ StatisticsCollector statsCollector = StatisticsCollectorFactory.createStatisticsCollector(
+ env, region.getRegionInfo().getTable().getNameAsString(), ts,
+ gp_width_bytes, gp_per_region_bytes);
return collectStats(s, statsCollector, region, scan, env.getConfiguration());
}
int offsetToBe = 0;
@@ -608,8 +609,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
- StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(),
- clientTimeStamp, store.getFamily().getName());
+ StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
+ c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
+ store.getFamily().getName());
internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner);
} catch (IOException e) {
// If we can't reach the stats table, don't interrupt the normal
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 5ea0d10..000c63c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -154,6 +154,7 @@ public interface QueryServices extends SQLCloseable {
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";
+ public static final String STATS_ENABLED_ATTRIB = "phoenix.stats.enabled";
public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async";
public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size";
public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
new file mode 100644
index 0000000..38e1ca5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.TimeKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and guideposts.
+ * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should honor that with
+ * timestamps for stats as well. The issue is for compaction, though. I don't know of a way for the user to specify any
+ * timestamp for that. Perhaps best to use current time across the board for now.
+ */
+class DefaultStatisticsCollector implements StatisticsCollector {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultStatisticsCollector.class);
+
+    /** Accumulated cell bytes per column family at which a new guidepost is attempted. */
+    private long guidepostDepth;
+    /** Largest cell timestamp seen since construction or the last {@link #clear()}. */
+    private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+    /** Per column family: (bytes accumulated since the last guidepost, guidepost builder). */
+    private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap();
+    /** Writer used to delete, add and commit the statistics mutations of this table. */
+    protected StatisticsWriter statsTable;
+    /**
+     * Entry for the single column family known at construction time (compaction case), or
+     * {@code null} when column families are discovered from the scanned cells.
+     */
+    private Pair<Long, GuidePostsInfoBuilder> cachedGps = null;
+
+    /**
+     * Creates a collector for the region hosted by {@code env}.
+     *
+     * @param env coprocessor environment of the region being scanned
+     * @param tableName name of the table whose statistics are collected
+     * @param clientTimeStamp timestamp handed to the statistics writer; replaced by
+     *            {@link #NO_TIMESTAMP} unless {@link QueryServices#STATS_USE_CURRENT_TIME_ATTRIB}
+     *            is enabled
+     * @param family the single column family being collected (compaction case), or {@code null}
+     *            to collect every family seen in the scanned cells
+     * @param gp_width_bytes {@link PLong}-encoded guidepost width overriding the configured value,
+     *            or {@code null} to use the configuration
+     * @param gp_per_region_bytes {@link PInteger}-encoded guideposts-per-region overriding the
+     *            configured value, or {@code null} to use the configuration
+     * @throws IOException if setting up the collector (e.g. creating the statistics writer) fails
+     */
+    DefaultStatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family,
+            byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException {
+        Configuration config = env.getConfiguration();
+        int guidepostPerRegion = gp_per_region_bytes == null
+                ? config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION)
+                : PInteger.INSTANCE.getCodec().decodeInt(gp_per_region_bytes, 0, SortOrder.getDefault());
+        // The width is a long value, so decode it as one: decoding it as an int cannot represent
+        // widths larger than Integer.MAX_VALUE.
+        long guidepostWidth = gp_width_bytes == null
+                ? config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES)
+                : PLong.INSTANCE.getCodec().decodeLong(gp_width_bytes, 0, SortOrder.getDefault());
+        this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
+                env.getRegion().getTableDesc());
+        // Provides a means of clients controlling their timestamps to not use current time
+        // when background tasks are updating stats. Instead we track the max timestamp of
+        // the cells and use that.
+        boolean useCurrentTime = config.getBoolean(
+                QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+        if (!useCurrentTime) {
+            clientTimeStamp = NO_TIMESTAMP;
+        }
+        // Get the stats table associated with the current table on which the CP is triggered
+        this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
+        // In a compaction we know the one family ahead of time
+        if (family != null) {
+            ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family);
+            cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0L, new GuidePostsInfoBuilder());
+            guidePostsInfoWriterMap.put(cfKey, cachedGps);
+        }
+    }
+
+    @Override
+    public long getMaxTimeStamp() {
+        return maxTimeStamp;
+    }
+
+    /** Closes the underlying statistics writer. */
+    @Override
+    public void close() throws IOException {
+        this.statsTable.close();
+    }
+
+    /**
+     * Deletes the stored statistics of {@code region}, adds the ones collected so far and commits
+     * them. An {@link IOException} is logged rather than propagated, and the collected state is
+     * cleared in every case.
+     */
+    @Override
+    public void updateStatistic(HRegion region) {
+        try {
+            List<Mutation> mutations = new ArrayList<Mutation>();
+            writeStatsToStatsTable(region, true, mutations);
+            logger.debug("Committing new stats for the region {}", region.getRegionInfo());
+            commitStats(mutations);
+        } catch (IOException e) {
+            logger.error("Unable to commit new stats", e);
+        } finally {
+            clear();
+        }
+    }
+
+    /**
+     * For every column family seen, hands {@code mutations} to the statistics writer to
+     * (optionally) delete the region's existing stats and add the newly collected ones.
+     * Committing is left to the caller.
+     */
+    private void writeStatsToStatsTable(final HRegion region, boolean delete, List<Mutation> mutations)
+            throws IOException {
+        try {
+            for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) {
+                if (delete) {
+                    logger.debug("Deleting the stats for the region {}", region.getRegionInfo());
+                    statsTable.deleteStats(region, this, fam, mutations);
+                }
+                logger.debug("Adding new stats for the region {}", region.getRegionInfo());
+                statsTable.addStats(this, fam, mutations);
+            }
+        } catch (IOException e) {
+            logger.error("Failed to update statistics table!", e);
+            throw e;
+        }
+    }
+
+    private void commitStats(List<Mutation> mutations) throws IOException {
+        statsTable.commitStats(mutations);
+    }
+
+    /**
+     * Update the current statistics based on the latest batch of key-values from the underlying
+     * scanner. Row counts are incremented at most once per column family per call, i.e. each call
+     * is assumed to carry (part of) a single row.
+     *
+     * @param results
+     *            next batch of {@link KeyValue}s
+     */
+    @Override
+    public void collectStatistics(final List<Cell> results) {
+        // Column families whose row count has already been incremented during this call
+        Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap();
+        // Same bookkeeping for the single pre-known family of a compaction, so that a row with
+        // several cells is counted once rather than once per cell
+        boolean cachedRowCounted = false;
+        for (Cell cell : results) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
+            Pair<Long, GuidePostsInfoBuilder> gps;
+            if (cachedGps == null) {
+                ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(),
+                        kv.getFamilyLength());
+                gps = guidePostsInfoWriterMap.get(cfKey);
+                if (gps == null) {
+                    gps = new Pair<Long, GuidePostsInfoBuilder>(0L, new GuidePostsInfoBuilder());
+                    guidePostsInfoWriterMap.put(cfKey, gps);
+                }
+                if (famMap.get(cfKey) == null) {
+                    famMap.put(cfKey, true);
+                    gps.getSecond().incrementRowCount();
+                }
+            } else {
+                gps = cachedGps;
+                if (!cachedRowCounted) {
+                    cachedRowCounted = true;
+                    gps.getSecond().incrementRowCount();
+                }
+            }
+            long byteCount = gps.getFirst() + kv.getLength();
+            gps.setFirst(byteCount);
+            if (byteCount >= guidepostDepth) {
+                ImmutableBytesWritable row = new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(),
+                        kv.getRowLength());
+                // Only restart the byte/row accounting when the builder accepted the guidepost
+                if (gps.getSecond().addGuidePosts(row, byteCount, gps.getSecond().getRowCount())) {
+                    gps.setFirst(0L);
+                    gps.getSecond().resetRowCount();
+                }
+            }
+        }
+    }
+
+    /**
+     * Wraps {@code s} in a {@link StatisticsScanner} that collects statistics for the store's
+     * column family using this instance.
+     */
+    @Override
+    public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
+            InternalScanner s) throws IOException {
+        logger.debug("Compaction scanner created for stats");
+        ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
+        return getInternalScanner(env, s, cfKey);
+    }
+
+    protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, InternalScanner internalScan,
+            ImmutableBytesPtr family) {
+        return new StatisticsScanner(this, statsTable, env, internalScan, family);
+    }
+
+    /**
+     * Discards all collected guideposts and resets the tracked maximum timestamp.
+     */
+    @Override
+    public void clear() {
+        // NOTE(review): in the compaction case cachedGps is not re-registered in the map here, so
+        // cells collected after clear() are not visible through getGuidePosts(); presumably
+        // instances are not reused after updateStatistic() -- confirm.
+        this.guidePostsInfoWriterMap.clear();
+        maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+    }
+
+    /**
+     * @return the guideposts built from what was collected for {@code fam}, or {@code null} if
+     *         nothing has been collected for that family
+     */
+    @Override
+    public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
+        Pair<Long, GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam);
+        if (pair != null) {
+            return pair.getSecond().build();
+        }
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
new file mode 100644
index 0000000..0f2b740
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * A {@link StatisticsCollector} that neither collects nor writes anything. It stands in for the
+ * real collector wherever statistics collection has been disabled.
+ */
+public class NoOpStatisticsCollector implements StatisticsCollector {
+
+    /**
+     * @return always {@link StatisticsCollector#NO_TIMESTAMP}, since no cell is ever inspected
+     */
+    @Override
+    public long getMaxTimeStamp() {
+        return StatisticsCollector.NO_TIMESTAMP;
+    }
+
+    /**
+     * Hands the compaction scanner back untouched, so no statistics scanner is layered on top.
+     */
+    @Override
+    public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
+            InternalScanner delegate) throws IOException {
+        return delegate;
+    }
+
+    /**
+     * @return always {@code null}: no guideposts are ever computed
+     */
+    @Override
+    public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
+        return null;
+    }
+
+    /** Ignores the given cells. */
+    @Override
+    public void collectStatistics(List<Cell> results) {
+        // intentionally empty: nothing is collected
+    }
+
+    /** Writes nothing, as nothing was collected. */
+    @Override
+    public void updateStatistic(HRegion region) {
+        // intentionally empty: nothing to persist
+    }
+
+    /** There is no collected state to discard. */
+    @Override
+    public void clear() {
+        // intentionally empty: no state is held
+    }
+
+    /** No resources are held, so there is nothing to release. */
+    @Override
+    public void close() throws IOException {
+        // intentionally empty: nothing to close
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
index 435fe87..f297b3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
@@ -40,7 +40,7 @@ public interface PTableStats {
@Override
public long getTimestamp() {
- return StatisticsCollector.NO_TIMESTAMP;
+ return DefaultStatisticsCollector.NO_TIMESTAMP;
}
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index d2059a5..cdd4f89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -17,209 +17,54 @@
*/
package org.apache.phoenix.schema.stats;
+import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.util.TimeKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
/**
- * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and guideposts.
- * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should honor that with
- * timestamps for stats as well. The issue is for compaction, though. I don't know of a way for the user to specify any
- * timestamp for that. Perhaps best to use current time across the board for now.
+ * Statistics tracker that helps to collect stats like min key, max key and guideposts.
*/
-public class StatisticsCollector {
- private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class);
- public static final long NO_TIMESTAMP = -1;
-
- private long guidepostDepth;
- private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
- private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap();
- protected StatisticsWriter statsTable;
- private Pair<Long, GuidePostsInfoBuilder> cachedGps = null;
-
- public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp,
- byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException {
- this(env, tableName, clientTimeStamp, null, gp_width_bytes, gp_per_region_bytes);
- }
-
- public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family)
- throws IOException {
- this(env, tableName, clientTimeStamp, family, null, null);
- }
-
- private StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family,
- byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException {
- Configuration config = env.getConfiguration();
- int guidepostPerRegion = gp_per_region_bytes == null
- ? config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION)
- : PInteger.INSTANCE.getCodec().decodeInt(gp_per_region_bytes, 0, SortOrder.getDefault());
- long guidepostWidth = gp_width_bytes == null
- ? config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES)
- : PLong.INSTANCE.getCodec().decodeInt(gp_width_bytes, 0, SortOrder.getDefault());
- this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
- env.getRegion().getTableDesc());
- // Provides a means of clients controlling their timestamps to not use current time
- // when background tasks are updating stats. Instead we track the max timestamp of
- // the cells and use that.
- boolean useCurrentTime = env.getConfiguration().getBoolean(
- QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
- if (!useCurrentTime) {
- clientTimeStamp = StatisticsCollector.NO_TIMESTAMP;
- }
- // Get the stats table associated with the current table on which the CP is
- // triggered
- this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
- // in a compaction we know the one family ahead of time
- if (family != null) {
- ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family);
- cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new GuidePostsInfoBuilder());
- guidePostsInfoWriterMap.put(cfKey, cachedGps);
- }
- }
+public interface StatisticsCollector extends Closeable {
- public long getMaxTimeStamp() {
- return maxTimeStamp;
- }
-
- public void close() throws IOException {
- this.statsTable.close();
- }
-
- public void updateStatistic(HRegion region) {
- try {
- ArrayList<Mutation> mutations = new ArrayList<Mutation>();
- writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
- if (logger.isDebugEnabled()) {
- logger.debug("Committing new stats for the region " + region.getRegionInfo());
- }
- commitStats(mutations);
- } catch (IOException e) {
- logger.error("Unable to commit new stats", e);
- } finally {
- clear();
- }
- }
-
- private void writeStatsToStatsTable(final HRegion region, boolean delete, List<Mutation> mutations, long currentTime)
- throws IOException {
- try {
- // update the statistics table
- for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) {
- if (delete) {
- if (logger.isDebugEnabled()) {
- logger.debug("Deleting the stats for the region " + region.getRegionInfo());
- }
- statsTable.deleteStats(region, this, fam, mutations);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Adding new stats for the region " + region.getRegionInfo());
- }
- statsTable.addStats(this, fam, mutations);
- }
- } catch (IOException e) {
- logger.error("Failed to update statistics table!", e);
- throw e;
- }
- }
-
- private void commitStats(List<Mutation> mutations) throws IOException {
- statsTable.commitStats(mutations);
- }
+ /** Constant used if no max timestamp is available */
+ long NO_TIMESTAMP = -1;
/**
- * Update the current statistics based on the latest batch of key-values from the underlying scanner
- *
- * @param results
- * next batch of {@link KeyValue}s
+ * Returns the maximum timestamp of all cells encountered while collecting statistics.
*/
- public void collectStatistics(final List<Cell> results) {
- Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap();
- for (Cell cell : results) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
- Pair<Long, GuidePostsInfoBuilder> gps;
- if (cachedGps == null) {
- ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(),
- kv.getFamilyLength());
- gps = guidePostsInfoWriterMap.get(cfKey);
- if (gps == null) {
- gps = new Pair<Long, GuidePostsInfoBuilder>(0l,
- new GuidePostsInfoBuilder());
- guidePostsInfoWriterMap.put(cfKey, gps);
- }
- if (famMap.get(cfKey) == null) {
- famMap.put(cfKey, true);
- gps.getSecond().incrementRowCount();
- }
- } else {
- gps = cachedGps;
- cachedGps.getSecond().incrementRowCount();
- }
- int kvLength = kv.getLength();
- long byteCount = gps.getFirst() + kvLength;
- gps.setFirst(byteCount);
- if (byteCount >= guidepostDepth) {
- ImmutableBytesWritable row = new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
- if (gps.getSecond().addGuidePosts(row, byteCount, gps.getSecond().getRowCount())) {
- gps.setFirst(0l);
- gps.getSecond().resetRowCount();
- }
- }
- }
- }
-
- public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner s) throws IOException {
- // See if this is for Major compaction
- if (logger.isDebugEnabled()) {
- logger.debug("Compaction scanner created for stats");
- }
- ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
- return getInternalScanner(env, store, s, cfKey);
- }
+ long getMaxTimeStamp();
+ /**
+ * Write the collected statistics for the given region.
+ * <p>
+ * This method declares no checked exceptions, so implementations must deal with write
+ * failures themselves.
+ *
+ * @param region the region whose statistics were collected by this instance
+ */
+ void updateStatistic(HRegion region);
- protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store,
- InternalScanner internalScan, ImmutableBytesPtr family) {
- return new StatisticsScanner(this, statsTable, env, internalScan, family);
- }
+ /**
+ * Collect statistics for the given list of cells. This method can be called multiple times
+ * during collection of statistics.
+ *
+ * @param results the next batch of cells returned by the underlying scanner; their
+ *            timestamps contribute to {@link #getMaxTimeStamp()}
+ */
+ void collectStatistics(List<Cell> results);
- public void clear() {
- this.guidePostsInfoWriterMap.clear();
- maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
- }
+ /**
+ * Wrap a compaction scanner with a scanner that will collect statistics using this instance.
+ *
+ * @param env coprocessor environment of the region being compacted
+ * @param store the store being compacted
+ * @param delegate the compaction scanner to wrap
+ * @return the scanner to use in place of {@code delegate}; implementations that do not collect
+ *         statistics may return {@code delegate} itself
+ * @throws IOException if the wrapping scanner cannot be set up
+ */
+ InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
+ InternalScanner delegate) throws IOException;
- public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
- Pair<Long, GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam);
- if (pair != null) { return pair.getSecond().build(); }
- return null;
- }
+ /**
+ * Clear all statistics information that has been collected, so that it is no longer
+ * reported by {@link #getGuidePosts(ImmutableBytesPtr)}.
+ */
+ void clear();
+ /**
+ * Retrieve the calculated guide post info for the given column family.
+ *
+ * @param fam the column family
+ * @return the guide posts collected for {@code fam}, or {@code null} if none were collected
+ *         for that family
+ */
+ GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
new file mode 100644
index 0000000..aaffd73
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Provides new {@link StatisticsCollector} instances based on configuration settings for a
+ * table (or system-wide configuration of statistics).
+ */
+public class StatisticsCollectorFactory {
+
+    /**
+     * Creates a statistics collector for the given table using explicit guidepost settings.
+     * No store (column family) name is handed to the collector.
+     *
+     * @param env the region coprocessor environment; its configuration decides whether
+     *            statistics collection is enabled
+     * @param tableName name of the table whose statistics are collected
+     * @param clientTimeStamp client timestamp, passed through to the collector unchanged
+     * @param guidepostWidthBytes guidepost width setting, passed through unchanged
+     * @param guidepostsPerRegionBytes guideposts-per-region setting, passed through unchanged
+     * @return a {@link DefaultStatisticsCollector} when statistics are enabled, otherwise a
+     *         {@link NoOpStatisticsCollector}
+     * @throws IOException if constructing the default collector fails
+     */
+    public static StatisticsCollector createStatisticsCollector(RegionCoprocessorEnvironment env,
+            String tableName, long clientTimeStamp, byte[] guidepostWidthBytes,
+            byte[] guidepostsPerRegionBytes) throws IOException {
+        return createCollector(env, tableName, clientTimeStamp, null, guidepostWidthBytes,
+                guidepostsPerRegionBytes);
+    }
+
+    /**
+     * Creates a statistics collector for a single store (column family) of the given table.
+     * No explicit guidepost settings are handed to the collector.
+     *
+     * @param env the region coprocessor environment; its configuration decides whether
+     *            statistics collection is enabled
+     * @param tableName name of the table whose statistics are collected
+     * @param clientTimeStamp client timestamp, passed through to the collector unchanged
+     * @param storeName name of the store (column family) to collect statistics for
+     * @return a {@link DefaultStatisticsCollector} when statistics are enabled, otherwise a
+     *         {@link NoOpStatisticsCollector}
+     * @throws IOException if constructing the default collector fails
+     */
+    public static StatisticsCollector createStatisticsCollector(
+            RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp,
+            byte[] storeName) throws IOException {
+        return createCollector(env, tableName, clientTimeStamp, storeName, null, null);
+    }
+
+    /**
+     * Shared creation logic for both public overloads. It returns a no-op collector when
+     * statistics are disabled in the environment's configuration; otherwise it builds a
+     * default collector from the given arguments.
+     */
+    private static StatisticsCollector createCollector(RegionCoprocessorEnvironment env,
+            String tableName, long clientTimeStamp, byte[] storeName,
+            byte[] guidepostWidthBytes, byte[] guidepostsPerRegionBytes) throws IOException {
+        if (!statisticsEnabled(env)) {
+            return new NoOpStatisticsCollector();
+        }
+        return new DefaultStatisticsCollector(env, tableName, clientTimeStamp, storeName,
+                guidepostWidthBytes, guidepostsPerRegionBytes);
+    }
+
+    /**
+     * Determines if statistics are enabled (which is the default). This is done on the
+     * RegionCoprocessorEnvironment for now to allow setting this on a per-table basis, although
+     * it could be moved to the general table metadata in the future if there is a realistic
+     * use case for that.
+     */
+    private static boolean statisticsEnabled(RegionCoprocessorEnvironment env) {
+        // Missing setting means enabled, so existing deployments keep collecting stats.
+        return env.getConfiguration().getBoolean(QueryServices.STATS_ENABLED_ATTRIB, true);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 36c2744..4e6a18f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -38,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
- * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector}
+ * The scanner that does the scanning to collect the stats during major compaction. See {@link DefaultStatisticsCollector}.
*/
public class StatisticsScanner implements InternalScanner {
private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f88b7666/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 6eb2b68..63b90ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -79,7 +79,7 @@ public class StatisticsWriter implements Closeable {
HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName,
clientTimeStamp);
- if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts
+ if (clientTimeStamp != DefaultStatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts
// yet
statsTable.commitLastStatsUpdatedTime();
}
@@ -131,7 +131,7 @@ public class StatisticsWriter implements Closeable {
public void addStats(StatisticsCollector tracker, ImmutableBytesPtr cfKey, List<Mutation> mutations)
throws IOException {
if (tracker == null) { return; }
- boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP;
+ boolean useMaxTimeStamp = clientTimeStamp == DefaultStatisticsCollector.NO_TIMESTAMP;
long timeStamp = clientTimeStamp;
if (useMaxTimeStamp) { // When using max timestamp, we write the update time later because we only know the ts
// now
@@ -217,7 +217,7 @@ public class StatisticsWriter implements Closeable {
public void deleteStats(HRegion region, StatisticsCollector tracker, ImmutableBytesPtr fam, List<Mutation> mutations)
throws IOException {
- long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp()
+ long timeStamp = clientTimeStamp == DefaultStatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp()
: clientTimeStamp;
List<Result> statsForRegion = StatisticsUtil.readStatistics(statsWriterTable, tableName, fam,
region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey(), timeStamp);