Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/10/01 05:51:51 UTC
[1/2] Phoenix-1264 Add StatisticsCollector to existing tables on first connection to cluster
Repository: phoenix
Updated Branches:
refs/heads/3.0 aaadb360e -> 1d22fdc98
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index b63730c..f9347a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -12,24 +12,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -40,7 +30,6 @@ import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PDataType;
@@ -48,6 +37,7 @@ import org.apache.phoenix.schema.PhoenixArray;
import org.apache.phoenix.util.TimeKeeper;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* An endpoint implementation that allows collecting the stats for a given region and groups the stats per family. This is also an
@@ -55,74 +45,46 @@ import com.google.common.collect.Lists;
* Phoenix stats table with the max key, min key and guide posts for the given region. The stats can be consumed by the stats associated
* with every PTable and the same can be used to parallelize the queries
*/
-public class StatisticsCollector extends BaseRegionObserver implements Coprocessor, StatisticsTracker,
- StatisticsCollectorProtocol {
+public class StatisticsCollector {
public static void addToTable(HTableDescriptor desc) throws IOException {
desc.addCoprocessor(StatisticsCollector.class.getName());
}
- private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>();
- private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>();
+ private Map<String, byte[]> minMap = Maps.newHashMap();
+ private Map<String, byte[]> maxMap = Maps.newHashMap();
private long guidepostDepth;
private long byteCount = 0;
- private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>();
- private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>();
- private RegionCoprocessorEnvironment env;
- protected StatisticsTable stats;
- // Ensures that either analyze or compaction happens at any point of time.
- private ReentrantLock lock = new ReentrantLock();
+ private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap();
+ private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
+ protected StatisticsTable statsTable;
private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
- @Override
- public StatisticsCollectorResponse collectStat(KeyRange keyRange) throws IOException {
- HRegion region = env.getRegion();
- boolean heldLock = false;
- int count = 0;
+ public StatisticsCollector(StatisticsTable statsTable, Configuration conf) throws IOException {
+ // Get the stats table associated with the current table on which the CP is
+ // triggered
+ this.statsTable = statsTable;
+ guidepostDepth = conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
+ QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
+ }
+
+ public void updateStatistic(HRegion region) {
try {
- if (lock.tryLock()) {
- heldLock = true;
- // Clear all old stats
- clear();
- Scan scan = createScan(env.getConfiguration());
- scan.setStartRow(keyRange.getLowerRange());
- scan.setStopRow(keyRange.getUpperRange());
- RegionScanner scanner = null;
- try {
- scanner = region.getScanner(scan);
- count = scanRegion(scanner, count);
- } catch (IOException e) {
- LOG.error(e);
- throw e;
- } finally {
- if (scanner != null) {
- try {
- ArrayList<Mutation> mutations = new ArrayList<Mutation>();
- writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing new stats for the region " + region.getRegionInfo());
- }
- commitStats(mutations);
- } catch (IOException e) {
- LOG.error(e);
- throw e;
- }
- }
- }
+ ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+ writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing new stats for the region " + region.getRegionInfo());
}
+ commitStats(mutations);
+ } catch (IOException e) {
+ LOG.error(e);
} finally {
- if (heldLock) {
- lock.unlock();
- }
+ clear();
}
- StatisticsCollectorResponse response = new StatisticsCollectorResponse();
- response.setRowsScanned(count);
- return response;
}
- private void writeStatsToStatsTable(final HRegion region, final RegionScanner scanner, boolean delete,
+ private void writeStatsToStatsTable(final HRegion region, boolean delete,
List<Mutation> mutations, long currentTime) throws IOException {
- scanner.close();
try {
// update the statistics table
for (ImmutableBytesPtr fam : familyMap.keySet()) {
@@ -131,13 +93,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting the stats for the region " + region.getRegionInfo());
}
- stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
+ statsTable.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding new stats for the region " + region.getRegionInfo());
}
- stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+ statsTable.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
}
} catch (IOException e) {
@@ -147,7 +109,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
private void commitStats(List<Mutation> mutations) throws IOException {
- stats.commitStats(mutations);
+ statsTable.commitStats(mutations);
}
private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime)
@@ -156,7 +118,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
// update the statistics table
for (ImmutableBytesPtr fam : familyMap.keySet()) {
String tableName = region.getRegionInfo().getTableNameAsString();
- stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+ statsTable.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
}
} catch (IOException e) {
@@ -171,7 +133,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
while (hasMore) {
// We are getting duplicates here. Need to avoid that
hasMore = scanner.next(results);
- updateStat(results);
+ collectStatistics(results);
count += results.size();
results.clear();
while (!hasMore) {
@@ -181,117 +143,43 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
return count;
}
- /**
- * Update the current statistics based on the lastest batch of key-values from the underlying scanner
- *
- * @param results
- * next batch of {@link KeyValue}s
- */
- protected void updateStat(final List<KeyValue> results) {
+ public void collectStatistics(final List<KeyValue> results) {
for (KeyValue kv : results) {
updateStatistic(kv);
}
}
- @Override
- public void start(CoprocessorEnvironment env) throws IOException {
- if (env instanceof RegionCoprocessorEnvironment) {
- this.env = (RegionCoprocessorEnvironment)env;
- } else {
- throw new CoprocessorException("Must be loaded on a table region!");
- }
- String tableName = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTableNameAsString();
- if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
- HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc();
- // Get the stats table associated with the current table on which the CP is
- // triggered
- stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName());
- guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
- QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
- }
- }
-
- @Override
- public void stop(CoprocessorEnvironment env) throws IOException {
- if (env instanceof RegionCoprocessorEnvironment) {
- String tableName = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTableNameAsString();
- // Close only if the table is system table
- if (tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
- stats.close();
- }
- }
- }
-
- @Override
- public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
+ public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r, HRegion parent)
throws IOException {
- InternalScanner internalScan = s;
- String tableNameAsString = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
- if (!tableNameAsString.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
- boolean heldLock = false;
- try {
- if (lock.tryLock()) {
- heldLock = true;
- // See if this is for Major compaction
- if (scanType.equals(ScanType.MAJOR_COMPACT)) {
- // this is the first CP accessed, so we need to just create a major
- // compaction scanner, just
- // like in the compactor
- if (s == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(store.getFamily().getMaxVersions());
- long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
- internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
- smallestReadPoint, earliestPutTs);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compaction scanner created for stats");
- }
- InternalScanner scanner = getInternalScanner(c, store, internalScan,
- store.getColumnFamilyName());
- if (scanner != null) {
- internalScan = scanner;
- }
- }
+ // Invoke collectStat here
+ String tableName = parent.getRegionInfo().getTableNameAsString();
+ try {
+ if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+ if (familyMap != null) {
+ familyMap.clear();
+ }
+ // Create a delete operation on the parent region
+ // Then write the new guide posts for individual regions
+ // TODO : Try making this atomic
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
+ long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
}
- } finally {
- if (heldLock) {
- lock.unlock();
+ collectStatsForSplitRegions(conf, l, parent, true, mutations, currentTime);
+ clear();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
}
+ collectStatsForSplitRegions(conf, r, parent, false, mutations, currentTime);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+ }
+ commitStats(mutations);
}
- }
- return internalScan;
- }
-
- @Override
- public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException {
- // Invoke collectStat here
- HRegion region = ctx.getEnvironment().getRegion();
- String tableName = region.getRegionInfo().getTableNameAsString();
- if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
- if (familyMap != null) {
- familyMap.clear();
- }
- // Create a delete operation on the parent region
- // Then write the new guide posts for individual regions
- // TODO : Try making this atomic
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
- long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
- Configuration conf = ctx.getEnvironment().getConfiguration();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
- }
- collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime);
- clear();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
- }
- collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
- }
- commitStats(mutations);
+ } catch (IOException e) {
+ LOG.error("Error while capturing stats after split of region "
+ + parent.getRegionInfo().getRegionNameAsString(), e);
}
}
@@ -315,7 +203,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
deleteStatsFromStatsTable(parent, mutations, currentTime);
}
- writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime);
+ writeStatsToStatsTable(daughter, false, mutations, currentTime);
} catch (IOException e) {
LOG.error(e);
throw e;
@@ -323,6 +211,33 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
}
}
+
+ public InternalScanner createCompactionScanner(HRegion region, Store store,
+ List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
+ throws IOException {
+ // See if this is for Major compaction
+ InternalScanner internalScan = s;
+ if (scanType.equals(ScanType.MAJOR_COMPACT)) {
+ // this is the first CP accessed, so we need to create a major
+ // compaction scanner ourselves, just like in the compactor
+ if (s == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+ internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+ smallestReadPoint, earliestPutTs);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compaction scanner created for stats");
+ }
+ InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
+ if (scanner != null) {
+ internalScan = scanner;
+ }
+ }
+ return internalScan;
+ }
private Scan createScan(Configuration conf) {
Scan scan = new Scan();
@@ -332,13 +247,12 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
return scan;
}
- protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ protected InternalScanner getInternalScanner(HRegion region, Store store,
InternalScanner internalScan, String family) {
- return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan,
+ return new StatisticsScanner(this, statsTable, region.getRegionInfo(), internalScan,
Bytes.toBytes(family));
}
- @Override
public void clear() {
this.maxMap.clear();
this.minMap.clear();
@@ -346,7 +260,6 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
this.familyMap.clear();
}
- @Override
public void updateStatistic(KeyValue kv) {
byte[] cf = kv.getFamily();
familyMap.put(new ImmutableBytesPtr(cf), true);
@@ -382,19 +295,16 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
}
- @Override
public byte[] getMaxKey(String fam) {
if (maxMap.get(fam) != null) { return maxMap.get(fam); }
return null;
}
- @Override
public byte[] getMinKey(String fam) {
if (minMap.get(fam) != null) { return minMap.get(fam); }
return null;
}
- @Override
public byte[] getGuidePosts(String fam) {
if (!guidePostsMap.isEmpty()) {
List<byte[]> guidePosts = guidePostsMap.get(fam);
@@ -411,15 +321,4 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
}
return null;
}
-
- @Override
- public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode)
- throws IOException {
- return new ProtocolSignature(BaseEndpointCoprocessor.VERSION, null);
- }
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
- return BaseEndpointCoprocessor.VERSION;
- }
}
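With the endpoint and observer plumbing stripped out, StatisticsCollector is now a plain helper that a coprocessor constructs and drives (the diffstat shows UngroupedAggregateRegionObserver picking up most of that work). A minimal wiring sketch, assuming a caller shaped like the old postSplit hook; the observer class name below is illustrative, not the literal code of this commit:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.phoenix.schema.stat.StatisticsCollector;
    import org.apache.phoenix.schema.stat.StatisticsTable;

    // Hypothetical caller -- sketches how the refactored collector is driven.
    public class ExampleStatsObserver extends BaseRegionObserver {
        @Override
        public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx,
                HRegion l, HRegion r) throws IOException {
            Configuration conf = ctx.getEnvironment().getConfiguration();
            HRegion parent = ctx.getEnvironment().getRegion();
            // One shared SYSTEM.STATS handle per user table (see StatisticsTable below).
            StatisticsTable statsTable = StatisticsTable.getStatisticsTableForCoprocessor(
                    conf, parent.getRegionInfo().getTableNameAsString());
            StatisticsCollector collector = new StatisticsCollector(statsTable, conf);
            // Replaces the parent region's guide posts with per-daughter stats.
            collector.collectStatsDuringSplit(conf, l, r, parent);
        }
    }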
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
index 34e52b8..44ad906 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
@@ -30,10 +30,10 @@ public class StatisticsScanner implements InternalScanner {
private InternalScanner delegate;
private StatisticsTable stats;
private HRegionInfo region;
- private StatisticsTracker tracker;
+ private StatisticsCollector tracker;
private byte[] family;
- public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region,
+ public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegionInfo region,
InternalScanner delegate, byte[] family) {
// should there be only one tracker?
this.tracker = tracker;
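The interface swap above works because StatisticsScanner only needs the tracker callbacks that are now public on StatisticsCollector. A stripped-down sketch of the delegate pattern it implements (field names mirror the diff; the class itself is illustrative, not the actual StatisticsScanner body):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.phoenix.schema.stat.StatisticsCollector;

    // Illustrative delegate: every batch the wrapped compaction scanner returns
    // is also fed to the collector, so stats refresh during major compaction.
    class DelegatingStatsScanner {
        private final InternalScanner delegate;
        private final StatisticsCollector tracker;

        DelegatingStatsScanner(InternalScanner delegate, StatisticsCollector tracker) {
            this.delegate = delegate;
            this.tracker = tracker;
        }

        boolean next(List<KeyValue> results) throws IOException {
            boolean hasMore = delegate.next(results);
            tracker.collectStatistics(results); // public entry point per this commit
            return hasMore;
        }
    }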
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index 8e30176..2ea8a13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -25,15 +25,11 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
@@ -54,17 +50,17 @@ public class StatisticsTable implements Closeable {
* @throws IOException
* if the table cannot be created due to an underlying HTable creation error
*/
- public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env,
- byte[] primaryTableName) throws IOException {
+ public synchronized static StatisticsTable getStatisticsTableForCoprocessor(Configuration conf,
+ String primaryTableName) throws IOException {
StatisticsTable table = tableMap.get(primaryTableName);
if (table == null) {
// Map the stats table and the table with which the statistics is
// associated. This is a workaround
- HTablePool pool = new HTablePool(env.getConfiguration(), 1);
+ HTablePool pool = new HTablePool(conf, 1);
try {
HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
- table = new StatisticsTable(hTable, primaryTableName);
- tableMap.put(Bytes.toString(primaryTableName), table);
+ table = new StatisticsTable(hTable);
+ tableMap.put(primaryTableName, table);
} finally {
pool.close();
}
@@ -73,15 +69,9 @@ public class StatisticsTable implements Closeable {
}
private final HTableInterface statisticsTable;
- private final byte[] sourceTableName;
- private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) {
+ private StatisticsTable(HTableInterface statsTable) {
this.statisticsTable = statsTable;
- this.sourceTableName = sourceTableName;
- }
-
- public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException {
- this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName());
}
/**
@@ -108,7 +98,7 @@ public class StatisticsTable implements Closeable {
* if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to
* update
*/
- public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+ public void addStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
List<Mutation> mutations, long currentTime) throws IOException {
if (tracker == null) { return; }
@@ -123,13 +113,15 @@ public class StatisticsTable implements Closeable {
public void commitStats(List<Mutation> mutations) throws IOException {
Object[] res = new Object[mutations.size()];
try {
- statisticsTable.batch(mutations, res);
+ if (mutations.size() > 0) {
+ statisticsTable.batch(mutations, res);
+ }
} catch (InterruptedException e) {
throw new IOException("Exception while adding deletes and puts");
}
}
- private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations,
+ private void formStatsUpdateMutation(StatisticsCollector tracker, String fam, List<Mutation> mutations,
long currentTime, byte[] prefix) {
Put put = new Put(prefix, currentTime);
if (tracker.getGuidePosts(fam) != null) {
@@ -151,22 +143,11 @@ public class StatisticsTable implements Closeable {
mutations.add(put);
}
- public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+ public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
List<Mutation> mutations, long currentTime)
throws IOException {
byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam),
PDataType.VARCHAR.toBytes(regionName));
mutations.add(new Delete(prefix, currentTime - 1));
}
-
- /**
- * @return the underlying {@link HTableInterface} to which this table is writing
- */
- HTableInterface getUnderlyingTable() {
- return statisticsTable;
- }
-
- byte[] getSourceTableName() {
- return this.sourceTableName;
- }
}
\ No newline at end of file
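Note a side effect of the new signature: the old overload looked up tableMap with a byte[] key but stored entries under the String form of the name, so (assuming tableMap is keyed by String, as the put suggests) the lookup could never hit and every call opened a fresh handle. Keying consistently by String fixes that. A hedged sketch of the expected behavior; the table name is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.schema.stat.StatisticsTable;

    public class StatsTableCacheSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = HBaseConfiguration.create();
            // Both calls should now resolve to the same cached SYSTEM.STATS handle.
            StatisticsTable a = StatisticsTable.getStatisticsTableForCoprocessor(conf, "MY_TABLE");
            StatisticsTable b = StatisticsTable.getStatisticsTableForCoprocessor(conf, "MY_TABLE");
            assert a == b; // assumed: tableMap caches one instance per table name
        }
    }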
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
deleted file mode 100644
index e1754f3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * Track a statistic for the column on a given region
- */
-public interface StatisticsTracker {
-
- /**
- * Reset the statistic after the completion of the compaction
- */
- public void clear();
-
- /**
- * Update the current statistics with the next {@link KeyValue} to be written
- *
- * @param kv
- * next {@link KeyValue} to be written.
- */
- public void updateStatistic(KeyValue kv);
-
- /**
- * Return the max key of the family
- * @param fam
- * @return
- */
- public byte[] getMaxKey(String fam);
-
- /**
- * Return the min key of the family
- *
- * @param fam
- * @return
- */
- public byte[] getMinKey(String fam);
-
- /**
- * Return the guide posts of the family
- *
- * @param fam
- * @return
- */
- public byte[] getGuidePosts(String fam);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index ab37f00..0d69c8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.query.KeyRange;
@@ -462,4 +463,8 @@ public class ScanUtil {
return offset + slotPosition;
}
+
+ public static boolean isAnalyzeTable(Scan scan) {
+ return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null;
+ }
}
\ No newline at end of file
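The new ScanUtil.isAnalyzeTable helper just checks for the presence of the ANALYZE_TABLE scan attribute, which presumably lets the client tag the scan backing an ANALYZE statement so the server-side observer can branch into stats collection. A sketch under that assumption; only the attribute key and the helper come from this commit, and the TRUE payload is a guess (any non-null value would satisfy the check):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    import org.apache.phoenix.util.ScanUtil;

    public class AnalyzeAttributeSketch {
        // Client side (assumed): mark the scan issued for ANALYZE.
        static Scan markAsAnalyze(Scan scan) {
            scan.setAttribute(BaseScannerRegionObserver.ANALYZE_TABLE, Bytes.toBytes(true));
            return scan;
        }

        // Server side: the coprocessor can now detect it cheaply.
        static boolean shouldCollectStats(Scan scan) {
            return ScanUtil.isAnalyzeTable(scan);
        }
    }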
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index a237e9e..e922609 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -52,6 +52,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*4L; // 4 Mb
public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*2L; // 2 Mb
public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 20;
+ public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 2000;
public QueryServicesTestImpl(ReadOnlyProps defaultProps) {
this(defaultProps, ReadOnlyProps.EMPTY_PROPS);
@@ -60,6 +61,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
private static QueryServicesOptions getDefaultServicesOptions() {
return withDefaults()
.setHistogramByteDepth(DEFAULT_HISTOGRAM_BYTE_DEPTH)
+ .setStatsUpdateFrequencyMs(DEFAULT_STATS_UPDATE_FREQ_MS)
.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE)
.setQueueSize(DEFAULT_QUEUE_SIZE)
.setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC)
[2/2] git commit: Phoenix-1264 Add StatisticsCollector to existing tables on first connection to cluster
Posted by ra...@apache.org.
Phoenix-1264 Add StatisticsCollector to existing tables on first connection to cluster
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1d22fdc9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1d22fdc9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1d22fdc9
Branch: refs/heads/3.0
Commit: 1d22fdc98c7aa92622a3fd1511f7cd5ee9fb864b
Parents: aaadb36
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Oct 1 09:20:23 2014 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Oct 1 09:20:23 2014 +0530
----------------------------------------------------------------------
.../end2end/BaseTenantSpecificTablesIT.java | 22 +-
...efaultParallelIteratorsRegionSplitterIT.java | 15 +
.../phoenix/end2end/GuidePostsLifeCycleIT.java | 22 +-
.../org/apache/phoenix/end2end/KeyOnlyIT.java | 15 +
.../phoenix/end2end/MultiCfQueryExecIT.java | 14 +
.../phoenix/end2end/StatsCollectorIT.java | 44 +--
.../end2end/TenantSpecificTablesDMLIT.java | 46 +--
.../coprocessor/BaseScannerRegionObserver.java | 11 +-
.../coprocessor/MetaDataEndpointImpl.java | 2 +
.../UngroupedAggregateRegionObserver.java | 83 +++++-
.../DefaultParallelIteratorRegionSplitter.java | 35 +--
.../phoenix/query/ConnectionQueryServices.java | 3 -
.../query/ConnectionQueryServicesImpl.java | 43 ---
.../query/ConnectionlessQueryServicesImpl.java | 6 -
.../query/DelegateConnectionQueryServices.java | 5 -
.../apache/phoenix/schema/MetaDataClient.java | 46 +--
.../schema/stat/StatisticsCollector.java | 277 ++++++-------------
.../phoenix/schema/stat/StatisticsScanner.java | 4 +-
.../phoenix/schema/stat/StatisticsTable.java | 43 +--
.../phoenix/schema/stat/StatisticsTracker.java | 62 -----
.../java/org/apache/phoenix/util/ScanUtil.java | 5 +
.../phoenix/query/QueryServicesTestImpl.java | 2 +
22 files changed, 366 insertions(+), 439 deletions(-)
----------------------------------------------------------------------
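Taken together, the integration-test churn below follows one pattern: ANALYZE now drives collection server-side, and a follow-up query (the added SELECT COUNT(*) calls) makes the client pick up the refreshed guide posts. A hedged JDBC sketch of that flow; the STABLE table name comes from the tests, and the URL is illustrative:

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class AnalyzeFlowSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative URL; the ITs use the mini-cluster test URL instead.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            // Runs StatisticsCollector over every region of STABLE.
            conn.createStatement().execute("ANALYZE STABLE");
            // A subsequent query reads the refreshed guide posts from SYSTEM.STATS.
            conn.createStatement().executeQuery("SELECT COUNT(*) FROM STABLE").close();
            conn.close();
        }
    }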
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
index 85d65e2..bcae7ed 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
@@ -20,10 +20,16 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import java.sql.SQLException;
+import java.util.Map;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
/**
* Describe your class here.
*
@@ -35,9 +41,9 @@ import org.junit.experimental.categories.Category;
public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT {
protected static final String TENANT_ID = "ZZTop";
protected static final String TENANT_TYPE_ID = "abc";
- protected static final String PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID;
+ protected static String PHOENIX_JDBC_TENANT_SPECIFIC_URL;
protected static final String TENANT_ID2 = "Styx";
- protected static final String PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2;
+ protected static String PHOENIX_JDBC_TENANT_SPECIFIC_URL2;
protected static final String PARENT_TABLE_NAME = "PARENT_TABLE";
protected static final String PARENT_TABLE_DDL = "CREATE TABLE " + PARENT_TABLE_NAME + " ( \n" +
@@ -64,6 +70,7 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT
" tenant_col VARCHAR) AS SELECT *\n" +
" FROM " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID;
+
@Before
public void createTables() throws SQLException {
createTestTable(getUrl(), PARENT_TABLE_DDL, null, nextTimestamp());
@@ -71,4 +78,15 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT
createTestTable(PHOENIX_JDBC_TENANT_SPECIFIC_URL, TENANT_TABLE_DDL, null, nextTimestamp());
createTestTable(PHOENIX_JDBC_TENANT_SPECIFIC_URL, TENANT_TABLE_DDL_NO_TENANT_TYPE_ID, null, nextTimestamp());
}
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID;
+ PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
index dd1dc8b..a6ec835 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HConstants;
@@ -40,13 +41,18 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
/**
* Tests for {@link DefaultParallelIteratorRegionSplitter}.
@@ -58,6 +64,14 @@ import org.junit.experimental.categories.Category;
@Category(ClientManagedTimeTest.class)
public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIteratorsRegionSplitterIT {
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan)
throws SQLException {
TableRef tableRef = getTableRef(conn, ts);
@@ -93,6 +107,7 @@ public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIterat
Scan scan = new Scan();
// number of regions > target query concurrency
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
scan.setStartRow(K1);
scan.setStopRow(K12);
List<KeyRange> keyRanges = getSplits(conn, ts, scan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
index 7645040..3cef492 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
@@ -28,6 +28,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -40,16 +41,32 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
@Category(HBaseManagedTimeTest.class)
public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
-
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+ props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
protected static final byte[] KMIN = new byte[] {'!'};
protected static final byte[] KMIN2 = new byte[] {'.'};
protected static final byte[] K1 = new byte[] {'a'};
@@ -106,16 +123,19 @@ public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
upsert(new byte[][] { KMIN, K4, K11 });
stmt = conn.prepareStatement("ANALYZE STABLE");
stmt.execute();
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
keyRanges = getSplits(conn, scan);
assertEquals(7, keyRanges.size());
upsert(new byte[][] { KMIN2, K5, K12 });
stmt = conn.prepareStatement("ANALYZE STABLE");
stmt.execute();
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
keyRanges = getSplits(conn, scan);
assertEquals(10, keyRanges.size());
upsert(new byte[][] { K1, K6, KP });
stmt = conn.prepareStatement("ANALYZE STABLE");
stmt.execute();
+ conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
keyRanges = getSplits(conn, scan);
assertEquals(13, keyRanges.size());
conn.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index 4b0d07f..f713fff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,15 +45,29 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
@Category(ClientManagedTimeTest.class)
public class KeyOnlyIT extends BaseClientManagedTimeIT {
+
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
@Test
public void testKeyOnly() throws Exception {
long ts = nextTimestamp();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
index ebf03d0..f01d985 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,17 +45,30 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Maps;
+
@Category(ClientManagedTimeTest.class)
public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
private static final String MULTI_CF = "MULTI_CF";
+ @BeforeClass
+ @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
protected static void initTableValues(long ts) throws Exception {
ensureTableCreated(getUrl(),MULTI_CF,null, ts-2);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index 3833e56..5fda67b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -1,10 +1,5 @@
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertTrue;
@@ -18,9 +13,6 @@ import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -32,27 +24,19 @@ import com.google.common.collect.Maps;
@Category(HBaseManagedTimeTest.class)
public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
- private static String url;
- private static HBaseTestingUtility util;
- private static int frequency = 4000;
-
+ //private static String url;
+ private static int frequency = 5000;
+
@BeforeClass
+ @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
public static void doSetup() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- setUpConfigForMiniCluster(conf);
- conf.setInt("hbase.client.retries.number", 2);
- conf.setInt("hbase.client.pause", 5000);
- conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
- util = new HBaseTestingUtility(conf);
- util.startMiniCluster();
- String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
- url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
- + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
- int histogramDepth = 60;
- Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
- props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Integer.toString(histogramDepth));
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(frequency));
- driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+ props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@Test
@@ -62,7 +46,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
ResultSet rs;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
// props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
- conn = DriverManager.getConnection(url, props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute(
"CREATE TABLE t ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
+ " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -98,7 +82,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
ResultSet rs;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
// props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
- conn = DriverManager.getConnection(url, props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute(
"CREATE TABLE x ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
+ " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -147,7 +131,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
Connection conn;
PreparedStatement stmt;
// props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30));
- conn = DriverManager.getConnection(url, props);
+ conn = DriverManager.getConnection(getUrl(), props);
stmt = upsertStmt(conn, tableName);
stmt.setString(1, "a");
String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" };
@@ -218,7 +202,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
}
private void flush(String tableName) throws IOException, InterruptedException {
- util.getHBaseAdmin().flush(tableName.toUpperCase());
+ //utility.getHBaseAdmin().flush(tableName.toUpperCase());
}
private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
index dba4264..593a0e4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
@@ -38,7 +38,7 @@ import org.junit.experimental.categories.Category;
@Category(ClientManagedTimeTest.class)
public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
-
+
@Test
public void testBasicUpsertSelect() throws Exception {
Connection conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -48,9 +48,8 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (2, 'Viva Las Vegas')");
conn.commit();
conn.close();
- analyzeTable(conn, TENANT_TABLE_NAME);
-
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ analyzeTable(conn, TENANT_TABLE_NAME);
ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " where id = 1");
assertTrue("Expected 1 row in result set", rs.next());
assertEquals("Cheap Sunglasses", rs.getString(1));
@@ -71,23 +70,24 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn1.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('me','" + TENANT_TYPE_ID + "',1,'Cheap Sunglasses')");
conn1.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('you','" + TENANT_TYPE_ID +"',2,'Viva Las Vegas')");
conn1.commit();
-
+ conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
analyzeTable(conn1, TENANT_TABLE_NAME);
conn2.setAutoCommit(true);
conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('them','" + TENANT_TYPE_ID + "',1,'Long Hair')");
conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('us','" + TENANT_TYPE_ID + "',2,'Black Hat')");
- analyzeTable(conn2, TENANT_TABLE_NAME);
- conn2.close();
+ conn2.close();
conn1.close();
-
conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
ResultSet rs = conn1.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME + " where id = 1");
assertTrue("Expected 1 row in result set", rs.next());
assertEquals(1, rs.getInt(3));
assertEquals("Cheap Sunglasses", rs.getString(4));
assertFalse("Expected 1 row in result set", rs.next());
-
+ conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ analyzeTable(conn1, TENANT_TABLE_NAME);
+ conn1.close();
conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
+ analyzeTable(conn2, TENANT_TABLE_NAME);
rs = conn2.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME + " where id = 2");
assertTrue("Expected 1 row in result set", rs.next());
assertEquals(2, rs.getInt(3));
@@ -98,7 +98,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " select * from " + TENANT_TABLE_NAME );
conn2.commit();
- analyzeTable(conn2, TENANT_TABLE_NAME);
conn2.close();
conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
@@ -115,7 +114,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
conn2.setAutoCommit(true);;
conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " select 'all', tenant_type_id, id, 'Big ' || tenant_col from " + TENANT_TABLE_NAME );
- analyzeTable(conn2, TENANT_TABLE_NAME);
conn2.close();
conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
@@ -132,7 +130,14 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
assertEquals("Big Black Hat", rs.getString(4));
assertFalse("Expected 2 rows total", rs.next());
conn2.close();
-
+ conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ rs = conn1.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME );
+ assertTrue("Expected row row in result set", rs.next());
+ assertEquals(1, rs.getInt(3));
+ assertEquals("Cheap Sunglasses", rs.getString(4));
+ assertTrue("Expected 1 row in result set", rs.next());
+ assertEquals(2, rs.getInt(3));
+ assertEquals("Viva Las Vegas", rs.getString(4));
}
finally {
conn1.close();
@@ -163,10 +168,10 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (1, 'Cheap Sunglasses')");
conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (2, 'Viva Las Vegas')");
conn.commit();
- analyzeTable(conn, TENANT_TABLE_NAME);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ analyzeTable(conn, TENANT_TABLE_NAME);
ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " join foo on k=id");
assertTrue("Expected 1 row in result set", rs.next());
assertEquals("Cheap Sunglasses", rs.getString(1));
@@ -190,7 +195,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -200,6 +204,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
assertFalse("Expected 1 row in result set", rs.next());
rs = conn.createStatement().executeQuery("select count(*) from " + TENANT_TABLE_NAME);
+ analyzeTable(conn, PARENT_TABLE_NAME);
assertTrue("Expected 1 row in result set", rs.next());
assertEquals(1, rs.getInt(1));
assertFalse("Expected 1 row in result set", rs.next());
@@ -222,7 +227,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -238,6 +242,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.close();
conn = nextConnection(getUrl());
+ analyzeTable(conn, PARENT_TABLE_NAME);
rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME);
rs.next();
assertEquals(2, rs.getInt(1));
@@ -260,7 +265,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('AC/DC', 1, 'Bon Scott')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 1, 'Billy Gibbons')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 2, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -268,7 +272,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
int count = conn.createStatement().executeUpdate("delete from " + TENANT_TABLE_NAME_NO_TENANT_TYPE_ID);
assertEquals("Expected 2 rows have been deleted", 2, count);
conn.close();
-
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
ResultSet rs = conn.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME_NO_TENANT_TYPE_ID);
assertFalse("Expected no rows in result set", rs.next());
@@ -297,10 +300,10 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ analyzeTable(conn, PARENT_TABLE_NAME);
conn.createStatement().execute("delete from " + TENANT_TABLE_NAME);
conn.commit();
conn.close();
@@ -328,7 +331,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('AC/DC', 1, 'Bon Scott')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 1, 'Billy Gibbons')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 2, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -336,6 +338,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.close();
conn = nextConnection(getUrl());
+ analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
rs.next();
assertEquals(3, rs.getInt(1));
@@ -358,11 +361,11 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'aaa', 1, 'Bon Scott')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 2, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
conn.setAutoCommit(true);
+ analyzeTable(conn, TENANT_TABLE_NAME);
int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from " + TENANT_TABLE_NAME);
assertEquals("Expected 1 row to have been inserted", 1, count);
conn.close();
@@ -393,11 +396,11 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'aaa', 1, 'Bon Scott')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 2, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME);
conn.close();
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
conn.setAutoCommit(true);
+ analyzeTable(conn, TENANT_TABLE_NAME);
int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from ANOTHER_TENANT_TABLE where id=2");
assertEquals("Expected 1 row to have been inserted", 1, count);
conn.close();
@@ -442,10 +445,9 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
conn.setAutoCommit(true);
conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_type_id, id, user) values ('" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
- analyzeTable(conn, PARENT_TABLE_NAME);
conn.close();
-
conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+ analyzeTable(conn, PARENT_TABLE_NAME);
rs = conn.createStatement().executeQuery("select user from " + PARENT_TABLE_NAME);
assertTrue(rs.next());
assertEquals(rs.getString(1),"Billy Gibbons");
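A note on the reordering in the test changes above: statistics are now gathered server-side by a scan tagged for analyze, so the tests call analyzeTable on a fresh connection once the upserted data is committed and visible. The analyzeTable helper itself is defined outside this diff; a minimal sketch of what it is assumed to do, given the UpdateStatisticsStatement support wired up in MetaDataClient below, is:

    import java.sql.Connection;
    import java.sql.SQLException;

    // Assumed shape of the test helper used above: issue the UPDATE STATISTICS
    // DDL so the tagged analyze scan runs against every region of the table.
    private static void analyzeTable(Connection conn, String tableName) throws SQLException {
        conn.createStatement().execute("UPDATE STATISTICS " + tableName);
    }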
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 2a23f0e..bb134f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -48,6 +48,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String EMPTY_CF = "_EmptyCF";
public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
public static final String GROUP_BY_LIMIT = "_GroupByLimit";
+ public static final String ANALYZE_TABLE = "_AnalyzeTable";
/**
* Used by logger to identify coprocessor
@@ -73,6 +74,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
abstract protected boolean isRegionObserverFor(Scan scan);
abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
+ @Override
+ public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Scan scan, final RegionScanner s) throws IOException {
+ if (isRegionObserverFor(scan)) {
+ throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());
+ }
+ return s;
+ }
/**
* Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown,
* to prevent the coprocessor from becoming blacklisted.
@@ -84,8 +93,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
if (!isRegionObserverFor(scan)) {
return s;
}
- HRegion region = c.getEnvironment().getRegion();
- throwIfScanOutOfRegion(scan, region);
return doPostScannerOpen(c, scan, s);
} catch (Throwable t) {
ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
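Note on the hunk above: the region-boundary check moves from the post-open wrapper into the new preScannerOpen hook, so it runs before any subclass (such as UngroupedAggregateRegionObserver below) rewrites the scan bounds for an analyze pass. The throwIfScanOutOfRegion helper is private to this class and its body is not part of this diff; an assumed sketch of such a check is:

    // Assumed sketch only: reject scans whose start row lies outside the
    // region, so clients holding stale region boundaries fail fast and retry.
    private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException {
        byte[] start = scan.getStartRow().length == 0 ? region.getStartKey() : scan.getStartRow();
        byte[] lower = region.getStartKey();
        byte[] upper = region.getEndKey();
        boolean outOfRegion = Bytes.compareTo(start, lower) < 0
                || (upper.length > 0 && Bytes.compareTo(start, upper) >= 0);
        if (outOfRegion) {
            throw new DoNotRetryIOException("Scan start row " + Bytes.toStringBinary(start)
                    + " is out of region " + region.getRegionNameAsString());
        }
    }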
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index c05ef30..a1d943c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -158,7 +158,9 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
private static final KeyValue VIEW_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+ private static final KeyValue EMPTY_KEYVALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
+ EMPTY_KEYVALUE_KV,
TABLE_TYPE_KV,
TABLE_SEQ_NUM_KV,
COLUMN_COUNT_KV,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 99af478..bb70932 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -34,7 +34,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
@@ -45,8 +48,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
@@ -59,6 +66,7 @@ import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.query.QueryConstants;
@@ -70,6 +78,8 @@ import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.stat.StatisticsCollector;
+import org.apache.phoenix.schema.stat.StatisticsTable;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.KeyValueUtil;
@@ -91,6 +101,8 @@ import com.google.common.collect.Sets;
public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
private KeyValueBuilder kvBuilder;
+ private static final Log LOG = LogFactory.getLog(UngroupedAggregateRegionObserver.class);
+ private StatisticsTable statsTable = null;
@Override
public void start(CoprocessorEnvironment e) throws IOException {
@@ -98,6 +110,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// Can't use ClientKeyValueBuilder on server-side because the memstore expects to
// be able to get a single backing buffer for a KeyValue.
this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
+ String name = ((RegionCoprocessorEnvironment)e).getRegion().getTableDesc().getNameAsString();
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(name);
+ if (!QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName)) {
+ this.statsTable = StatisticsTable.getStatisticsTableForCoprocessor(e.getConfiguration(), name);
+ }
}
private static void commitBatch(HRegion region, List<Pair<Mutation,Integer>> mutations, byte[] indexUUID) throws IOException {
@@ -115,11 +132,31 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public static void serializeIntoScan(Scan scan) {
scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
}
+
+ @Override
+ public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
+ throws IOException {
+ s = super.preScannerOpen(e, scan, s);
+ if (ScanUtil.isAnalyzeTable(scan)) {
+ scan.setStartRow(HConstants.EMPTY_START_ROW);
+ scan.setStopRow(HConstants.EMPTY_END_ROW);
+ }
+ return s;
+ }
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+ boolean isAnalyze = false;
+ HRegion region = c.getEnvironment().getRegion();
+ String table = region.getRegionInfo().getTableNameAsString();
+ StatisticsCollector stats = null;
+ if (ScanUtil.isAnalyzeTable(scan) && statsTable != null) {
+ stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+ isAnalyze = true;
+ }
+
RegionScanner theScanner = s;
if (p != null || j != null) {
theScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
@@ -155,7 +192,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
int batchSize = 0;
long ts = scan.getTimeRange().getMax();
- HRegion region = c.getEnvironment().getRegion();
List<Pair<Mutation,Integer>> mutations = Collections.emptyList();
if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null) {
// TODO: size better
@@ -169,7 +205,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
boolean hasAny = false;
MultiKeyValueTuple result = new MultiKeyValueTuple();
if (logger.isInfoEnabled()) {
- logger.info("Starting ungrouped coprocessor scan " + scan);
+ logger.info("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo());
}
long rowCount = 0;
MultiVersionConsistencyControl.setThreadReadPoint(innerScanner.getMvccReadPoint());
@@ -181,6 +217,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// since this is an indication of whether or not there are more values after the
// ones returned
hasMore = innerScanner.nextRaw(results, null);
+ if (isAnalyze && stats != null) {
+ stats.collectStatistics(results);
+ }
if (!results.isEmpty()) {
rowCount++;
result.setKeyValues(results);
@@ -275,6 +314,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
} while (hasMore);
} finally {
try {
+ if (isAnalyze && stats != null) {
+ stats.updateStatistic(region);
+ stats.clear();
+ }
innerScanner.close();
} finally {
region.closeRegionOperation();
@@ -384,6 +427,42 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
}
+
+ @Override
+ public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+ long earliestPutTs, InternalScanner s) throws IOException {
+ InternalScanner internalScan = s;
+ String table = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
+ if (statsTable != null && !table.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
+ && scanType.equals(ScanType.MAJOR_COMPACT)) {
+ StatisticsCollector stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+ internalScan =
+ stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
+ }
+ return internalScan;
+ }
+
+
+ @Override
+ public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
+ throws IOException {
+ HRegion region = e.getEnvironment().getRegion();
+ String table = region.getRegionInfo().getTableNameAsString();
+ if (!table.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME) && statsTable != null) {
+ try {
+ StatisticsCollector stats = new StatisticsCollector(statsTable, e.getEnvironment()
+ .getConfiguration());
+ stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
+ stats.clear();
+ } catch (IOException ioe) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error while collecting stats during split", ioe);
+ }
+ }
+ }
+
+ }
public static byte[] serialize(List<Expression> selectExpressions) {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
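Note on the observer above: the whole analyze path keys off the ANALYZE_TABLE scan attribute set by MetaDataClient (last hunk below). preScannerOpen widens the tagged scan to the full region, each row batch surfaced by nextRaw feeds collectStatistics, and the finally block writes out the gathered stats before the scanner closes. The ScanUtil.isAnalyzeTable helper is not part of this diff; it is assumed to be little more than:

    // Assumed sketch only: an analyze scan is any scan carrying the
    // ANALYZE_TABLE attribute, regardless of the attribute's value.
    public static boolean isAnalyzeTable(Scan scan) {
        return scan.getAttribute(BaseScannerRegionObserver.ANALYZE_TABLE) != null;
    }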
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
index a54b5b4..bff2a8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,9 +72,8 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
// Get the mapping between key range and the regions that contains them.
protected List<HRegionLocation> getAllRegions() throws SQLException {
Scan scan = context.getScan();
- PTable table = tableRef.getTable();
List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices()
- .getAllTableRegions(table.getPhysicalName().getBytes());
+ .getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
// If we're not salting, then we've already intersected the minMaxRange with the scan range
// so there's nothing to do here.
return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
@@ -107,27 +107,28 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
if (regions.isEmpty()) { return Collections.emptyList(); }
+ PTable table = tableRef.getTable();
Scan scan = context.getScan();
- PTable table = this.tableRef.getTable();
byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
List<byte[]> gps = Lists.newArrayList();
-
- if (table.getColumnFamilies().isEmpty()) {
- // For sure we can get the defaultCF from the table
- gps = table.getGuidePosts();
- } else {
- try {
- if (scan.getFamilyMap().size() > 0) {
- if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+ if (!ScanUtil.isAnalyzeTable(scan)) {
+ if (table.getColumnFamilies().isEmpty()) {
+ // For sure we can get the defaultCF from the table
+ gps = table.getGuidePosts();
+ } else {
+ try {
+ if (scan.getFamilyMap().size() > 0) {
+ if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+ gps = table.getColumnFamily(defaultCF).getGuidePosts();
+ } else { // Otherwise, just use first CF in use by scan
+ gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+ }
+ } else {
gps = table.getColumnFamily(defaultCF).getGuidePosts();
- } else { // Otherwise, just use first CF in use by scan
- gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
}
- } else {
- gps = table.getColumnFamily(defaultCF).getGuidePosts();
+ } catch (ColumnFamilyNotFoundException cfne) {
+ // Alter table does this
}
- } catch (ColumnFamilyNotFoundException cfne) {
- // Alter table does this
}
}
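Note on the hunk above: guide posts split a single region into several key ranges so queries can be parallelized within the region, which is exactly what an analyze scan must avoid; each region should be scanned once, whole, to produce one consistent set of stats. As an illustration (simplified, not taken from this diff), guide posts falling inside a region [regionStart, regionEnd) would normally be expanded along these lines:

    // Illustrative only: subdivide one region's range at its guide posts.
    List<KeyRange> ranges = Lists.newArrayList();
    byte[] lower = regionStart;
    for (byte[] gp : guidePosts) {
        if (Bytes.compareTo(lower, gp) < 0) {
            ranges.add(KeyRange.getKeyRange(lower, gp)); // [lower, gp)
            lower = gp;
        }
    }
    ranges.add(KeyRange.getKeyRange(lower, regionEnd));  // tail of the region

With ScanUtil.isAnalyzeTable true, gps stays empty and the region boundaries alone define the ranges.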
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index a05acde..26e4809 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -94,9 +94,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
void addConnection(PhoenixConnection connection) throws SQLException;
void removeConnection(PhoenixConnection connection) throws SQLException;
- long updateStatistics(KeyRange keyRange, byte[] tableName)
- throws SQLException;
-
/**
* @return the {@link KeyValueBuilder} that is valid for the locally installed version of HBase.
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 926407a..3a59d6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -102,9 +102,6 @@ import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.TableAlreadyExistsException;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.stat.StatisticsCollector;
-import org.apache.phoenix.schema.stat.StatisticsCollectorProtocol;
-import org.apache.phoenix.schema.stat.StatisticsCollectorResponse;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.MetaDataUtil;
@@ -121,7 +118,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.protobuf.ServiceException;
public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
@@ -522,9 +518,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
}
- if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) {
- descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null);
- }
// TODO: better encapsulation for this
// Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
@@ -1805,42 +1798,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
@Override
- public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException {
- HTableInterface ht = null;
- try {
- ht = this.getTable(tableName);
- long noOfRowsScanned = 0;
- Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse> callable =
- new Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse>() {
- @Override
- public StatisticsCollectorResponse call(StatisticsCollectorProtocol instance) throws IOException {
- return instance.collectStat(keyRange);
- }
- };
- Map<byte[], StatisticsCollectorResponse> result = ht.coprocessorExec(StatisticsCollectorProtocol.class,
- keyRange.getLowerRange(), keyRange.getUpperRange(), callable);
- for (StatisticsCollectorResponse response : result.values()) {
- noOfRowsScanned += response.getRowsScanned();
- }
- return noOfRowsScanned;
- } catch (ServiceException e) {
- throw new SQLException("Unable to update the statistics for the table " + tableName, e);
- } catch (TableNotFoundException e) {
- throw new SQLException("Unable to update the statistics for the table " + tableName, e);
- } catch (Throwable e) {
- throw new SQLException("Unable to update the statistics for the table " + tableName, e);
- } finally {
- if (ht != null) {
- try {
- ht.close();
- } catch (IOException e) {
- throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e);
- }
- }
- }
- }
-
- @Override
public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
final long clientTS) throws SQLException {
// clear the meta data cache for the table here
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 4e155e3..70e656a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -181,12 +181,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException {
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
}
-
- @Override
- public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
- // Noop
- return 0;
- }
@Override
public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index fa01f09..8bd2c61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -226,11 +226,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
public String getUserName() {
return getDelegate().getUserName();
}
-
- @Override
- public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
- return getDelegate().updateStatistics(keyRange, tableName);
- }
@Override
public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 390e621..ccfbdd9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -85,9 +85,12 @@ import java.util.Set;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -96,6 +99,8 @@ import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.PostDDLCompiler;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -104,6 +109,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.ColumnDef;
@@ -120,7 +126,6 @@ import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.PrimaryKeyConstraint;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.parse.UpdateStatisticsStatement;
-import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -130,7 +135,6 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -281,7 +285,7 @@ public class MetaDataClient {
return updateCache(schemaName, tableName, false);
}
- private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] here
+ private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
Long scn = connection.getSCN();
boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName);
// System tables must always have a null tenantId
@@ -467,27 +471,11 @@ public class MetaDataClient {
PTable table = resolver.getTables().get(0).getTable();
PName physicalName = table.getPhysicalName();
byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
- KeyRange analyzeRange = KeyRange.EVERYTHING_RANGE;
- if (connection.getTenantId() != null && table.isMultiTenant()) {
- tenantIdBytes = connection.getTenantId().getBytes();
- // TODO remove this inner if once PHOENIX-1259 is fixed.
- if (table.getBucketNum() == null) {
- List<List<KeyRange>> tenantIdKeyRanges = Collections.singletonList(Collections.singletonList(KeyRange
- .getKeyRange(tenantIdBytes)));
- byte[] lowerRange = ScanUtil.getMinKey(table.getRowKeySchema(), tenantIdKeyRanges,
- ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
- byte[] upperRange = ScanUtil.getMaxKey(table.getRowKeySchema(), tenantIdKeyRanges,
- ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
- analyzeRange = KeyRange.getKeyRange(lowerRange, upperRange);
- }
- }
Long scn = connection.getSCN();
// Always invalidate the cache
long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
table.getTableName().getBytes(), clientTS);
- // Clear the cache also. So that for cases like major compaction also we would be able to use the stats
- updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
String query = "SELECT CURRENT_DATE(),"+ LAST_STATS_UPDATE_TIME + " FROM " + SYSTEM_CATALOG_SCHEMA
+ "." + SYSTEM_STATS_TABLE + " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
+ " IS NULL AND " + REGION_NAME + " IS NULL";
@@ -497,12 +485,26 @@ public class MetaDataClient {
lastUpdatedTime = rs.getDate(1).getTime() - rs.getDate(2).getTime();
}
if (minTimeForStatsUpdate > lastUpdatedTime) {
+ // Create the select query that drives the stats collection.
+ String countQuery = "SELECT /*+ NO_CACHE */ count(*) FROM " + table.getName().getString();
+ PhoenixStatement statement = (PhoenixStatement) connection.createStatement();
+ QueryPlan plan = statement.compileQuery(countQuery);
+ Scan scan = plan.getContext().getScan();
+ // Add all column families in the table to the scan
+ scan.getFamilyMap().clear();
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+ scan.setAttribute(BaseScannerRegionObserver.ANALYZE_TABLE, PDataType.TRUE_BYTES);
+ KeyValue kv = plan.iterator().next().getValue(0);
+ ImmutableBytesWritable tempPtr = plan.getContext().getTempPtr();
+ tempPtr.set(kv.getValue());
+ // A single Cell will be returned with the count(*) - we decode that here
+ long rowCount = PDataType.LONG.getCodec().decodeLong(tempPtr, SortOrder.getDefault());
// We need to update the stats table
- connection.getQueryServices().updateStatistics(analyzeRange, physicalName.getBytes());
connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
table.getTableName().getBytes(), clientTS);
- updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
- return new MutationState(1, connection);
+ return new MutationState(0, connection, rowCount);
} else {
return new MutationState(0, connection);
}
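End-to-end, the new client flow above replaces the dedicated endpoint RPC removed from ConnectionQueryServicesImpl: compile a count(*) over the table, add every column family to the scan, tag it with ANALYZE_TABLE, run it, and decode the returned count as the number of rows scanned. A hedged usage sketch from plain JDBC (the table name and URL are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class AnalyzeExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            // Drives the MetaDataClient path above; the update count is
            // expected to reflect the rowCount decoded from the tagged scan.
            int rowsScanned = conn.createStatement()
                    .executeUpdate("UPDATE STATISTICS MY_TABLE");
            System.out.println("Rows scanned for stats: " + rowsScanned);
            conn.close();
        }
    }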