You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/09 02:23:43 UTC
[3/3] git commit: Rename stat package to stats
Rename stat package to stats
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0af8e65a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0af8e65a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0af8e65a
Branch: refs/heads/4.0
Commit: 0af8e65a40e3a4e4c62ae2c2fb265b930dd73797
Parents: 90f7249
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Oct 8 17:29:01 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Oct 8 17:29:01 2014 -0700
----------------------------------------------------------------------
.../coprocessor/MetaDataEndpointImpl.java | 4 +-
.../UngroupedAggregateRegionObserver.java | 2 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 2 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 2 +-
.../parse/UpdateStatisticsStatement.java | 8 +-
.../java/org/apache/phoenix/schema/PTable.java | 2 +-
.../org/apache/phoenix/schema/PTableImpl.java | 4 +-
.../apache/phoenix/schema/stat/PTableStats.java | 44 ---
.../phoenix/schema/stat/PTableStatsImpl.java | 44 ---
.../schema/stat/StatisticsCollectionScope.java | 28 --
.../schema/stat/StatisticsCollector.java | 343 -------------------
.../phoenix/schema/stat/StatisticsScanner.java | 124 -------
.../phoenix/schema/stat/StatisticsTable.java | 167 ---------
.../phoenix/schema/stat/StatisticsUtil.java | 117 -------
.../phoenix/schema/stats/PTableStats.java | 44 +++
.../phoenix/schema/stats/PTableStatsImpl.java | 44 +++
.../schema/stats/StatisticsCollectionScope.java | 28 ++
.../schema/stats/StatisticsCollector.java | 343 +++++++++++++++++++
.../phoenix/schema/stats/StatisticsScanner.java | 124 +++++++
.../phoenix/schema/stats/StatisticsUtil.java | 117 +++++++
.../phoenix/schema/stats/StatisticsWriter.java | 167 +++++++++
21 files changed, 879 insertions(+), 879 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 09ca2dd..36b986f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -129,8 +129,8 @@ import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.stat.PTableStats;
-import org.apache.phoenix.schema.stat.StatisticsUtil;
+import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f7b5889..273ec0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -80,7 +80,7 @@ import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.stat.StatisticsCollector;
+import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 76fff24..f6b7736 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -111,7 +111,7 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.stat.StatisticsCollectionScope;
+import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.trace.util.Tracing;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 51f3a36..85fd978 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -47,7 +47,7 @@ import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TypeMismatchException;
-import org.apache.phoenix.schema.stat.StatisticsCollectionScope;
+import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
import org.apache.phoenix.util.SchemaUtil;
import com.google.common.collect.ListMultimap;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java
index db8b7b5..dff9f06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java
@@ -17,11 +17,11 @@
*/
package org.apache.phoenix.parse;
-import static org.apache.phoenix.schema.stat.StatisticsCollectionScope.ALL;
-import static org.apache.phoenix.schema.stat.StatisticsCollectionScope.COLUMNS;
-import static org.apache.phoenix.schema.stat.StatisticsCollectionScope.INDEX;
+import static org.apache.phoenix.schema.stats.StatisticsCollectionScope.ALL;
+import static org.apache.phoenix.schema.stats.StatisticsCollectionScope.COLUMNS;
+import static org.apache.phoenix.schema.stats.StatisticsCollectionScope.INDEX;
-import org.apache.phoenix.schema.stat.StatisticsCollectionScope;
+import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
import com.sun.istack.NotNull;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index a76bc2d..3ea08e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.schema.stat.PTableStats;
+import org.apache.phoenix.schema.stats.PTableStats;
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index f3b9cc8..e8ca0c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -47,8 +47,8 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
-import org.apache.phoenix.schema.stat.PTableStats;
-import org.apache.phoenix.schema.stat.PTableStatsImpl;
+import org.apache.phoenix.schema.stats.PTableStats;
+import org.apache.phoenix.schema.stats.PTableStatsImpl;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.SizedUtil;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
deleted file mode 100644
index ead1d43..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import java.util.List;
-import java.util.SortedMap;
-
-import com.google.common.collect.ImmutableSortedMap;
-
-
-/*
- * The table is defined on the client side, but it is populated on the server side. The client should not populate any data to the
- * statistics object.
- */
-public interface PTableStats {
- public static final PTableStats EMPTY_STATS = new PTableStats() {
- @Override
- public SortedMap<byte[], List<byte[]>> getGuidePosts() {
- return ImmutableSortedMap.of();
- }
- };
-
- /**
- * TODO: Change from TreeMap to Map
- * Returns a tree map of the guide posts collected against a column family
- * @return
- */
- SortedMap<byte[], List<byte[]>> getGuidePosts();
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
deleted file mode 100644
index 07ca31e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hbase.util.Bytes;
-
- /**
- * Implementation for PTableStats.
- */
-public class PTableStatsImpl implements PTableStats {
- private final SortedMap<byte[], List<byte[]>> guidePosts;
-
- public PTableStatsImpl() {
- this(new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR));
- }
-
- public PTableStatsImpl(SortedMap<byte[], List<byte[]>> guidePosts) {
- this.guidePosts = guidePosts;
- }
-
- @Override
- public SortedMap<byte[], List<byte[]>> getGuidePosts() {
- return guidePosts;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectionScope.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectionScope.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectionScope.java
deleted file mode 100644
index 8a020d2..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollectionScope.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-public enum StatisticsCollectionScope {
- COLUMNS, INDEX, ALL;
-
- public static StatisticsCollectionScope getDefault() {
- return ALL;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
deleted file mode 100644
index 0724d6a..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PhoenixArray;
-import org.apache.phoenix.util.TimeKeeper;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and
- * guideposts.
- * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should
- * honor that with timestamps for stats as well. The issue is for compaction, though. I don't know of
- * a way for the user to specify any timestamp for that. Perhaps best to use current time across the
- * board for now.
- */
-public class StatisticsCollector {
-
- private Map<String, byte[]> minMap = Maps.newHashMap();
- private Map<String, byte[]> maxMap = Maps.newHashMap();
- private long guidepostDepth;
- private Map<String, Pair<Integer,List<byte[]>>> guidePostsMap = Maps.newHashMap();
- private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
- protected StatisticsTable statsTable;
- // Ensures that either analyze or compaction happens at any point of time.
- private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
-
- public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
- guidepostDepth =
- env.getConfiguration().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_HISTOGRAM_DEPTH_BYTE);
- // Get the stats table associated with the current table on which the CP is
- // triggered
- HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES));
- this.statsTable = StatisticsTable.getStatisticsTable(statsHTable, tableName, clientTimeStamp);
- }
-
- public void close() throws IOException {
- this.statsTable.close();
- }
-
- public void updateStatistic(HRegion region) {
- try {
- ArrayList<Mutation> mutations = new ArrayList<Mutation>();
- writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing new stats for the region " + region.getRegionInfo());
- }
- commitStats(mutations);
- } catch (IOException e) {
- LOG.error(e);
- } finally {
- clear();
- }
- }
-
- private void writeStatsToStatsTable(final HRegion region,
- boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
- try {
- // update the statistics table
- for (ImmutableBytesPtr fam : familyMap.keySet()) {
- if (delete) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Deleting the stats for the region "+region.getRegionInfo());
- }
- statsTable.deleteStats(region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()),
- mutations);
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding new stats for the region "+region.getRegionInfo());
- }
- statsTable.addStats((region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()),
- mutations);
- }
- } catch (IOException e) {
- LOG.error("Failed to update statistics table!", e);
- throw e;
- }
- }
-
- private void commitStats(List<Mutation> mutations) throws IOException {
- statsTable.commitStats(mutations);
- }
-
- private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException {
- try {
- String regionName = region.getRegionInfo().getRegionNameAsString();
- // update the statistics table
- for (ImmutableBytesPtr fam : familyMap.keySet()) {
- statsTable.deleteStats(regionName, this, Bytes.toString(fam.copyBytesIfNecessary()),
- mutations);
- }
- } catch (IOException e) {
- LOG.error("Failed to delete from statistics table!", e);
- throw e;
- }
- }
-
- private int scanRegion(RegionScanner scanner, int count) throws IOException {
- List<Cell> results = new ArrayList<Cell>();
- boolean hasMore = true;
- while (hasMore) {
- hasMore = scanner.next(results);
- collectStatistics(results);
- count += results.size();
- results.clear();
- while (!hasMore) {
- break;
- }
- }
- return count;
- }
-
- /**
- * Update the current statistics based on the latest batch of key-values from the underlying scanner
- *
- * @param results
- * next batch of {@link KeyValue}s
- */
- public void collectStatistics(final List<Cell> results) {
- for (Cell c : results) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- updateStatistic(kv);
- }
- }
-
- public InternalScanner createCompactionScanner(HRegion region, Store store,
- List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException {
- // See if this is for Major compaction
- InternalScanner internalScan = s;
- if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
- // this is the first CP accessed, so we need to just create a major
- // compaction scanner, just
- // like in the compactor
- if (s == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(store.getFamily().getMaxVersions());
- long smallestReadPoint = store.getSmallestReadPoint();
- internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
- smallestReadPoint, earliestPutTs);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compaction scanner created for stats");
- }
- InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
- if (scanner != null) {
- internalScan = scanner;
- }
- }
- return internalScan;
- }
-
- public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r,
- HRegion region) {
- try {
- // Create a delete operation on the parent region
- // Then write the new guide posts for individual regions
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
- long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
- deleteStatsFromStatsTable(region, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
- }
- collectStatsForSplitRegions(conf, l, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
- }
- collectStatsForSplitRegions(conf, r, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
- }
- commitStats(mutations);
- } catch (IOException e) {
- LOG.error("Error while capturing stats after split of region "
- + region.getRegionInfo().getRegionNameAsString(), e);
- }
- }
-
- private void collectStatsForSplitRegions(Configuration conf, HRegion daughter,
- List<Mutation> mutations, long currentTime) throws IOException {
- IOException toThrow = null;
- clear();
- Scan scan = createScan(conf);
- RegionScanner scanner = null;
- int count = 0;
- try {
- scanner = daughter.getScanner(scan);
- count = scanRegion(scanner, count);
- writeStatsToStatsTable(daughter, false, mutations, currentTime);
- } catch (IOException e) {
- LOG.error(e);
- toThrow = e;
- } finally {
- try {
- if (scanner != null) scanner.close();
- } catch (IOException e) {
- LOG.error(e);
- if (toThrow != null) toThrow = e;
- } finally {
- if (toThrow != null) throw toThrow;
- }
- }
- }
-
- private Scan createScan(Configuration conf) {
- Scan scan = new Scan();
- scan.setCaching(
- conf.getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE));
- // do not cache the blocks here
- scan.setCacheBlocks(false);
- return scan;
- }
-
- protected InternalScanner getInternalScanner(HRegion region, Store store,
- InternalScanner internalScan, String family) {
- return new StatisticsScanner(this, statsTable, region, internalScan,
- Bytes.toBytes(family));
- }
-
- public void clear() {
- this.maxMap.clear();
- this.minMap.clear();
- this.guidePostsMap.clear();
- this.familyMap.clear();
- }
-
- public void updateStatistic(KeyValue kv) {
- @SuppressWarnings("deprecation")
- byte[] cf = kv.getFamily();
- familyMap.put(new ImmutableBytesPtr(cf), true);
-
- String fam = Bytes.toString(cf);
- byte[] row = new ImmutableBytesPtr(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())
- .copyBytesIfNecessary();
- if (!minMap.containsKey(fam) && !maxMap.containsKey(fam)) {
- minMap.put(fam, row);
- // Ideally the max key also should be added in this case
- maxMap.put(fam, row);
- } else {
- if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), minMap.get(fam), 0,
- minMap.get(fam).length) < 0) {
- minMap.put(fam, row);
- }
- if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), maxMap.get(fam), 0,
- maxMap.get(fam).length) > 0) {
- maxMap.put(fam, row);
- }
- }
- // TODO : This can be moved to an interface so that we could collect guide posts in different ways
- Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
- if (gps == null) {
- gps = new Pair<Integer,List<byte[]>>(0, Lists.<byte[]>newArrayList());
- guidePostsMap.put(fam, gps);
- }
- int byteCount = gps.getFirst() + kv.getLength();
- gps.setFirst(byteCount);
- if (byteCount >= guidepostDepth) {
- // Prevent dups
- List<byte[]> gpsKeys = gps.getSecond();
- if (gpsKeys.isEmpty() || Bytes.compareTo(row, gpsKeys.get(gpsKeys.size()-1)) > 0) {
- gpsKeys.add(row);
- gps.setFirst(0); // Only reset count when adding guidepost
- }
- }
- }
-
- public byte[] getMaxKey(String fam) {
- if (maxMap.get(fam) != null) { return maxMap.get(fam); }
- return null;
- }
-
- public byte[] getMinKey(String fam) {
- if (minMap.get(fam) != null) { return minMap.get(fam); }
- return null;
- }
-
- public byte[] getGuidePosts(String fam) {
- if (!guidePostsMap.isEmpty()) {
- Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
- if (gps != null) {
- List<byte[]> guidePosts = gps.getSecond();
- if (!guidePosts.isEmpty()) {
- byte[][] array = new byte[guidePosts.size()][];
- int i = 0;
- for (byte[] element : guidePosts) {
- array[i] = element;
- i++;
- }
- PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
- return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
- }
- }
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
deleted file mode 100644
index ce3d47b..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.util.TimeKeeper;
-
-/**
- * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector}
- */
-public class StatisticsScanner implements InternalScanner {
- private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
- private InternalScanner delegate;
- private StatisticsTable stats;
- private HRegion region;
- private StatisticsCollector tracker;
- private byte[] family;
-
- public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegion region,
- InternalScanner delegate, byte[] family) {
- // should there be only one tracker?
- this.tracker = tracker;
- this.stats = stats;
- this.delegate = delegate;
- this.region = region;
- this.family = family;
- this.tracker.clear();
- }
-
- @Override
- public boolean next(List<Cell> result) throws IOException {
- boolean ret = delegate.next(result);
- updateStat(result);
- return ret;
- }
-
- @Override
- public boolean next(List<Cell> result, int limit) throws IOException {
- boolean ret = delegate.next(result, limit);
- updateStat(result);
- return ret;
- }
-
- /**
- * Update the current statistics based on the lastest batch of key-values from the underlying scanner
- *
- * @param results
- * next batch of {@link KeyValue}s
- */
- protected void updateStat(final List<Cell> results) {
- for (Cell c : results) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(c);
- if (c.getTypeByte() == KeyValue.Type.Put.getCode()) {
- tracker.updateStatistic(kv);
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- IOException toThrow = null;
- try {
- // update the statistics table
- // Just verify if this if fine
- ArrayList<Mutation> mutations = new ArrayList<Mutation>();
- long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString()
- + " as part of major compaction");
- }
- stats.deleteStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding new stats for the region " + region.getRegionNameAsString()
- + " as part of major compaction");
- }
- stats.addStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing new stats for the region " + region.getRegionNameAsString()
- + " as part of major compaction");
- }
- stats.commitStats(mutations);
- } catch (IOException e) {
- LOG.error("Failed to update statistics table!", e);
- toThrow = e;
- } finally {
- try {
- stats.close();
- } catch (IOException e) {
- if (toThrow == null) toThrow = e;
- LOG.error("Error while closing the stats table", e);
- } finally {
- // close the delegate scanner
- try {
- delegate.close();
- } catch (IOException e) {
- if (toThrow == null) toThrow = e;
- LOG.error("Error while closing the scanner", e);
- } finally {
- if (toThrow != null) {
- throw toThrow;
- }
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
deleted file mode 100644
index 3c0f376..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.sql.Date;
-import java.util.List;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
-import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.TimeKeeper;
-
-import com.google.protobuf.ServiceException;
-
-/**
- * Wrapper to access the statistics table SYSTEM.STATS using the HTable.
- */
-public class StatisticsTable implements Closeable {
- /**
- * Creates a {@link StatisticsTable} for the given primary table and immediately writes the
- * last-stats-update time row for it.
- *
- * @param hTable
- * handle to the table the statistics are written to
- * @param tableName
- * name of the primary table on which we should collect stats
- * @param clientTimeStamp
- * timestamp used for the cells written; {@link HConstants#LATEST_TIMESTAMP} is
- * replaced by the current time from {@link TimeKeeper#SYSTEM}
- * @return the {@link StatisticsTable} for the given primary table.
- * @throws IOException
- * if writing the last-stats-update time fails
- */
- public static StatisticsTable getStatisticsTable(HTableInterface hTable, String tableName, long clientTimeStamp) throws IOException {
- if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
- clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
- }
- StatisticsTable statsTable = new StatisticsTable(hTable, tableName, clientTimeStamp);
- statsTable.commitLastStatsUpdatedTime();
- return statsTable;
- }
-
- private final HTableInterface statisticsTable;
- private final byte[] tableName;
- private final long clientTimeStamp;
-
- // Private: instances are obtained through getStatisticsTable(). The table name is kept as
- // VARCHAR-encoded bytes because it is used directly as (the prefix of) the stats row keys.
- private StatisticsTable(HTableInterface statsTable, String tableName, long clientTimeStamp) {
- this.statisticsTable = statsTable;
- this.tableName = PDataType.VARCHAR.toBytes(tableName);
- this.clientTimeStamp = clientTimeStamp;
- }
-
- /**
- * Close the connection to the table
- */
- @Override
- public void close() throws IOException {
- statisticsTable.close();
- }
-
- /**
- * Builds the Put holding the statistics of one region and column family and appends it to
- * {@code mutations}. Nothing is written here; the caller commits the list afterwards.
- * The row key is built from the table name, the family and the region name. Guide posts are
- * only added when the tracker has some for the family; min key, max key and the empty
- * column are always added. All cells use the client timestamp of this instance.
- *
- * @param regionName - the region of the table for which the stats are collected
- * @param tracker - the statistics tracker; when null the method returns without adding anything
- * @param fam - the family for which the stats are being collected
- * @param mutations - list that collects all the mutations to commit in a batch
- * @throws IOException
- * declared by the signature - TODO confirm which call can raise it
- */
- public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException {
- if (tracker == null) { return; }
-
- byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam),
- PDataType.VARCHAR.toBytes(regionName));
- Put put = new Put(prefix);
- if (tracker.getGuidePosts(fam) != null) {
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES,
- clientTimeStamp, (tracker.getGuidePosts(fam)));
- }
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
- clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
- // Add our empty column value so queries behave correctly
- put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES,
- clientTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY);
- mutations.add(put);
- }
-
- /**
-  * Maps a client mutation onto the protobuf mutation type used when building the request.
-  *
-  * @param m the mutation to classify
-  * @return {@code PUT} for a {@link Put}, {@code DELETE} for a {@link Delete}
-  * @throws IOException (a DoNotRetryIOException) for any other kind of mutation
-  */
- private static MutationType getMutationType(Mutation m) throws IOException {
-     if (m instanceof Put) {
-         return MutationType.PUT;
-     }
-     if (m instanceof Delete) {
-         return MutationType.DELETE;
-     }
-     // Separator added: the class name used to be glued straight onto the word "commit"
-     throw new DoNotRetryIOException("Unsupported mutation type in stats commit: "
-             + m.getClass().getName());
- }
- /**
-  * Sends all collected mutations in a single MutateRowsRequest to the
-  * MultiRowMutationService endpoint, which is located through the row of the first mutation.
-  * An empty list is a no-op.
-  *
-  * @param mutations the puts and deletes to commit together
-  * @throws IOException if a mutation is neither a Put nor a Delete, or if the call fails
-  */
- public void commitStats(List<Mutation> mutations) throws IOException {
-     if (!(mutations.size() > 0)) {
-         return;
-     }
-     // The first mutation's row decides which region's endpoint receives the request
-     byte[] routingRow = mutations.get(0).getRow();
-     MutateRowsRequest.Builder requestBuilder = MutateRowsRequest.newBuilder();
-     for (Mutation mutation : mutations) {
-         MutationType type = getMutationType(mutation);
-         requestBuilder.addMutationRequest(ProtobufUtil.toMutation(type, mutation));
-     }
-     MutateRowsRequest request = requestBuilder.build();
-     CoprocessorRpcChannel rpcChannel = statisticsTable.coprocessorService(routingRow);
-     MultiRowMutationService.BlockingInterface endpoint =
-             MultiRowMutationService.newBlockingStub(rpcChannel);
-     try {
-         endpoint.mutateRows(null, request);
-     } catch (ServiceException ex) {
-         ProtobufUtil.toIOException(ex);
-     }
- }
-
- /**
-  * Writes the wall-clock time of this stats update into the row keyed by the table name.
-  * The cell value is the current time; the cell timestamp is the client timestamp.
-  */
- private void commitLastStatsUpdatedTime() throws IOException {
-     // Always use wallclock time for this, as it's a mechanism to prevent
-     // stats from being collected too often.
-     Date updatedAt = new Date(TimeKeeper.SYSTEM.getCurrentTime());
-     Put lastUpdate = new Put(tableName);
-     lastUpdate.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-             PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, clientTimeStamp,
-             PDataType.DATE.toBytes(updatedAt));
-     statisticsTable.put(lastUpdate);
- }
-
- /**
-  * Appends a Delete for the stats row of the given region and family to {@code mutations}.
-  * The delete uses the client timestamp minus one. The tracker argument is not used.
-  */
- public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations)
-         throws IOException {
-     byte[] familyBytes = PDataType.VARCHAR.toBytes(fam);
-     byte[] regionBytes = PDataType.VARCHAR.toBytes(regionName);
-     byte[] rowKey = StatisticsUtil.getRowKey(tableName, familyBytes, regionBytes);
-     Delete staleStats = new Delete(rowKey, clientTimeStamp - 1);
-     mutations.add(staleStats);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java
deleted file mode 100644
index 3749636..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtil.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PhoenixArray;
-import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.MetaDataUtil;
-
-import com.google.common.collect.Lists;
-/**
- * Simple utility class for managing multiple key parts of the statistic
- */
-public class StatisticsUtil {
- private StatisticsUtil() {
- // private ctor for utility classes
- }
-
- /** Number of parts in our complex key */
- protected static final int NUM_KEY_PARTS = 3;
-
- /**
-  * Builds a stats row key laid out as: table, separator byte, family, separator byte, region.
-  *
-  * @return a new array containing the three parts joined by QueryConstants.SEPARATOR_BYTE
-  */
- public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) {
-     // Start positions of the family and region parts, each preceded by one separator byte
-     final int famStart = table.length + 1;
-     final int regionStart = famStart + fam.length + 1;
-     byte[] key = new byte[regionStart + region.length];
-     System.arraycopy(table, 0, key, 0, table.length);
-     key[famStart - 1] = QueryConstants.SEPARATOR_BYTE;
-     System.arraycopy(fam, 0, key, famStart, fam.length);
-     key[regionStart - 1] = QueryConstants.SEPARATOR_BYTE;
-     System.arraycopy(region, 0, key, regionStart, region.length);
-     return key;
- }
-
- public static byte[] copyRow(KeyValue kv) {
- return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength());
- }
-
- public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp) throws IOException {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- Scan s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
- s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
- ResultScanner scanner = statsHTable.getScanner(s);
- Result result = null;
- TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
- while ((result = scanner.next()) != null) {
- CellScanner cellScanner = result.cellScanner();
- while (cellScanner.advance()) {
- Cell current = cellScanner.current();
- int tableNameLength = tableNameBytes.length + 1;
- int cfOffset = current.getRowOffset() + tableNameLength;
- int cfLength = getVarCharLength(current.getRowArray(), cfOffset, current.getRowLength() - tableNameLength);
- ptr.set(current.getRowArray(), cfOffset, cfLength);
- byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
- PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValueArray(), current.getValueOffset(), current
- .getValueLength());
- if (array != null && array.getDimensions() != 0) {
- List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());
- for (int j = 0; j < array.getDimensions(); j++) {
- byte[] gp = array.toBytes(j);
- if (gp.length != 0) {
- guidePosts.add(gp);
- }
- }
- List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts);
- if (gps != null) { // Add guidepost already there from other regions
- guidePosts.addAll(gps);
- }
- }
- }
- }
- if (!guidePostsPerCf.isEmpty()) {
- // Sort guideposts, as the order above will depend on the order we traverse
- // each region's worth of guideposts above.
- for (List<byte[]> gps : guidePostsPerCf.values()) {
- Collections.sort(gps, Bytes.BYTES_COMPARATOR);
- }
- return new PTableStatsImpl(guidePostsPerCf);
- }
- return PTableStats.EMPTY_STATS;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
new file mode 100644
index 0000000..40309e4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.util.List;
+import java.util.SortedMap;
+
+import com.google.common.collect.ImmutableSortedMap;
+
+
+/**
+ * Read-only view of the statistics of a table.
+ * The table is defined on the client side, but it is populated on the server side. The client should not populate any data to the
+ * statistics object.
+ */
+public interface PTableStats {
+ /** Shared instance whose guide post map is always empty. */
+ public static final PTableStats EMPTY_STATS = new PTableStats() {
+ @Override
+ public SortedMap<byte[], List<byte[]>> getGuidePosts() {
+ return ImmutableSortedMap.of();
+ }
+ };
+
+ /**
+ * TODO: Change from SortedMap to Map
+ * Returns a sorted map of the guide posts collected against a column family
+ * @return the guide posts, keyed by column family
+ */
+ SortedMap<byte[], List<byte[]>> getGuidePosts();
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
new file mode 100644
index 0000000..b527ce1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+ /**
+ * Implementation for PTableStats that simply holds the guide post map it is given.
+ */
+public class PTableStatsImpl implements PTableStats {
+ private final SortedMap<byte[], List<byte[]>> guidePosts;
+
+ /** Creates stats with an empty map ordered by {@link Bytes#BYTES_COMPARATOR}. */
+ public PTableStatsImpl() {
+ this(new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR));
+ }
+
+ /**
+ * @param guidePosts guide posts to expose; the map is stored as-is, not copied
+ */
+ public PTableStatsImpl(SortedMap<byte[], List<byte[]>> guidePosts) {
+ this.guidePosts = guidePosts;
+ }
+
+ /** Returns the same map instance that was passed to the constructor. */
+ @Override
+ public SortedMap<byte[], List<byte[]>> getGuidePosts() {
+ return guidePosts;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionScope.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionScope.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionScope.java
new file mode 100644
index 0000000..86e453c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionScope.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+/**
+ * The scopes for which statistics collection can be requested.
+ */
+public enum StatisticsCollectionScope {
+ COLUMNS, INDEX, ALL;
+
+ /** Returns the scope to use when none is given, which is {@link #ALL}. */
+ public static StatisticsCollectionScope getDefault() {
+ return ALL;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
new file mode 100644
index 0000000..09d5917
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PhoenixArray;
+import org.apache.phoenix.util.TimeKeeper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and
+ * guideposts.
+ * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should
+ * honor that with timestamps for stats as well. The issue is for compaction, though. I don't know of
+ * a way for the user to specify any timestamp for that. Perhaps best to use current time across the
+ * board for now.
+ */
+public class StatisticsCollector {
+
+ private Map<String, byte[]> minMap = Maps.newHashMap();
+ private Map<String, byte[]> maxMap = Maps.newHashMap();
+ private long guidepostDepth;
+ private Map<String, Pair<Integer,List<byte[]>>> guidePostsMap = Maps.newHashMap();
+ private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
+ protected StatisticsWriter statsTable;
+ // Ensures that either analyze or compaction happens at any point of time.
+ private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
+
+ /**
+  * Creates a collector that writes to the system stats table through a StatisticsWriter.
+  *
+  * @param env coprocessor environment supplying the configuration and the stats table handle
+  * @param tableName name of the table statistics are collected for
+  * @param clientTimeStamp timestamp handed on to the stats writer
+  * @throws IOException if the stats table or the writer cannot be obtained
+  */
+ public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException {
+     // Byte threshold between guide posts; falls back to the default when not configured
+     this.guidepostDepth = env.getConfiguration().getLong(
+             QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+             QueryServicesOptions.DEFAULT_STATS_HISTOGRAM_DEPTH_BYTE);
+     // Get the stats table associated with the current table on which the CP is triggered
+     TableName systemStatsName = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+     HTableInterface systemStats = env.getTable(systemStatsName);
+     this.statsTable = StatisticsWriter.getStatisticsTable(systemStats, tableName, clientTimeStamp);
+ }
+
+ /**
+ * Closes the stats writer this collector was created with.
+ *
+ * @throws IOException if closing the writer fails
+ */
+ public void close() throws IOException {
+ this.statsTable.close();
+ }
+
+ public void updateStatistic(HRegion region) {
+ try {
+ ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+ writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing new stats for the region " + region.getRegionInfo());
+ }
+ commitStats(mutations);
+ } catch (IOException e) {
+ LOG.error(e);
+ } finally {
+ clear();
+ }
+ }
+
+ /**
+  * Appends, for every column family seen so far, the mutations that store its statistics
+  * for the given region. Nothing is committed here.
+  *
+  * @param region region the statistics belong to
+  * @param delete whether a delete of the existing stats row is added before the new stats
+  * @param mutations list receiving the generated mutations
+  * @param currentTime not used by this method
+  * @throws IOException rethrown after logging if the stats writer fails
+  */
+ private void writeStatsToStatsTable(final HRegion region,
+         boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
+     try {
+         // update the statistics table
+         for (ImmutableBytesPtr familyPtr : familyMap.keySet()) {
+             if (delete) {
+                 if (LOG.isDebugEnabled()) {
+                     LOG.debug("Deleting the stats for the region "+region.getRegionInfo());
+                 }
+                 String regionToDelete = region.getRegionInfo().getRegionNameAsString();
+                 String familyToDelete = Bytes.toString(familyPtr.copyBytesIfNecessary());
+                 statsTable.deleteStats(regionToDelete, this, familyToDelete, mutations);
+             }
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("Adding new stats for the region "+region.getRegionInfo());
+             }
+             String regionName = region.getRegionInfo().getRegionNameAsString();
+             String familyName = Bytes.toString(familyPtr.copyBytesIfNecessary());
+             statsTable.addStats(regionName, this, familyName, mutations);
+         }
+     } catch (IOException e) {
+         LOG.error("Failed to update statistics table!", e);
+         throw e;
+     }
+ }
+
+ /**
+ * Hands the collected mutations to the stats writer to be committed.
+ *
+ * @throws IOException if the writer fails to commit
+ */
+ private void commitStats(List<Mutation> mutations) throws IOException {
+ statsTable.commitStats(mutations);
+ }
+
+ private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException {
+ try {
+ String regionName = region.getRegionInfo().getRegionNameAsString();
+ // update the statistics table
+ for (ImmutableBytesPtr fam : familyMap.keySet()) {
+ statsTable.deleteStats(regionName, this, Bytes.toString(fam.copyBytesIfNecessary()),
+ mutations);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to delete from statistics table!", e);
+ throw e;
+ }
+ }
+
+ /**
+  * Drains the scanner, feeding every batch of cells into {@link #collectStatistics(List)}.
+  *
+  * @param scanner scanner to read until it reports no more rows
+  * @param count starting value of the cell counter
+  * @return {@code count} plus the number of cells read
+  * @throws IOException if the scanner fails
+  */
+ private int scanRegion(RegionScanner scanner, int count) throws IOException {
+     List<Cell> batch = new ArrayList<Cell>();
+     boolean hasMore;
+     // The last call to next() may still return cells, so process the batch before testing hasMore
+     do {
+         hasMore = scanner.next(batch);
+         collectStatistics(batch);
+         count += batch.size();
+         batch.clear();
+     } while (hasMore);
+     return count;
+ }
+
+ /**
+  * Folds a batch of cells from the underlying scanner into the statistics collected so far.
+  * Each cell is converted to a {@link KeyValue} before it is processed.
+  *
+  * @param results
+  *            next batch of cells to account for
+  */
+ public void collectStatistics(final List<Cell> results) {
+     for (Cell cell : results) {
+         updateStatistic(KeyValueUtil.ensureKeyValue(cell));
+     }
+ }
+
+ /**
+  * Returns the scanner to use for a compaction. For scan type COMPACT_DROP_DELETES the
+  * scanner is wrapped so that statistics are collected while it is read; for any other
+  * scan type {@code s} is returned unchanged.
+  *
+  * @param s scanner supplied by an earlier coprocessor, or null if there is none, in which
+  *          case a StoreScanner is created over {@code scanners}
+  * @return the scanner the compaction should read from
+  * @throws IOException if the store scanner cannot be created
+  */
+ public InternalScanner createCompactionScanner(HRegion region, Store store,
+         List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException {
+     // See if this is for Major compaction
+     if (!scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+         return s;
+     }
+     InternalScanner source = s;
+     if (source == null) {
+         // this is the first CP accessed, so we need to just create a major
+         // compaction scanner, just like in the compactor
+         Scan compactionScan = new Scan();
+         compactionScan.setMaxVersions(store.getFamily().getMaxVersions());
+         long readPoint = store.getSmallestReadPoint();
+         source = new StoreScanner(store, store.getScanInfo(), compactionScan, scanners, scanType,
+                 readPoint, earliestPutTs);
+     }
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Compaction scanner created for stats");
+     }
+     InternalScanner wrapped = getInternalScanner(region, store, source, store.getColumnFamilyName());
+     return wrapped != null ? wrapped : source;
+ }
+
+ /**
+  * Rebuilds the statistics after a split: the stats of the parent region are deleted and
+  * fresh stats are collected for both daughters, all committed as one batch.
+  * An IOException is logged and not propagated.
+  *
+  * @param conf configuration used to set up the daughter scans
+  * @param l first daughter region
+  * @param r second daughter region
+  * @param region the parent region that was split
+  */
+ public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r,
+         HRegion region) {
+     try {
+         // Create a delete operation on the parent region
+         // Then write the new guide posts for individual regions
+         List<Mutation> batch = Lists.newArrayListWithExpectedSize(3);
+         long now = TimeKeeper.SYSTEM.getCurrentTime();
+         deleteStatsFromStatsTable(region, batch, now);
+         for (HRegion daughter : new HRegion[] { l, r }) {
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("Collecting stats for the daughter region " + daughter.getRegionInfo());
+             }
+             collectStatsForSplitRegions(conf, daughter, batch, now);
+         }
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+         }
+         statsTable.commitStats(batch);
+     } catch (IOException e) {
+         LOG.error("Error while capturing stats after split of region "
+                 + region.getRegionInfo().getRegionNameAsString(), e);
+     }
+ }
+
+ /**
+  * Scans one daughter region from scratch and appends the mutations holding its statistics.
+  * The collector state is cleared first, so only this daughter's cells are counted.
+  * The region scanner is always closed. The first IOException seen, whether from the
+  * scan, from building the mutations or from closing the scanner, is thrown once cleanup
+  * has been attempted.
+  *
+  * @param conf configuration used to set up the scan
+  * @param daughter the daughter region to scan
+  * @param mutations list receiving the generated mutations
+  * @param currentTime passed through to writeStatsToStatsTable
+  * @throws IOException the first failure encountered
+  */
+ private void collectStatsForSplitRegions(Configuration conf, HRegion daughter,
+         List<Mutation> mutations, long currentTime) throws IOException {
+     IOException toThrow = null;
+     clear();
+     Scan scan = createScan(conf);
+     RegionScanner scanner = null;
+     try {
+         scanner = daughter.getScanner(scan);
+         scanRegion(scanner, 0);
+         writeStatsToStatsTable(daughter, false, mutations, currentTime);
+     } catch (IOException e) {
+         LOG.error(e);
+         toThrow = e;
+     } finally {
+         if (scanner != null) {
+             try {
+                 scanner.close();
+             } catch (IOException e) {
+                 LOG.error(e);
+                 // Keep the primary failure if there is one. The test used to be inverted,
+                 // which swallowed a close failure after a successful scan.
+                 if (toThrow == null) {
+                     toThrow = e;
+                 }
+             }
+         }
+     }
+     if (toThrow != null) {
+         throw toThrow;
+     }
+ }
+
+ /**
+  * Creates the scan used to read a daughter region: caching comes from the configuration
+  * (with the default scan cache size as fallback) and block caching is switched off.
+  */
+ private Scan createScan(Configuration conf) {
+     Scan regionScan = new Scan();
+     int cachedRows = conf.getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB,
+             QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE);
+     regionScan.setCaching(cachedRows);
+     // do not cache the blocks here
+     regionScan.setCacheBlocks(false);
+     return regionScan;
+ }
+
+ /**
+ * Wraps the given scanner in a {@link StatisticsScanner} bound to this collector, the stats
+ * writer, the region and the family name (converted to bytes). The store argument is not
+ * used here.
+ */
+ protected InternalScanner getInternalScanner(HRegion region, Store store,
+ InternalScanner internalScan, String family) {
+ return new StatisticsScanner(this, statsTable, region, internalScan,
+ Bytes.toBytes(family));
+ }
+
+ /**
+ * Discards everything collected so far: min keys, max keys, guide posts and the set of
+ * column families seen.
+ */
+ public void clear() {
+ this.maxMap.clear();
+ this.minMap.clear();
+ this.guidePostsMap.clear();
+ this.familyMap.clear();
+ }
+
+ /**
+ * Accounts for a single KeyValue: records its column family as seen, updates the min and
+ * max row key of that family, and adds the KeyValue's length to the family's running byte
+ * count. Once that count reaches guidepostDepth the row becomes a new guide post, provided
+ * it sorts after the last guide post already recorded for the family.
+ *
+ * @param kv the KeyValue to fold into the statistics
+ */
+ public void updateStatistic(KeyValue kv) {
+ @SuppressWarnings("deprecation")
+ byte[] cf = kv.getFamily();
+ familyMap.put(new ImmutableBytesPtr(cf), true);
+
+ String fam = Bytes.toString(cf);
+ byte[] row = new ImmutableBytesPtr(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())
+ .copyBytesIfNecessary();
+ // First KeyValue of this family: its row is both the min and the max so far
+ if (!minMap.containsKey(fam) && !maxMap.containsKey(fam)) {
+ minMap.put(fam, row);
+ // Ideally the max key also should be added in this case
+ maxMap.put(fam, row);
+ } else {
+ if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), minMap.get(fam), 0,
+ minMap.get(fam).length) < 0) {
+ minMap.put(fam, row);
+ }
+ if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), maxMap.get(fam), 0,
+ maxMap.get(fam).length) > 0) {
+ maxMap.put(fam, row);
+ }
+ }
+ // TODO : This can be moved to an interface so that we could collect guide posts in different ways
+ // Pair = (bytes seen since the last guide post, guide posts recorded so far)
+ Pair<Integer,List<byte[]>> gps = guidePostsMap.get(fam);
+ if (gps == null) {
+ gps = new Pair<Integer,List<byte[]>>(0, Lists.<byte[]>newArrayList());
+ guidePostsMap.put(fam, gps);
+ }
+ int byteCount = gps.getFirst() + kv.getLength();
+ gps.setFirst(byteCount);
+ if (byteCount >= guidepostDepth) {
+ // Prevent dups
+ List<byte[]> gpsKeys = gps.getSecond();
+ if (gpsKeys.isEmpty() || Bytes.compareTo(row, gpsKeys.get(gpsKeys.size()-1)) > 0) {
+ gpsKeys.add(row);
+ gps.setFirst(0); // Only reset count when adding guidepost
+ }
+ }
+ }
+
+ /**
+  * Returns the largest row key seen for the given column family, or null if none was recorded.
+  */
+ public byte[] getMaxKey(String fam) {
+     return maxMap.get(fam);
+ }
+
+ /**
+  * Returns the smallest row key seen for the given column family, or null if none was recorded.
+  */
+ public byte[] getMinKey(String fam) {
+     return minMap.get(fam);
+ }
+
+ /**
+  * Returns the guide posts recorded for the given column family, serialized as a VARBINARY
+  * array, or null when the family has no guide posts.
+  */
+ public byte[] getGuidePosts(String fam) {
+     Pair<Integer,List<byte[]>> entry = guidePostsMap.get(fam);
+     if (entry == null) {
+         return null;
+     }
+     List<byte[]> keys = entry.getSecond();
+     if (keys.isEmpty()) {
+         return null;
+     }
+     byte[][] elements = keys.toArray(new byte[keys.size()][]);
+     PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, elements);
+     return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
new file mode 100644
index 0000000..598f0d2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.TimeKeeper;
+
+/**
+ * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector}
+ */
+public class StatisticsScanner implements InternalScanner {
+ private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
+ private InternalScanner delegate;
+ private StatisticsWriter stats;
+ private HRegion region;
+ private StatisticsCollector tracker;
+ private byte[] family;
+
+ public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region,
+ InternalScanner delegate, byte[] family) {
+ // should there be only one tracker?
+ this.tracker = tracker;
+ this.stats = stats;
+ this.delegate = delegate;
+ this.region = region;
+ this.family = family;
+ this.tracker.clear();
+ }
+
+ @Override
+ public boolean next(List<Cell> result) throws IOException {
+ boolean ret = delegate.next(result);
+ updateStat(result);
+ return ret;
+ }
+
+ @Override
+ public boolean next(List<Cell> result, int limit) throws IOException {
+ boolean ret = delegate.next(result, limit);
+ updateStat(result);
+ return ret;
+ }
+
+ /**
+ * Update the current statistics based on the lastest batch of key-values from the underlying scanner
+ *
+ * @param results
+ * next batch of {@link KeyValue}s
+ */
+ protected void updateStat(final List<Cell> results) {
+ for (Cell c : results) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+ if (c.getTypeByte() == KeyValue.Type.Put.getCode()) {
+ tracker.updateStatistic(kv);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException toThrow = null;
+ try {
+ // update the statistics table
+ // Just verify if this if fine
+ ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+ long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString()
+ + " as part of major compaction");
+ }
+ stats.deleteStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding new stats for the region " + region.getRegionNameAsString()
+ + " as part of major compaction");
+ }
+ stats.addStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing new stats for the region " + region.getRegionNameAsString()
+ + " as part of major compaction");
+ }
+ stats.commitStats(mutations);
+ } catch (IOException e) {
+ LOG.error("Failed to update statistics table!", e);
+ toThrow = e;
+ } finally {
+ try {
+ stats.close();
+ } catch (IOException e) {
+ if (toThrow == null) toThrow = e;
+ LOG.error("Error while closing the stats table", e);
+ } finally {
+ // close the delegate scanner
+ try {
+ delegate.close();
+ } catch (IOException e) {
+ if (toThrow == null) toThrow = e;
+ LOG.error("Error while closing the scanner", e);
+ } finally {
+ if (toThrow != null) {
+ throw toThrow;
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0af8e65a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
new file mode 100644
index 0000000..5ef757c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PhoenixArray;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+
+import com.google.common.collect.Lists;
+/**
+ * Simple utility class for managing multiple key parts of the statistic
+ */
+public class StatisticsUtil {
+ private StatisticsUtil() {
+ // private ctor for utility classes
+ }
+
+ /** Number of parts in our complex key */
+ protected static final int NUM_KEY_PARTS = 3;
+
+ public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) {
+ // always starts with the source table
+ byte[] rowKey = new byte[table.length + fam.length + region.length + 2];
+ int offset = 0;
+ System.arraycopy(table, 0, rowKey, offset, table.length);
+ offset += table.length;
+ rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
+ System.arraycopy(fam, 0, rowKey, offset, fam.length);
+ offset += fam.length;
+ rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
+ System.arraycopy(region, 0, rowKey, offset, region.length);
+ return rowKey;
+ }
+
+ public static byte[] copyRow(KeyValue kv) {
+ return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength());
+ }
+
+ public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp) throws IOException {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ Scan s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
+ s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES);
+ ResultScanner scanner = statsHTable.getScanner(s);
+ Result result = null;
+ TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
+ while ((result = scanner.next()) != null) {
+ CellScanner cellScanner = result.cellScanner();
+ while (cellScanner.advance()) {
+ Cell current = cellScanner.current();
+ int tableNameLength = tableNameBytes.length + 1;
+ int cfOffset = current.getRowOffset() + tableNameLength;
+ int cfLength = getVarCharLength(current.getRowArray(), cfOffset, current.getRowLength() - tableNameLength);
+ ptr.set(current.getRowArray(), cfOffset, cfLength);
+ byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValueArray(), current.getValueOffset(), current
+ .getValueLength());
+ if (array != null && array.getDimensions() != 0) {
+ List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions());
+ for (int j = 0; j < array.getDimensions(); j++) {
+ byte[] gp = array.toBytes(j);
+ if (gp.length != 0) {
+ guidePosts.add(gp);
+ }
+ }
+ List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts);
+ if (gps != null) { // Add guidepost already there from other regions
+ guidePosts.addAll(gps);
+ }
+ }
+ }
+ }
+ if (!guidePostsPerCf.isEmpty()) {
+ // Sort guideposts, as the order above will depend on the order we traverse
+ // each region's worth of guideposts above.
+ for (List<byte[]> gps : guidePostsPerCf.values()) {
+ Collections.sort(gps, Bytes.BYTES_COMPARATOR);
+ }
+ return new PTableStatsImpl(guidePostsPerCf);
+ }
+ return PTableStats.EMPTY_STATS;
+ }
+}
\ No newline at end of file