You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/09/26 07:52:33 UTC
git commit: PHOENIX-1264: Add StatisticsCollector to existing tables
on first connection to cluster
Repository: phoenix
Updated Branches:
refs/heads/master 10efdb1f2 -> 6b0461002
PHOENIX-1264: Add StatisticsCollector to existing tables on first
connection to cluster
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6b046100
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6b046100
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6b046100
Branch: refs/heads/master
Commit: 6b04610022415fcc27ea69fe001cbd464badf355
Parents: 10efdb1
Author: Ramkrishna <ra...@intel.com>
Authored: Fri Sep 26 11:21:40 2014 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Fri Sep 26 11:21:40 2014 +0530
----------------------------------------------------------------------
...efaultParallelIteratorsRegionSplitterIT.java | 15 ++
.../phoenix/end2end/GuidePostsLifeCycleIT.java | 22 +-
.../org/apache/phoenix/end2end/KeyOnlyIT.java | 15 ++
.../phoenix/end2end/MultiCfQueryExecIT.java | 14 ++
.../phoenix/end2end/StatsCollectorIT.java | 44 ++--
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../UngroupedAggregateRegionObserver.java | 72 +++++-
.../DefaultParallelIteratorRegionSplitter.java | 30 +--
.../phoenix/query/ConnectionQueryServices.java | 3 -
.../query/ConnectionQueryServicesImpl.java | 51 +---
.../query/ConnectionlessQueryServicesImpl.java | 6 -
.../query/DelegateConnectionQueryServices.java | 5 -
.../apache/phoenix/schema/MetaDataClient.java | 36 ++-
.../schema/stat/StatisticsCollector.java | 249 +++++--------------
.../phoenix/schema/stat/StatisticsScanner.java | 7 +-
.../phoenix/schema/stat/StatisticsTable.java | 49 ++--
.../phoenix/schema/stat/StatisticsTracker.java | 62 -----
.../java/org/apache/phoenix/util/ScanUtil.java | 4 +
.../phoenix/query/QueryServicesTestImpl.java | 2 +-
.../src/main/StatisticsCollect.proto | 20 --
20 files changed, 272 insertions(+), 435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
index dd1dc8b..a6ec835 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HConstants;
@@ -40,13 +41,18 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
/**
* Tests for {@link DefaultParallelIteratorRegionSplitter}.
@@ -58,6 +64,14 @@ import org.junit.experimental.categories.Category;
@Category(ClientManagedTimeTest.class)
public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIteratorsRegionSplitterIT {
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan)
throws SQLException {
TableRef tableRef = getTableRef(conn, ts);
@@ -93,6 +107,7 @@ public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIterat
Scan scan = new Scan();
// number of regions > target query concurrency
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
scan.setStartRow(K1);
scan.setStopRow(K12);
List<KeyRange> keyRanges = getSplits(conn, ts, scan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
index 7645040..3cef492 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
@@ -28,6 +28,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -40,16 +41,32 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
@Category(HBaseManagedTimeTest.class)
public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
-
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+ props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
protected static final byte[] KMIN = new byte[] {'!'};
protected static final byte[] KMIN2 = new byte[] {'.'};
protected static final byte[] K1 = new byte[] {'a'};
@@ -106,16 +123,19 @@ public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
upsert(new byte[][] { KMIN, K4, K11 });
stmt = conn.prepareStatement("ANALYZE STABLE");
stmt.execute();
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
keyRanges = getSplits(conn, scan);
assertEquals(7, keyRanges.size());
upsert(new byte[][] { KMIN2, K5, K12 });
stmt = conn.prepareStatement("ANALYZE STABLE");
stmt.execute();
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
keyRanges = getSplits(conn, scan);
assertEquals(10, keyRanges.size());
upsert(new byte[][] { K1, K6, KP });
stmt = conn.prepareStatement("ANALYZE STABLE");
stmt.execute();
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
keyRanges = getSplits(conn, scan);
assertEquals(13, keyRanges.size());
conn.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index 4b0d07f..f713fff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,15 +45,29 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
@Category(ClientManagedTimeTest.class)
public class KeyOnlyIT extends BaseClientManagedTimeIT {
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
@Test
public void testKeyOnly() throws Exception {
long ts = nextTimestamp();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
index ebf03d0..f01d985 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,17 +45,30 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
@Category(ClientManagedTimeTest.class)
public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
private static final String MULTI_CF = "MULTI_CF";
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
protected static void initTableValues(long ts) throws Exception {
ensureTableCreated(getUrl(),MULTI_CF,null, ts-2);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index a38abea..e20c11f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -1,10 +1,5 @@
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertTrue;
@@ -18,9 +13,6 @@ import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -32,27 +24,19 @@ import com.google.common.collect.Maps;
@Category(HBaseManagedTimeTest.class)
public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
- private static String url;
- private static HBaseTestingUtility util;
- private static int frequency = 4000;
-
+ //private static String url;
+ private static int frequency = 5000;
+
@BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
public static void doSetup() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- setUpConfigForMiniCluster(conf);
- conf.setInt("hbase.client.retries.number", 2);
- conf.setInt("hbase.client.pause", 5000);
- conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
- util = new HBaseTestingUtility(conf);
- util.startMiniCluster();
- String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
- url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
- + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
- int histogramDepth = 60;
- Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
- props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Integer.toString(histogramDepth));
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(frequency));
- driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+ props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@Test
@@ -62,7 +46,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
ResultSet rs;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
// props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
- conn = DriverManager.getConnection(url, props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute(
"CREATE TABLE t ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
+ " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -99,7 +83,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
long ts = nextTimestamp();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
// props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
- conn = DriverManager.getConnection(url, props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute(
"CREATE TABLE x ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
+ " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -148,7 +132,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
Connection conn;
PreparedStatement stmt;
// props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30));
- conn = DriverManager.getConnection(url, props);
+ conn = DriverManager.getConnection(getUrl(), props);
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "a");
String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" };
@@ -219,7 +203,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
}
private void flush(String tableName) throws IOException, InterruptedException {
- util.getHBaseAdmin().flush(tableName.toUpperCase());
+ //utility.getHBaseAdmin().flush(tableName.toUpperCase());
}
private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b2e2806..1129eef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -60,6 +60,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String VIEW_CONSTANTS = "_ViewConstants";
public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
public static final String REVERSE_SCAN = "_ReverseScan";
+ public static final String ANALYZE_TABLE = "_ANALYZETABLE";
/** Exposed for testing */
public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 95b095e..d39f868 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -34,6 +34,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -47,7 +49,11 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
@@ -63,6 +69,7 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.query.QueryConstants;
@@ -74,6 +81,8 @@ import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.stat.StatisticsCollector;
+import org.apache.phoenix.schema.stat.StatisticsTable;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
@@ -94,7 +103,7 @@ import com.google.common.collect.Sets;
*
* @since 0.1
*/
-public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
// TODO: move all constants into a single class
public static final String UNGROUPED_AGG = "UngroupedAgg";
public static final String DELETE_AGG = "DeleteAgg";
@@ -105,6 +114,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public static final String EMPTY_CF = "EmptyCF";
private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
private KeyValueBuilder kvBuilder;
+ private static final Log LOG = LogFactory.getLog(UngroupedAggregateRegionObserver.class);
+ private StatisticsTable statsTable = null;
@Override
public void start(CoprocessorEnvironment e) throws IOException {
@@ -112,6 +123,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// Can't use ClientKeyValueBuilder on server-side because the memstore expects to
// be able to get a single backing buffer for a KeyValue.
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
+ String name = ((RegionCoprocessorEnvironment)e).getRegion().getTableDesc().getTableName().getNameAsString();
+ this.statsTable = StatisticsTable.getStatisticsTableForCoprocessor(e.getConfiguration(), name);
}
private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
@@ -128,16 +141,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public static void serializeIntoScan(Scan scan) {
scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
}
-
+
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
int offset = 0;
+ boolean isAnalyze = false;
+ HRegion region = c.getEnvironment().getRegion();
+ TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+ StatisticsCollector stats = null;
+ if(scan.getAttribute(BaseScannerRegionObserver.ANALYZE_TABLE) != null && statsTable != null) {
+ stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+ isAnalyze = true;
+ }
if (ScanUtil.isLocalIndex(scan)) {
/*
* For local indexes, we need to set an offset on row key expressions to skip
* the region start key.
*/
- HRegion region = c.getEnvironment().getRegion();
offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
ScanUtil.setRowKeyOffset(scan, offset);
}
@@ -199,7 +219,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
int batchSize = 0;
long ts = scan.getTimeRange().getMax();
- HRegion region = c.getEnvironment().getRegion();
List<Mutation> mutations = Collections.emptyList();
boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
@@ -214,7 +233,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
boolean hasAny = false;
MultiKeyValueTuple result = new MultiKeyValueTuple();
if (logger.isInfoEnabled()) {
- logger.info("Starting ungrouped coprocessor scan " + scan);
+ logger.info("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo());
}
long rowCount = 0;
region.startRegionOperation();
@@ -226,6 +245,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// since this is an indication of whether or not there are more values after the
// ones returned
hasMore = innerScanner.nextRaw(results);
+ if(isAnalyze && stats != null) {
+ stats.collectStatistics(results);
+ }
if (!results.isEmpty()) {
if (localIndexScan && !isDelete) {
@@ -346,6 +368,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} while (hasMore);
} finally {
try {
+ if (isAnalyze && stats != null) {
+ stats.updateStatistic(region);
+ stats.clear();
+ }
innerScanner.close();
} finally {
region.closeRegionOperation();
@@ -408,6 +434,42 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
};
return scanner;
}
+
+ @Override
+ public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+ long earliestPutTs, InternalScanner s) throws IOException {
+ InternalScanner internalScan = s;
+ TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+ if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
+ && scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+ StatisticsCollector stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+ internalScan =
+ stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
+ }
+ return internalScan;
+ }
+
+
+ @Override
+ public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
+ throws IOException {
+ HRegion region = e.getEnvironment().getRegion();
+ TableName table = region.getRegionInfo().getTable();
+ if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+ try {
+ StatisticsCollector stats = new StatisticsCollector(statsTable, e.getEnvironment()
+ .getConfiguration());
+ stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
+ stats.clear();
+ } catch (IOException ioe) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Error while collecting stats during split ",ioe);
+ }
+ }
+ }
+
+ }
private HRegion getIndexRegion(RegionCoprocessorEnvironment environment) throws IOException {
HRegion userRegion = environment.getRegion();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
index 227163e..a0ac20c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,23 +112,24 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
PTable table = this.tableRef.getTable();
byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
List<byte[]> gps = Lists.newArrayList();
-
- if (table.getColumnFamilies().isEmpty()) {
- // For sure we can get the defaultCF from the table
- gps = table.getGuidePosts();
- } else {
- try {
- if (scan.getFamilyMap().size() > 0) {
- if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+ if (!ScanUtil.isAnalyzeTable(scan)) {
+ if (table.getColumnFamilies().isEmpty()) {
+ // For sure we can get the defaultCF from the table
+ gps = table.getGuidePosts();
+ } else {
+ try {
+ if (scan.getFamilyMap().size() > 0) {
+ if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+ gps = table.getColumnFamily(defaultCF).getGuidePosts();
+ } else { // Otherwise, just use first CF in use by scan
+ gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+ }
+ } else {
gps = table.getColumnFamily(defaultCF).getGuidePosts();
- } else { // Otherwise, just use first CF in use by scan
- gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
}
- } else {
- gps = table.getColumnFamily(defaultCF).getGuidePosts();
+ } catch (ColumnFamilyNotFoundException cfne) {
+ // Alter table does this
}
- } catch (ColumnFamilyNotFoundException cfne) {
- // Alter table does this
}
}
List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 0c1f45d..15c8ebe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -94,9 +94,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
void addConnection(PhoenixConnection connection) throws SQLException;
void removeConnection(PhoenixConnection connection) throws SQLException;
- long updateStatistics(KeyRange keyRange, byte[] tableName)
- throws SQLException;
-
/**
* @return the {@link KeyValueBuilder} that is valid for the locally installed version of HBase.
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dfd56bc..25117ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -88,9 +88,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -124,7 +121,6 @@ import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.stat.StatisticsCollector;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.ConfigUtil;
@@ -143,7 +139,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.HBaseZeroCopyByteString;
-import com.google.protobuf.ServiceException;
public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
@@ -595,10 +590,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
}
-
- if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) {
- descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null);
- }
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
// don't install on the metadata table until we fix the TODO there.
@@ -1864,47 +1855,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
-
- @Override
- public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException {
- HTableInterface ht = null;
- try {
- ht = this.getTable(tableName);
- Batch.Call<StatCollectService, StatCollectResponse> callable = new Batch.Call<StatCollectService, StatCollectResponse>() {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<StatCollectResponse> rpcCallback = new BlockingRpcCallback<StatCollectResponse>();
-
- @Override
- public StatCollectResponse call(StatCollectService service) throws IOException {
- StatCollectRequest.Builder builder = StatCollectRequest.newBuilder();
- builder.setStartRow(HBaseZeroCopyByteString.wrap(keyRange.getLowerRange()));
- builder.setStopRow(HBaseZeroCopyByteString.wrap(keyRange.getUpperRange()));
- service.collectStat(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
- return rpcCallback.get();
- }
- };
- Map<byte[], StatCollectResponse> result = ht.coprocessorService(StatCollectService.class,
- keyRange.getLowerRange(), keyRange.getUpperRange(), callable);
- StatCollectResponse next = result.values().iterator().next();
- return next.getRowsScanned();
- } catch (ServiceException e) {
- throw new SQLException("Unable to update the statistics for the table " + tableName, e);
- } catch (TableNotFoundException e) {
- throw new SQLException("Unable to update the statistics for the table " + tableName, e);
- } catch (Throwable e) {
- throw new SQLException("Unable to update the statistics for the table " + tableName, e);
- } finally {
- if (ht != null) {
- try {
- ht.close();
- } catch (IOException e) {
- throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e);
- }
- }
- }
- }
-
+
@Override
public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
final long clientTS) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 9fa415c..055bc79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -188,12 +188,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException {
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
}
-
- @Override
- public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
- // Noop
- return 0;
- }
@Override
public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index fa01f09..8bd2c61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -226,11 +226,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public String getUserName() {
return getDelegate().getUserName();
}
-
- @Override
- public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
- return getDelegate().updateStatistics(keyRange, tableName);
- }
@Override
public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 1f933d8..82eb836 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -130,7 +130,6 @@ import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.PrimaryKeyConstraint;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.UpdateStatisticsStatement;
-import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -141,7 +140,6 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -479,20 +477,6 @@ public class MetaDataClient {
PTable table = resolver.getTables().get(0).getTable();
PName physicalName = table.getPhysicalName();
byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
- KeyRange analyzeRange = KeyRange.EVERYTHING_RANGE;
- if (connection.getTenantId() != null && table.isMultiTenant()) {
- tenantIdBytes = connection.getTenantId().getBytes();
- // TODO remove this inner if once PHOENIX-1259 is fixed.
- if (table.getBucketNum() == null && table.getIndexType() != IndexType.LOCAL) {
- List<List<KeyRange>> tenantIdKeyRanges = Collections.singletonList(Collections.singletonList(KeyRange
- .getKeyRange(tenantIdBytes)));
- byte[] lowerRange = ScanUtil.getMinKey(table.getRowKeySchema(), tenantIdKeyRanges,
- ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
- byte[] upperRange = ScanUtil.getMaxKey(table.getRowKeySchema(), tenantIdKeyRanges,
- ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
- analyzeRange = KeyRange.getKeyRange(lowerRange, upperRange);
- }
- }
Long scn = connection.getSCN();
// Always invalidate the cache
long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
@@ -509,12 +493,26 @@ public class MetaDataClient {
lastUpdatedTime = rs.getDate(1).getTime() - rs.getDate(2).getTime();
}
if (minTimeForStatsUpdate > lastUpdatedTime) {
+ // Here create the select query.
+ String countQuery = "SELECT /*+ NO_CACHE */ count(*) FROM " + table.getName().getString();
+ PhoenixStatement statement = (PhoenixStatement) connection.createStatement();
+ QueryPlan plan = statement.compileQuery(countQuery);
+ Scan scan = plan.getContext().getScan();
+ // Add all CF in the table
+ scan.getFamilyMap().clear();
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+ scan.setAttribute(BaseScannerRegionObserver.ANALYZE_TABLE, PDataType.TRUE_BYTES);
+ Cell kv = plan.iterator().next().getValue(0);
+ ImmutableBytesWritable tempPtr = plan.getContext().getTempPtr();
+ tempPtr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+ // A single Cell will be returned with the count(*) - we decode that here
+ long rowCount = PDataType.LONG.getCodec().decodeLong(tempPtr, SortOrder.getDefault());
// We need to update the stats table
- connection.getQueryServices().updateStatistics(analyzeRange, physicalName.getBytes());
connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
table.getTableName().getBytes(), clientTS);
- updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
- return new MutationState(1, connection);
+ return new MutationState(0, connection, rowCount);
} else {
return new MutationState(0, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index 7552698..6b45c5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -21,27 +21,15 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -50,13 +38,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse.Builder;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PDataType;
@@ -64,91 +46,50 @@ import org.apache.phoenix.schema.PhoenixArray;
import org.apache.phoenix.util.TimeKeeper;
import com.google.common.collect.Lists;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
+import com.google.common.collect.Maps;
/**
- * An endpoint implementation that allows to collect the stats for a given region and groups the stat per family. This is also an
- * RegionObserver that collects the information on compaction also. The user would be allowed to invoke this endpoint and thus populate the
- * Phoenix stats table with the max key, min key and guide posts for the given region. The stats can be consumed by the stats associated
- * with every PTable and the same can be used to parallelize the queries
+ * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and
+ * guideposts
*/
-public class StatisticsCollector extends BaseRegionObserver implements CoprocessorService, Coprocessor,
- StatisticsTracker, StatCollectService.Interface {
+public class StatisticsCollector {
- public static void addToTable(HTableDescriptor desc) throws IOException {
- desc.addCoprocessor(StatisticsCollector.class.getName());
- }
-
- private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>();
- private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>();
+ private Map<String, byte[]> minMap = Maps.newHashMap();
+ private Map<String, byte[]> maxMap = Maps.newHashMap();
private long guidepostDepth;
private long byteCount = 0;
- private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>();
- private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>();
- private RegionCoprocessorEnvironment env;
- protected StatisticsTable stats;
+ private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap();
+ private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
+ protected StatisticsTable statsTable;
// Ensures that either analyze or compaction happens at any point of time.
- private ReentrantLock lock = new ReentrantLock();
private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
- @Override
- public void collectStat(RpcController controller, StatCollectRequest request, RpcCallback<StatCollectResponse> done) {
- HRegion region = env.getRegion();
- boolean heldLock = false;
- int count = 0;
- Builder newBuilder = StatCollectResponse.newBuilder();
+ public StatisticsCollector(StatisticsTable statsTable, Configuration conf) throws IOException {
+ // Get the stats table associated with the current table on which the CP is
+ // triggered
+ this.statsTable = statsTable;
+ guidepostDepth =
+ conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
+ QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
+ }
+
+ public void updateStatistic(HRegion region) {
try {
- if (lock.tryLock()) {
- heldLock = true;
- // Clear all old stats
- clear();
- Scan scan = createScan(env.getConfiguration());
- if (request.hasStartRow()) {
- scan.setStartRow(request.getStartRow().toByteArray());
- }
- if (request.hasStopRow()) {
- scan.setStopRow(request.getStopRow().toByteArray());
- }
- RegionScanner scanner = null;
- try {
- scanner = region.getScanner(scan);
- count = scanRegion(scanner, count);
- } catch (IOException e) {
- LOG.error(e);
- ResponseConverter.setControllerException(controller, e);
- } finally {
- if (scanner != null) {
- try {
- ArrayList<Mutation> mutations = new ArrayList<Mutation>();
- writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing new stats for the region " + region.getRegionInfo());
- }
- commitStats(mutations);
- } catch (IOException e) {
- LOG.error(e);
- ResponseConverter.setControllerException(controller, e);
- } finally {
- clear();
- }
- }
- }
+ ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+ writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing new stats for the region " + region.getRegionInfo());
}
+ commitStats(mutations);
+ } catch (IOException e) {
+ LOG.error(e);
} finally {
- if (heldLock) {
- lock.unlock();
- }
- newBuilder.setRowsScanned(count);
- StatCollectResponse result = newBuilder.build();
- done.run(result);
+ clear();
}
}
-
+
private void writeStatsToStatsTable(final HRegion region,
- final RegionScanner scanner, boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
- scanner.close();
+ boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
try {
// update the statistics table
for (ImmutableBytesPtr fam : familyMap.keySet()) {
@@ -157,13 +98,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
if(LOG.isDebugEnabled()) {
LOG.debug("Deleting the stats for the region "+region.getRegionInfo());
}
- stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
+ statsTable.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
}
if(LOG.isDebugEnabled()) {
LOG.debug("Adding new stats for the region "+region.getRegionInfo());
}
- stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+ statsTable.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
}
} catch (IOException e) {
@@ -173,7 +114,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
private void commitStats(List<Mutation> mutations) throws IOException {
- stats.commitStats(mutations);
+ statsTable.commitStats(mutations);
}
private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException {
@@ -181,7 +122,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
// update the statistics table
for (ImmutableBytesPtr fam : familyMap.keySet()) {
String tableName = region.getRegionInfo().getTable().getNameAsString();
- stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+ statsTable.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
}
} catch (IOException e) {
@@ -196,7 +137,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
while (hasMore) {
// Am getting duplicates here. Need to avoid that
hasMore = scanner.next(results);
- updateStat(results);
+ collectStatistics(results);
count += results.size();
results.clear();
while (!hasMore) {
@@ -212,93 +153,42 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
* @param results
* next batch of {@link KeyValue}s
*/
- protected void updateStat(final List<Cell> results) {
+ public void collectStatistics(final List<Cell> results) {
for (Cell c : results) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
updateStatistic(kv);
}
}
- @Override
- public Service getService() {
- return StatCollectorProtos.StatCollectService.newReflectiveService(this);
- }
-
- @Override
- public void start(CoprocessorEnvironment env) throws IOException {
- if (env instanceof RegionCoprocessorEnvironment) {
- this.env = (RegionCoprocessorEnvironment)env;
- } else {
- throw new CoprocessorException("Must be loaded on a table region!");
- }
- HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc();
- // Get the stats table associated with the current table on which the CP is
- // triggered
- stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName());
- guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
- QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
- }
-
- @Override
- public void stop(CoprocessorEnvironment env) throws IOException {
- if (env instanceof RegionCoprocessorEnvironment) {
- TableName table = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTable();
- // Close only if the table is system table
- if(table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
- stats.close();
- }
- }
- }
-
- @Override
- public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
- throws IOException {
+ public InternalScanner createCompactionScanner(HRegion region, Store store,
+ List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException {
+ // See if this is for Major compaction
InternalScanner internalScan = s;
- TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
- if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
- boolean heldLock = false;
- try {
- if (lock.tryLock()) {
- heldLock = true;
- // See if this is for Major compaction
- if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
- // this is the first CP accessed, so we need to just create a major
- // compaction scanner, just
- // like in the compactor
- if (s == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(store.getFamily().getMaxVersions());
- long smallestReadPoint = store.getSmallestReadPoint();
- internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
- smallestReadPoint, earliestPutTs);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compaction scanner created for stats");
- }
- InternalScanner scanner = getInternalScanner(c, store, internalScan,
- store.getColumnFamilyName());
- if (scanner != null) {
- internalScan = scanner;
- }
- }
- }
- } finally {
- if (heldLock) {
- lock.unlock();
- }
+ if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+ // this is the first CP accessed, so we need to just create a major
+ // compaction scanner, just
+ // like in the compactor
+ if (s == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ long smallestReadPoint = store.getSmallestReadPoint();
+ internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+ smallestReadPoint, earliestPutTs);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compaction scanner created for stats");
+ }
+ InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
+ if (scanner != null) {
+ internalScan = scanner;
}
}
return internalScan;
}
-
- @Override
- public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException {
- // Invoke collectStat here
- HRegion region = ctx.getEnvironment().getRegion();
- TableName table = region.getRegionInfo().getTable();
- if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+ public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r,
+ HRegion region) {
+ try {
if (familyMap != null) {
familyMap.clear();
}
@@ -307,14 +197,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
// TODO : Try making this atomic
List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
- Configuration conf = ctx.getEnvironment().getConfiguration();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region "+l.getRegionInfo());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
}
collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime);
clear();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region "+r.getRegionInfo());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
}
collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime);
clear();
@@ -322,6 +211,9 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
}
commitStats(mutations);
+ } catch (IOException e) {
+ LOG.error("Error while capturing stats after split of region "
+ + region.getRegionInfo().getRegionNameAsString(), e);
}
}
@@ -345,7 +237,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
deleteStatsFromStatsTable(parent, mutations, currentTime);
}
- writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime);
+ writeStatsToStatsTable(daughter, false, mutations, currentTime);
} catch (IOException e) {
LOG.error(e);
throw e;
@@ -363,13 +255,12 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
return scan;
}
- protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ protected InternalScanner getInternalScanner(HRegion region, Store store,
InternalScanner internalScan, String family) {
- return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan,
+ return new StatisticsScanner(this, statsTable, region.getRegionInfo(), internalScan,
Bytes.toBytes(family));
}
- @Override
public void clear() {
this.maxMap.clear();
this.minMap.clear();
@@ -377,7 +268,6 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
this.familyMap.clear();
}
- @Override
public void updateStatistic(KeyValue kv) {
byte[] cf = kv.getFamily();
familyMap.put(new ImmutableBytesPtr(cf), true);
@@ -415,19 +305,16 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
}
- @Override
public byte[] getMaxKey(String fam) {
if (maxMap.get(fam) != null) { return maxMap.get(fam); }
return null;
}
- @Override
public byte[] getMinKey(String fam) {
if (minMap.get(fam) != null) { return minMap.get(fam); }
return null;
}
- @Override
public byte[] getGuidePosts(String fam) {
if (!guidePostsMap.isEmpty()) {
List<byte[]> guidePosts = guidePostsMap.get(fam);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
index 09174b2..86ffca7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
@@ -32,10 +32,10 @@ public class StatisticsScanner implements InternalScanner {
private InternalScanner delegate;
private StatisticsTable stats;
private HRegionInfo region;
- private StatisticsTracker tracker;
+ private StatisticsCollector tracker;
private byte[] family;
- public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region,
+ public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegionInfo region,
InternalScanner delegate, byte[] family) {
// should there be only one tracker?
this.tracker = tracker;
@@ -109,9 +109,6 @@ public class StatisticsScanner implements InternalScanner {
delegate.close();
} catch (IOException e) {
LOG.error("Error while closing the scanner");
- // TODO : We should throw the exception
- /*if (toThrow == null) { throw e; }
- throw MultipleIOException.createIOException(Lists.newArrayList(toThrow, e));*/
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index fcbbee9..e92d61e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -25,15 +25,11 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
@@ -46,38 +42,34 @@ public class StatisticsTable implements Closeable {
/** Map of the currently open statistics tables */
private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>();
/**
- * @param env
- * Environment wherein the coprocessor is attempting to update the stats table.
+ * @param conf
+ * Configuration used to update the stats table.
* @param primaryTableName
* name of the primary table on which we should collect stats
* @return the {@link StatisticsTable} for the given primary table.
* @throws IOException
* if the table cannot be created due to an underlying HTable creation error
*/
- public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env,
- byte[] primaryTableName) throws IOException {
+ public synchronized static StatisticsTable getStatisticsTableForCoprocessor(Configuration conf,
+ String primaryTableName) throws IOException {
StatisticsTable table = tableMap.get(primaryTableName);
if (table == null) {
// Map the statics table and the table with which the statistics is
// associated. This is a workaround
- HTablePool pool = new HTablePool(env.getConfiguration(), 1);
+ HTablePool pool = new HTablePool(conf,1);
+ //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
- table = new StatisticsTable(hTable, primaryTableName);
- tableMap.put(Bytes.toString(primaryTableName), table);
+ //h.setAutoFlushTo(true);
+ table = new StatisticsTable(hTable);
+ tableMap.put(primaryTableName, table);
}
return table;
}
private final HTableInterface statisticsTable;
- private final byte[] sourceTableName;
- private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) {
+ public StatisticsTable(HTableInterface statsTable) {
this.statisticsTable = statsTable;
- this.sourceTableName = sourceTableName;
- }
-
- public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException {
- this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName());
}
/**
@@ -104,7 +96,7 @@ public class StatisticsTable implements Closeable {
* if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to
* update
*/
- public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+ public void addStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
List<Mutation> mutations, long currentTime) throws IOException {
if (tracker == null) { return; }
@@ -119,13 +111,15 @@ public class StatisticsTable implements Closeable {
public void commitStats(List<Mutation> mutations) throws IOException {
Object[] res = new Object[mutations.size()];
try {
- statisticsTable.batch(mutations, res);
+ if (mutations.size() > 0) {
+ statisticsTable.batch(mutations, res);
+ }
} catch (InterruptedException e) {
throw new IOException("Exception while adding deletes and puts");
}
}
- private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations,
+ private void formStatsUpdateMutation(StatisticsCollector tracker, String fam, List<Mutation> mutations,
long currentTime, byte[] prefix) {
Put put = new Put(prefix, currentTime);
if (tracker.getGuidePosts(fam) != null) {
@@ -147,22 +141,11 @@ public class StatisticsTable implements Closeable {
mutations.add(put);
}
- public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+ public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
List<Mutation> mutations, long currentTime)
throws IOException {
byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
mutations.add(new Delete(prefix, currentTime - 1));
}
-
- /**
- * @return the underlying {@link HTableInterface} to which this table is writing
- */
- HTableInterface getUnderlyingTable() {
- return statisticsTable;
- }
-
- byte[] getSourceTableName() {
- return this.sourceTableName;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
deleted file mode 100644
index e1754f3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * Track a statistic for the column on a given region
- */
-public interface StatisticsTracker {
-
- /**
- * Reset the statistic after the completion of the compaction
- */
- public void clear();
-
- /**
- * Update the current statistics with the next {@link KeyValue} to be written
- *
- * @param kv
- * next {@link KeyValue} to be written.
- */
- public void updateStatistic(KeyValue kv);
-
- /**
- * Return the max key of the family
- * @param fam
- * @return
- */
- public byte[] getMaxKey(String fam);
-
- /**
- * Return the min key of the family
- *
- * @param fam
- * @return
- */
- public byte[] getMinKey(String fam);
-
- /**
- * Return the guide posts of the family
- *
- * @param fam
- * @return
- */
- public byte[] getGuidePosts(String fam);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 42b20fe..daef1c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -552,4 +552,8 @@ public class ScanUtil {
return offset + slotPosition;
}
+
+ public static boolean isAnalyzeTable(Scan scan) {
+ return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index a65ca77..6e29c69 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -51,7 +51,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
private static final String DEFAULT_WAL_EDIT_CODEC = IndexedWALEditCodec.class.getName();
public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*4L; // 4 Mb
public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*2L; // 2 Mb
- public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 20;
+ public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 2000;
public QueryServicesTestImpl(ReadOnlyProps defaultProps) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-protocol/src/main/StatisticsCollect.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/StatisticsCollect.proto b/phoenix-protocol/src/main/StatisticsCollect.proto
deleted file mode 100644
index c80a756..0000000
--- a/phoenix-protocol/src/main/StatisticsCollect.proto
+++ /dev/null
@@ -1,20 +0,0 @@
-option java_package = "org.apache.phoenix.coprocessor.generated";
-option java_outer_classname = "StatCollectorProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-option optimize_for = SPEED;
-
-
-message StatCollectRequest {
- optional bytes startRow = 1;
- optional bytes stopRow = 2;
-}
-
-message StatCollectResponse {
- required uint64 rowsScanned = 1;
-}
-
-service StatCollectService {
- rpc collectStat(StatCollectRequest)
- returns (StatCollectResponse);
-}
\ No newline at end of file