You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/10/01 05:51:51 UTC

[1/2] Phoenix-1264 Add StatisticsCollector to existing tables on first connection to cluster

Repository: phoenix
Updated Branches:
  refs/heads/3.0 aaadb360e -> 1d22fdc98


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index b63730c..f9347a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -12,24 +12,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -40,7 +30,6 @@ import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PDataType;
@@ -48,6 +37,7 @@ import org.apache.phoenix.schema.PhoenixArray;
 import org.apache.phoenix.util.TimeKeeper;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * An endpoint implementation that allows to collect the stats for a given region and groups the stat per family. This is also an
@@ -55,74 +45,46 @@ import com.google.common.collect.Lists;
  * Phoenix stats table with the max key, min key and guide posts for the given region. The stats can be consumed by the stats associated
  * with every PTable and the same can be used to parallelize the queries
  */
-public class StatisticsCollector extends BaseRegionObserver implements Coprocessor, StatisticsTracker,
-        StatisticsCollectorProtocol {
+public class StatisticsCollector {
 
     public static void addToTable(HTableDescriptor desc) throws IOException {
         desc.addCoprocessor(StatisticsCollector.class.getName());
     }
 
-    private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>();
-    private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>();
+    private Map<String, byte[]> minMap = Maps.newHashMap();
+    private Map<String, byte[]> maxMap = Maps.newHashMap();
     private long guidepostDepth;
     private long byteCount = 0;
-    private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>();
-    private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>();
-    private RegionCoprocessorEnvironment env;
-    protected StatisticsTable stats;
-    // Ensures that either analyze or compaction happens at any point of time.
-    private ReentrantLock lock = new ReentrantLock();
+    private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap();
+    private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
+    protected StatisticsTable statsTable;
     private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
 
-    @Override
-    public StatisticsCollectorResponse collectStat(KeyRange keyRange) throws IOException {
-        HRegion region = env.getRegion();
-        boolean heldLock = false;
-        int count = 0;
+    public StatisticsCollector(StatisticsTable statsTable, Configuration conf) throws IOException {
+        // Get the stats table associated with the current table on which the CP is
+        // triggered
+        this.statsTable = statsTable;
+        guidepostDepth = conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
+                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
+    }
+
+    public void updateStatistic(HRegion region) {
         try {
-            if (lock.tryLock()) {
-                heldLock = true;
-                // Clear all old stats
-                clear();
-                Scan scan = createScan(env.getConfiguration());
-                scan.setStartRow(keyRange.getLowerRange());
-                scan.setStopRow(keyRange.getUpperRange());
-                RegionScanner scanner = null;
-                try {
-                    scanner = region.getScanner(scan);
-                    count = scanRegion(scanner, count);
-                } catch (IOException e) {
-                    LOG.error(e);
-                    throw e;
-                } finally {
-                    if (scanner != null) {
-                        try {
-                            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
-                            writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Committing new stats for the region " + region.getRegionInfo());
-                            }
-                            commitStats(mutations);
-                        } catch (IOException e) {
-                            LOG.error(e);
-                            throw e;
-                        }
-                    }
-                }
+            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+            writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Committing new stats for the region " + region.getRegionInfo());
             }
+            commitStats(mutations);
+        } catch (IOException e) {
+            LOG.error(e);
         } finally {
-            if (heldLock) {
-                lock.unlock();
-            }
+            clear();
         }
-        StatisticsCollectorResponse response = new StatisticsCollectorResponse();
-        response.setRowsScanned(count);
-        return response;
     }
 
-    private void writeStatsToStatsTable(final HRegion region, final RegionScanner scanner, boolean delete,
+    private void writeStatsToStatsTable(final HRegion region, boolean delete,
             List<Mutation> mutations, long currentTime) throws IOException {
-        scanner.close();
         try {
             // update the statistics table
             for (ImmutableBytesPtr fam : familyMap.keySet()) {
@@ -131,13 +93,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Deleting the stats for the region " + region.getRegionInfo());
                     }
-                    stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
+                    statsTable.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
                             Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
                 }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Adding new stats for the region " + region.getRegionInfo());
                 }
-                stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+                statsTable.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
                         Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
             }
         } catch (IOException e) {
@@ -147,7 +109,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
     }
 
     private void commitStats(List<Mutation> mutations) throws IOException {
-        stats.commitStats(mutations);
+        statsTable.commitStats(mutations);
     }
 
     private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime)
@@ -156,7 +118,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
             // update the statistics table
             for (ImmutableBytesPtr fam : familyMap.keySet()) {
                 String tableName = region.getRegionInfo().getTableNameAsString();
-                stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+                statsTable.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
                         Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
             }
         } catch (IOException e) {
@@ -171,7 +133,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         while (hasMore) {
             // Am getting duplicates here. Need to avoid that
             hasMore = scanner.next(results);
-            updateStat(results);
+            collectStatistics(results);
             count += results.size();
             results.clear();
             while (!hasMore) {
@@ -181,117 +143,43 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         return count;
     }
 
-    /**
-     * Update the current statistics based on the lastest batch of key-values from the underlying scanner
-     * 
-     * @param results
-     *            next batch of {@link KeyValue}s
-     */
-    protected void updateStat(final List<KeyValue> results) {
+    public void collectStatistics(final List<KeyValue> results) {
         for (KeyValue kv : results) {
             updateStatistic(kv);
         }
     }
 
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-        if (env instanceof RegionCoprocessorEnvironment) {
-            this.env = (RegionCoprocessorEnvironment)env;
-        } else {
-            throw new CoprocessorException("Must be loaded on a table region!");
-        }
-        String tableName = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTableNameAsString();
-        if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
-            HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc();
-            // Get the stats table associated with the current table on which the CP is
-            // triggered
-            stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName());
-            guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
-                    QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
-        }
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-        if (env instanceof RegionCoprocessorEnvironment) {
-            String tableName = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTableNameAsString();
-            // Close only if the table is system table
-            if (tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
-                stats.close();
-            }
-        }
-    }
-
-    @Override
-    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-            List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
+    public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r, HRegion parent)
             throws IOException {
-        InternalScanner internalScan = s;
-        String tableNameAsString = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
-        if (!tableNameAsString.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
-            boolean heldLock = false;
-            try {
-                if (lock.tryLock()) {
-                    heldLock = true;
-                    // See if this is for Major compaction
-                    if (scanType.equals(ScanType.MAJOR_COMPACT)) {
-                        // this is the first CP accessed, so we need to just create a major
-                        // compaction scanner, just
-                        // like in the compactor
-                        if (s == null) {
-                            Scan scan = new Scan();
-                            scan.setMaxVersions(store.getFamily().getMaxVersions());
-                            long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
-                            internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
-                                    smallestReadPoint, earliestPutTs);
-                        }
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Compaction scanner created for stats");
-                        }
-                        InternalScanner scanner = getInternalScanner(c, store, internalScan,
-                                store.getColumnFamilyName());
-                        if (scanner != null) {
-                            internalScan = scanner;
-                        }
-                    }
+        // Invoke collectStat here
+        String tableName = parent.getRegionInfo().getTableNameAsString();
+        try {
+            if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+                if (familyMap != null) {
+                    familyMap.clear();
+                }
+                // Create a delete operation on the parent region
+                // Then write the new guide posts for individual regions
+                // TODO : Try making this atomic
+                List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
+                long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
                 }
-            } finally {
-                if (heldLock) {
-                    lock.unlock();
+                collectStatsForSplitRegions(conf, l, parent, true, mutations, currentTime);
+                clear();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
                 }
+                collectStatsForSplitRegions(conf, r, parent, false, mutations, currentTime);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+                }
+                commitStats(mutations);
             }
-        }
-        return internalScan;
-    }
-
-    @Override
-    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException {
-        // Invoke collectStat here
-        HRegion region = ctx.getEnvironment().getRegion();
-        String tableName = region.getRegionInfo().getTableNameAsString();
-        if (!tableName.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
-            if (familyMap != null) {
-                familyMap.clear();
-            }
-            // Create a delete operation on the parent region
-            // Then write the new guide posts for individual regions
-            // TODO : Try making this atomic
-            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
-            long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
-            Configuration conf = ctx.getEnvironment().getConfiguration();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
-            }
-            collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime);
-            clear();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
-            }
-            collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
-            }
-            commitStats(mutations);
+        } catch (IOException e) {
+            LOG.error("Error while capturing stats after split of region "
+                    + parent.getRegionInfo().getRegionNameAsString(), e);
         }
     }
 
@@ -315,7 +203,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
                         }
                         deleteStatsFromStatsTable(parent, mutations, currentTime);
                     }
-                    writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime);
+                    writeStatsToStatsTable(daughter, false, mutations, currentTime);
                 } catch (IOException e) {
                     LOG.error(e);
                     throw e;
@@ -323,6 +211,33 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
             }
         }
     }
+    
+    public InternalScanner createCompactionScanner(HRegion region, Store store,
+            List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
+            throws IOException {
+        // See if this is for Major compaction
+        InternalScanner internalScan = s;
+        if (scanType.equals(ScanType.MAJOR_COMPACT)) {
+            // this is the first CP accessed, so we need to just create a major
+            // compaction scanner, just
+            // like in the compactor
+            if (s == null) {
+                Scan scan = new Scan();
+                scan.setMaxVersions(store.getFamily().getMaxVersions());
+                long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+                internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+                        smallestReadPoint, earliestPutTs);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Compaction scanner created for stats");
+            }
+            InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
+            if (scanner != null) {
+                internalScan = scanner;
+            }
+        }
+        return internalScan;
+    }
 
     private Scan createScan(Configuration conf) {
         Scan scan = new Scan();
@@ -332,13 +247,12 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         return scan;
     }
 
-    protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+    protected InternalScanner getInternalScanner(HRegion region, Store store,
             InternalScanner internalScan, String family) {
-        return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan,
+        return new StatisticsScanner(this, statsTable, region.getRegionInfo(), internalScan,
                 Bytes.toBytes(family));
     }
 
-    @Override
     public void clear() {
         this.maxMap.clear();
         this.minMap.clear();
@@ -346,7 +260,6 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         this.familyMap.clear();
     }
 
-    @Override
     public void updateStatistic(KeyValue kv) {
         byte[] cf = kv.getFamily();
         familyMap.put(new ImmutableBytesPtr(cf), true);
@@ -382,19 +295,16 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         }
     }
 
-    @Override
     public byte[] getMaxKey(String fam) {
         if (maxMap.get(fam) != null) { return maxMap.get(fam); }
         return null;
     }
 
-    @Override
     public byte[] getMinKey(String fam) {
         if (minMap.get(fam) != null) { return minMap.get(fam); }
         return null;
     }
 
-    @Override
     public byte[] getGuidePosts(String fam) {
         if (!guidePostsMap.isEmpty()) {
             List<byte[]> guidePosts = guidePostsMap.get(fam);
@@ -411,15 +321,4 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         }
         return null;
     }
-
-    @Override
-    public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode)
-            throws IOException {
-        return new ProtocolSignature(BaseEndpointCoprocessor.VERSION, null);
-    }
-
-    @Override
-    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-        return BaseEndpointCoprocessor.VERSION;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
index 34e52b8..44ad906 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
@@ -30,10 +30,10 @@ public class StatisticsScanner implements InternalScanner {
     private InternalScanner delegate;
     private StatisticsTable stats;
     private HRegionInfo region;
-    private StatisticsTracker tracker;
+    private StatisticsCollector tracker;
     private byte[] family;
 
-    public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region,
+    public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegionInfo region,
             InternalScanner delegate, byte[] family) {
         // should there be only one tracker?
         this.tracker = tracker;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index 8e30176..2ea8a13 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -25,15 +25,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
@@ -54,17 +50,17 @@ public class StatisticsTable implements Closeable {
      * @throws IOException
      *             if the table cannot be created due to an underlying HTable creation error
      */
-    public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env,
-            byte[] primaryTableName) throws IOException {
+    public synchronized static StatisticsTable getStatisticsTableForCoprocessor(Configuration conf,
+            String primaryTableName) throws IOException {
         StatisticsTable table = tableMap.get(primaryTableName);
         if (table == null) {
             // Map the statics table and the table with which the statistics is
             // associated. This is a workaround
-            HTablePool pool = new HTablePool(env.getConfiguration(), 1);
+            HTablePool pool = new HTablePool(conf, 1);
             try {
                 HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
-                table = new StatisticsTable(hTable, primaryTableName);
-                tableMap.put(Bytes.toString(primaryTableName), table);
+                table = new StatisticsTable(hTable);
+                tableMap.put(primaryTableName, table);
             } finally {
                 pool.close();
             }
@@ -73,15 +69,9 @@ public class StatisticsTable implements Closeable {
     }
 
     private final HTableInterface statisticsTable;
-    private final byte[] sourceTableName;
 
-    private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) {
+    private StatisticsTable(HTableInterface statsTable) {
         this.statisticsTable = statsTable;
-        this.sourceTableName = sourceTableName;
-    }
-
-    public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException {
-        this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName());
     }
 
     /**
@@ -108,7 +98,7 @@ public class StatisticsTable implements Closeable {
      *             if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to
      *             update
      */
-    public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+    public void addStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
             List<Mutation> mutations, long currentTime) throws IOException {
         if (tracker == null) { return; }
 
@@ -123,13 +113,15 @@ public class StatisticsTable implements Closeable {
     public void commitStats(List<Mutation> mutations) throws IOException {
         Object[] res = new Object[mutations.size()];
         try {
-            statisticsTable.batch(mutations, res);
+            if (mutations.size() > 0) {
+                statisticsTable.batch(mutations, res);
+            }
         } catch (InterruptedException e) {
             throw new IOException("Exception while adding deletes and puts");
         }
     }
 
-    private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations,
+    private void formStatsUpdateMutation(StatisticsCollector tracker, String fam, List<Mutation> mutations,
             long currentTime, byte[] prefix) {
         Put put = new Put(prefix, currentTime);
         if (tracker.getGuidePosts(fam) != null) {
@@ -151,22 +143,11 @@ public class StatisticsTable implements Closeable {
         mutations.add(put);
     }
     
-    public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+    public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
             List<Mutation> mutations, long currentTime)
             throws IOException {
         byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam),
                 PDataType.VARCHAR.toBytes(regionName));
         mutations.add(new Delete(prefix, currentTime - 1));
     }
-
-    /**
-     * @return the underlying {@link HTableInterface} to which this table is writing
-     */
-    HTableInterface getUnderlyingTable() {
-        return statisticsTable;
-    }
-
-    byte[] getSourceTableName() {
-        return this.sourceTableName;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
deleted file mode 100644
index e1754f3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * Track a statistic for the column on a given region
- */
-public interface StatisticsTracker {
-
-    /**
-     * Reset the statistic after the completion of the compaction
-     */
-    public void clear();
-
-    /**
-     * Update the current statistics with the next {@link KeyValue} to be written
-     * 
-     * @param kv
-     *            next {@link KeyValue} to be written.
-     */
-    public void updateStatistic(KeyValue kv);
-
-    /**
-     * Return the max key of the family
-     * @param fam
-     * @return
-     */
-    public byte[] getMaxKey(String fam);
-
-    /**
-     * Return the min key of the family
-     * 
-     * @param fam
-     * @return
-     */
-    public byte[] getMinKey(String fam);
-
-    /**
-     * Return the guide posts of the family
-     * 
-     * @param fam
-     * @return
-     */
-    public byte[] getGuidePosts(String fam);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index ab37f00..0d69c8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
@@ -462,4 +463,8 @@ public class ScanUtil {
 
         return offset + slotPosition;
     }
+
+    public static boolean isAnalyzeTable(Scan scan) {
+        return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index a237e9e..e922609 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -52,6 +52,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
     public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE =  1024L*1024L*4L; // 4 Mb
     public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE =  1024L*1024L*2L; // 2 Mb
     public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 20;
+    public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 2000;
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps) {
         this(defaultProps, ReadOnlyProps.EMPTY_PROPS);
@@ -60,6 +61,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
     private static QueryServicesOptions getDefaultServicesOptions() {
     	return withDefaults()
     	        .setHistogramByteDepth(DEFAULT_HISTOGRAM_BYTE_DEPTH)
+    	        .setStatsUpdateFrequencyMs(DEFAULT_STATS_UPDATE_FREQ_MS)
                 .setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE)
                 .setQueueSize(DEFAULT_QUEUE_SIZE)
                 .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC)


[2/2] git commit: Phoenix-1264 Add StatisticsCollector to existing tables on first connection to cluster

Posted by ra...@apache.org.
Phoenix-1264 Add StatisticsCollector to existing tables on first
connection to cluster


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1d22fdc9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1d22fdc9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1d22fdc9

Branch: refs/heads/3.0
Commit: 1d22fdc98c7aa92622a3fd1511f7cd5ee9fb864b
Parents: aaadb36
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Oct 1 09:20:23 2014 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Oct 1 09:20:23 2014 +0530

----------------------------------------------------------------------
 .../end2end/BaseTenantSpecificTablesIT.java     |  22 +-
 ...efaultParallelIteratorsRegionSplitterIT.java |  15 +
 .../phoenix/end2end/GuidePostsLifeCycleIT.java  |  22 +-
 .../org/apache/phoenix/end2end/KeyOnlyIT.java   |  15 +
 .../phoenix/end2end/MultiCfQueryExecIT.java     |  14 +
 .../phoenix/end2end/StatsCollectorIT.java       |  44 +--
 .../end2end/TenantSpecificTablesDMLIT.java      |  46 +--
 .../coprocessor/BaseScannerRegionObserver.java  |  11 +-
 .../coprocessor/MetaDataEndpointImpl.java       |   2 +
 .../UngroupedAggregateRegionObserver.java       |  83 +++++-
 .../DefaultParallelIteratorRegionSplitter.java  |  35 +--
 .../phoenix/query/ConnectionQueryServices.java  |   3 -
 .../query/ConnectionQueryServicesImpl.java      |  43 ---
 .../query/ConnectionlessQueryServicesImpl.java  |   6 -
 .../query/DelegateConnectionQueryServices.java  |   5 -
 .../apache/phoenix/schema/MetaDataClient.java   |  46 +--
 .../schema/stat/StatisticsCollector.java        | 277 ++++++-------------
 .../phoenix/schema/stat/StatisticsScanner.java  |   4 +-
 .../phoenix/schema/stat/StatisticsTable.java    |  43 +--
 .../phoenix/schema/stat/StatisticsTracker.java  |  62 -----
 .../java/org/apache/phoenix/util/ScanUtil.java  |   5 +
 .../phoenix/query/QueryServicesTestImpl.java    |   2 +
 22 files changed, 366 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
index 85d65e2..bcae7ed 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java
@@ -20,10 +20,16 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 
 import java.sql.SQLException;
+import java.util.Map;
 
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 /**
  * Describe your class here.
  *
@@ -35,9 +41,9 @@ import org.junit.experimental.categories.Category;
 public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT {
     protected static final String TENANT_ID = "ZZTop";
     protected static final String TENANT_TYPE_ID = "abc";
-    protected static final String PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID;
+    protected static String PHOENIX_JDBC_TENANT_SPECIFIC_URL;
     protected static final String TENANT_ID2 = "Styx";
-    protected static final String PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2;
+    protected static String PHOENIX_JDBC_TENANT_SPECIFIC_URL2;
     
     protected static final String PARENT_TABLE_NAME = "PARENT_TABLE";
     protected static final String PARENT_TABLE_DDL = "CREATE TABLE " + PARENT_TABLE_NAME + " ( \n" + 
@@ -64,6 +70,7 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT
             "                tenant_col VARCHAR) AS SELECT *\n" + 
             "                FROM " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID;
     
+    
     @Before
     public void createTables() throws SQLException {
         createTestTable(getUrl(), PARENT_TABLE_DDL, null, nextTimestamp());
@@ -71,4 +78,15 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT
         createTestTable(PHOENIX_JDBC_TENANT_SPECIFIC_URL, TENANT_TABLE_DDL, null, nextTimestamp());
         createTestTable(PHOENIX_JDBC_TENANT_SPECIFIC_URL, TENANT_TABLE_DDL_NO_TENANT_TYPE_ID, null, nextTimestamp());
     }
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID;
+        PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
index dd1dc8b..a6ec835 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -40,13 +41,18 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 
 /**
  * Tests for {@link DefaultParallelIteratorRegionSplitter}.
@@ -58,6 +64,14 @@ import org.junit.experimental.categories.Category;
 @Category(ClientManagedTimeTest.class)
 public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIteratorsRegionSplitterIT {
     
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
     private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan)
             throws SQLException {
         TableRef tableRef = getTableRef(conn, ts);
@@ -93,6 +107,7 @@ public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIterat
         Scan scan = new Scan();
         
         // number of regions > target query concurrency
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
         scan.setStartRow(K1);
         scan.setStopRow(K12);
         List<KeyRange> keyRanges = getSplits(conn, ts, scan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
index 7645040..3cef492 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
@@ -28,6 +28,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -40,16 +41,32 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 @Category(HBaseManagedTimeTest.class)
 public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
-
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
     protected static final byte[] KMIN  = new byte[] {'!'};
     protected static final byte[] KMIN2  = new byte[] {'.'};
     protected static final byte[] K1  = new byte[] {'a'};
@@ -106,16 +123,19 @@ public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
         upsert(new byte[][] { KMIN, K4, K11 });
         stmt = conn.prepareStatement("ANALYZE STABLE");
         stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); 
         keyRanges = getSplits(conn, scan);
         assertEquals(7, keyRanges.size());
         upsert(new byte[][] { KMIN2, K5, K12 });
         stmt = conn.prepareStatement("ANALYZE STABLE");
         stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
         keyRanges = getSplits(conn, scan);
         assertEquals(10, keyRanges.size());
         upsert(new byte[][] { K1, K6, KP });
         stmt = conn.prepareStatement("ANALYZE STABLE");
         stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
         keyRanges = getSplits(conn, scan);
         assertEquals(13, keyRanges.size());
         conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index 4b0d07f..f713fff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,15 +45,29 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 @Category(ClientManagedTimeTest.class)
 public class KeyOnlyIT extends BaseClientManagedTimeIT {
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
     @Test
     public void testKeyOnly() throws Exception {
         long ts = nextTimestamp();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
index ebf03d0..f01d985 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,17 +45,30 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 @Category(ClientManagedTimeTest.class)
 public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
     private static final String MULTI_CF = "MULTI_CF";
     
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
     protected static void initTableValues(long ts) throws Exception {
         ensureTableCreated(getUrl(),MULTI_CF,null, ts-2);
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index 3833e56..5fda67b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -1,10 +1,5 @@
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertTrue;
 
@@ -18,9 +13,6 @@ import java.sql.SQLException;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -32,27 +24,19 @@ import com.google.common.collect.Maps;
 
 @Category(HBaseManagedTimeTest.class)
 public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
-    private static String url;
-    private static HBaseTestingUtility util;
-    private static int frequency = 4000;
-
+    //private static String url;
+    private static int frequency = 5000;
+    
     @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        Configuration conf = HBaseConfiguration.create();
-        setUpConfigForMiniCluster(conf);
-        conf.setInt("hbase.client.retries.number", 2);
-        conf.setInt("hbase.client.pause", 5000);
-        conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
-        util = new HBaseTestingUtility(conf);
-        util.startMiniCluster();
-        String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
-        url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
-                + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-        int histogramDepth = 60;
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
-        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Integer.toString(histogramDepth));
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
         props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(frequency));
-        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
     @Test
@@ -62,7 +46,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
         ResultSet rs;
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
-        conn = DriverManager.getConnection(url, props);
+        conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute(
                 "CREATE TABLE t ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
                         + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -98,7 +82,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
         ResultSet rs;
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
-        conn = DriverManager.getConnection(url, props);
+        conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute(
                 "CREATE TABLE x ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
                         + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -147,7 +131,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
         Connection conn;
         PreparedStatement stmt;
         // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30));
-        conn = DriverManager.getConnection(url, props);
+        conn = DriverManager.getConnection(getUrl(), props);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "a");
         String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" };
@@ -218,7 +202,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
     }
 
     private void flush(String tableName) throws IOException, InterruptedException {
-        util.getHBaseAdmin().flush(tableName.toUpperCase());
+        //utility.getHBaseAdmin().flush(tableName.toUpperCase());
     }
 
     private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
index dba4264..593a0e4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDMLIT.java
@@ -38,7 +38,7 @@ import org.junit.experimental.categories.Category;
 
 @Category(ClientManagedTimeTest.class)
 public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
-    
+
     @Test
     public void testBasicUpsertSelect() throws Exception {
         Connection conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -48,9 +48,8 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (2, 'Viva Las Vegas')");
             conn.commit();
             conn.close();
-            analyzeTable(conn, TENANT_TABLE_NAME);
-            
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+            analyzeTable(conn, TENANT_TABLE_NAME);
             ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " where id = 1");
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals("Cheap Sunglasses", rs.getString(1));
@@ -71,23 +70,24 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn1.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('me','" + TENANT_TYPE_ID + "',1,'Cheap Sunglasses')");
             conn1.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('you','" + TENANT_TYPE_ID +"',2,'Viva Las Vegas')");
             conn1.commit();
-            
+            conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
             analyzeTable(conn1, TENANT_TABLE_NAME);
             conn2.setAutoCommit(true);
             conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('them','" + TENANT_TYPE_ID + "',1,'Long Hair')");
             conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " values ('us','" + TENANT_TYPE_ID + "',2,'Black Hat')");
-            analyzeTable(conn2, TENANT_TABLE_NAME);
-            conn2.close();            
+            conn2.close();
             conn1.close();
-            
             conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
             ResultSet rs = conn1.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME + " where id = 1");
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals(1, rs.getInt(3));
             assertEquals("Cheap Sunglasses", rs.getString(4));
             assertFalse("Expected 1 row in result set", rs.next());
-
+            conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+            analyzeTable(conn1, TENANT_TABLE_NAME);
+            conn1.close();
             conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
+            analyzeTable(conn2, TENANT_TABLE_NAME);
             rs = conn2.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME + " where id = 2");
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals(2, rs.getInt(3));
@@ -98,7 +98,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
             conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " select * from " + TENANT_TABLE_NAME );
             conn2.commit();
-            analyzeTable(conn2, TENANT_TABLE_NAME);
             conn2.close();
             
             conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
@@ -115,7 +114,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
             conn2.setAutoCommit(true);;
             conn2.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " select 'all', tenant_type_id, id, 'Big ' || tenant_col from " + TENANT_TABLE_NAME );
-            analyzeTable(conn2, TENANT_TABLE_NAME);
             conn2.close();
 
             conn2 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL2);
@@ -132,7 +130,14 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             assertEquals("Big Black Hat", rs.getString(4));
             assertFalse("Expected 2 rows total", rs.next());
             conn2.close();
-            
+            conn1 = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+            rs = conn1.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME );
+            assertTrue("Expected row row in result set", rs.next());
+            assertEquals(1, rs.getInt(3));
+            assertEquals("Cheap Sunglasses", rs.getString(4));
+            assertTrue("Expected 1 row in result set", rs.next());
+            assertEquals(2, rs.getInt(3));
+            assertEquals("Viva Las Vegas", rs.getString(4));
         }
         finally {
             conn1.close();
@@ -163,10 +168,10 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (1, 'Cheap Sunglasses')");
             conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + " (id, tenant_col) values (2, 'Viva Las Vegas')");
             conn.commit();
-            analyzeTable(conn, TENANT_TABLE_NAME);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+            analyzeTable(conn, TENANT_TABLE_NAME);
             ResultSet rs = conn.createStatement().executeQuery("select tenant_col from " + TENANT_TABLE_NAME + " join foo on k=id");
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals("Cheap Sunglasses", rs.getString(1));
@@ -190,7 +195,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -200,6 +204,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             assertFalse("Expected 1 row in result set", rs.next());
             
             rs = conn.createStatement().executeQuery("select count(*) from " + TENANT_TABLE_NAME);
+            analyzeTable(conn, PARENT_TABLE_NAME);
             assertTrue("Expected 1 row in result set", rs.next());
             assertEquals(1, rs.getInt(1));
             assertFalse("Expected 1 row in result set", rs.next());
@@ -222,7 +227,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -238,6 +242,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.close();
             
             conn = nextConnection(getUrl());
+            analyzeTable(conn, PARENT_TABLE_NAME);
             rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME);
             rs.next();
             assertEquals(2, rs.getInt(1));
@@ -260,7 +265,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('AC/DC', 1, 'Bon Scott')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 1, 'Billy Gibbons')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 2, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -268,7 +272,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             int count = conn.createStatement().executeUpdate("delete from " + TENANT_TABLE_NAME_NO_TENANT_TYPE_ID);
             assertEquals("Expected 2 rows have been deleted", 2, count);
             conn.close();
-            
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
             ResultSet rs = conn.createStatement().executeQuery("select * from " + TENANT_TABLE_NAME_NO_TENANT_TYPE_ID);
             assertFalse("Expected no rows in result set", rs.next());
@@ -297,10 +300,10 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'abc', 1, 'Bon Scott')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 1, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+            analyzeTable(conn, PARENT_TABLE_NAME);
             conn.createStatement().execute("delete from " + TENANT_TABLE_NAME);
             conn.commit();
             conn.close();
@@ -328,7 +331,6 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('AC/DC', 1, 'Bon Scott')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 1, 'Billy Gibbons')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID + " (tenant_id, id, user) values ('" + TENANT_ID + "', 2, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
@@ -336,6 +338,7 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.close();
             
             conn = nextConnection(getUrl());
+            analyzeTable(conn, PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
             ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + PARENT_TABLE_NAME_NO_TENANT_TYPE_ID);
             rs.next();
             assertEquals(3, rs.getInt(1));
@@ -358,11 +361,11 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'aaa', 1, 'Bon Scott')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 2, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
             conn.setAutoCommit(true);
+            analyzeTable(conn, TENANT_TABLE_NAME);
             int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from " + TENANT_TABLE_NAME);
             assertEquals("Expected 1 row to have been inserted", 1, count);
             conn.close();
@@ -393,11 +396,11 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('AC/DC', 'aaa', 1, 'Bon Scott')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', '" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_id, tenant_type_id, id, user) values ('" + TENANT_ID + "', 'def', 2, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME);
             conn.close();
             
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
             conn.setAutoCommit(true);
+            analyzeTable(conn, TENANT_TABLE_NAME);
             int count = conn.createStatement().executeUpdate("upsert into " + TENANT_TABLE_NAME + "(id, user) select id+100, user from ANOTHER_TENANT_TABLE where id=2");
             assertEquals("Expected 1 row to have been inserted", 1, count);
             conn.close();
@@ -442,10 +445,9 @@ public class TenantSpecificTablesDMLIT extends BaseTenantSpecificTablesIT {
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
             conn.setAutoCommit(true);
             conn.createStatement().executeUpdate("upsert into " + PARENT_TABLE_NAME + " (tenant_type_id, id, user) values ('" + TENANT_TYPE_ID + "', 1, 'Billy Gibbons')");
-            analyzeTable(conn, PARENT_TABLE_NAME);
             conn.close();
-
             conn = nextConnection(PHOENIX_JDBC_TENANT_SPECIFIC_URL);
+            analyzeTable(conn, PARENT_TABLE_NAME);
             rs = conn.createStatement().executeQuery("select user from " + PARENT_TABLE_NAME);
             assertTrue(rs.next());
             assertEquals(rs.getString(1),"Billy Gibbons");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 2a23f0e..bb134f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -48,6 +48,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String EMPTY_CF = "_EmptyCF";
     public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
+    public static final String ANALYZE_TABLE = "_AnalyzeTable";
 
     /**
      * Used by logger to identify coprocessor
@@ -73,6 +74,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     abstract protected boolean isRegionObserverFor(Scan scan);
     abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable;
     
+    @Override
+    public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+        final Scan scan, final RegionScanner s) throws IOException {
+        if (isRegionObserverFor(scan)) {
+            throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());
+        }
+        return s;
+    }
     /**
      * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown,
      * to prevent the coprocessor from becoming blacklisted.
@@ -84,8 +93,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             if (!isRegionObserverFor(scan)) {
                 return s;
             }
-            HRegion region = c.getEnvironment().getRegion();
-            throwIfScanOutOfRegion(scan, region);
             return doPostScannerOpen(c, scan, s);
         } catch (Throwable t) {
             ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index c05ef30..a1d943c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -158,7 +158,9 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met
     private static final KeyValue VIEW_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
     private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
     private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+    private static final KeyValue EMPTY_KEYVALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
+            EMPTY_KEYVALUE_KV, 
             TABLE_TYPE_KV,
             TABLE_SEQ_NUM_KV,
             COLUMN_COUNT_KV,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 99af478..bb70932 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -34,7 +34,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
@@ -45,8 +48,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -59,6 +66,7 @@ import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.query.QueryConstants;
@@ -70,6 +78,8 @@ import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.stat.StatisticsCollector;
+import org.apache.phoenix.schema.stat.StatisticsTable;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -91,6 +101,8 @@ import com.google.common.collect.Sets;
 public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
+    private static final Log LOG = LogFactory.getLog(UngroupedAggregateRegionObserver.class);
+    private StatisticsTable statsTable = null;
     
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -98,6 +110,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // Can't use ClientKeyValueBuilder on server-side because the memstore expects to
         // be able to get a single backing buffer for a KeyValue.
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
+        String name = ((RegionCoprocessorEnvironment)e).getRegion().getTableDesc().getNameAsString();
+        String schemaName = SchemaUtil.getSchemaNameFromFullName(name);
+        if (!QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName)) {
+            this.statsTable = StatisticsTable.getStatisticsTableForCoprocessor(e.getConfiguration(), name);
+        }
     }
 
     private static void commitBatch(HRegion region, List<Pair<Mutation,Integer>> mutations, byte[] indexUUID) throws IOException {
@@ -115,11 +132,31 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public static void serializeIntoScan(Scan scan) {
         scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
     }
+    
+    @Override
+    public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
+            throws IOException {
+        s = super.preScannerOpen(e, scan, s);
+        if(ScanUtil.isAnalyzeTable(scan)) {
+            scan.setStartRow(HConstants.EMPTY_START_ROW);
+            scan.setStopRow(HConstants.EMPTY_END_ROW);
+        }
+        return s;
+    }
 
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean isAnalyze = false;
+        HRegion region = c.getEnvironment().getRegion();
+        String table = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();;
+        StatisticsCollector stats = null;
+        if (ScanUtil.isAnalyzeTable(scan) && statsTable != null) {
+            stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+            isAnalyze = true;
+        }
+
         RegionScanner theScanner = s;
         if (p != null || j != null)  {
             theScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
@@ -155,7 +192,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         
         int batchSize = 0;
         long ts = scan.getTimeRange().getMax();
-        HRegion region = c.getEnvironment().getRegion();
         List<Pair<Mutation,Integer>> mutations = Collections.emptyList();
         if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null) {
             // TODO: size better
@@ -169,7 +205,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         boolean hasAny = false;
         MultiKeyValueTuple result = new MultiKeyValueTuple();
         if (logger.isInfoEnabled()) {
-        	logger.info("Starting ungrouped coprocessor scan " + scan);
+        	logger.info("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo());
         }
         long rowCount = 0;
         MultiVersionConsistencyControl.setThreadReadPoint(innerScanner.getMvccReadPoint());
@@ -181,6 +217,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
                 hasMore = innerScanner.nextRaw(results, null);
+                if (isAnalyze && stats != null) {
+                    stats.collectStatistics(results);
+                }
                 if (!results.isEmpty()) {
                 	rowCount++;
                     result.setKeyValues(results);
@@ -275,6 +314,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             } while (hasMore);
         } finally {
             try {
+                if (isAnalyze && stats != null) {
+                    stats.updateStatistic(region);
+                    stats.clear();
+                }
                 innerScanner.close();
             } finally {
                 region.closeRegionOperation();
@@ -384,6 +427,42 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
         }
     }
+    
+    @Override
+    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+            long earliestPutTs, InternalScanner s) throws IOException {
+        InternalScanner internalScan = s;
+        String table = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
+        if (!table.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
+                && scanType.equals(ScanType.MAJOR_COMPACT)) {
+            StatisticsCollector stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+            internalScan =
+                    stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
+        }
+        return internalScan;
+    }
+    
+    
+    @Override
+    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
+            throws IOException {
+        HRegion region = e.getEnvironment().getRegion();
+        String table = region.getRegionInfo().getTableNameAsString();
+        if (!table.equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME) && statsTable != null) {
+            try {
+                StatisticsCollector stats = new StatisticsCollector(statsTable, e.getEnvironment()
+                        .getConfiguration());
+                stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
+                stats.clear();
+            } catch (IOException ioe) { 
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Error while collecting stats during split ",ioe);
+                }
+            }
+        }
+            
+    }
 
     public static byte[] serialize(List<Expression> selectExpressions) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
index a54b5b4..bff2a8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,9 +72,8 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
     // Get the mapping between key range and the regions that contains them.
     protected List<HRegionLocation> getAllRegions() throws SQLException {
         Scan scan = context.getScan();
-        PTable table = tableRef.getTable();
         List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices()
-                .getAllTableRegions(table.getPhysicalName().getBytes());
+                .getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
         // If we're not salting, then we've already intersected the minMaxRange with the scan range
         // so there's nothing to do here.
         return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
@@ -107,27 +107,28 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
 
     protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
         if (regions.isEmpty()) { return Collections.emptyList(); }
+        PTable table = tableRef.getTable();
         Scan scan = context.getScan();
-        PTable table = this.tableRef.getTable();
         byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
         List<byte[]> gps = Lists.newArrayList();
-
-        if (table.getColumnFamilies().isEmpty()) {
-            // For sure we can get the defaultCF from the table
-            gps = table.getGuidePosts();
-        } else {
-            try {
-                if (scan.getFamilyMap().size() > 0) {
-                    if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+        if (!ScanUtil.isAnalyzeTable(scan)) {
+            if (table.getColumnFamilies().isEmpty()) {
+                // For sure we can get the defaultCF from the table
+                gps = table.getGuidePosts();
+            } else {
+                try {
+                    if (scan.getFamilyMap().size() > 0) {
+                        if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+                            gps = table.getColumnFamily(defaultCF).getGuidePosts();
+                        } else { // Otherwise, just use first CF in use by scan
+                            gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+                        }
+                    } else {
                         gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                    } else { // Otherwise, just use first CF in use by scan
-                        gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
                     }
-                } else {
-                    gps = table.getColumnFamily(defaultCF).getGuidePosts();
+                } catch (ColumnFamilyNotFoundException cfne) {
+                    // Alter table does this
                 }
-            } catch (ColumnFamilyNotFoundException cfne) {
-                // Alter table does this
             }
 
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index a05acde..26e4809 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -94,9 +94,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     void addConnection(PhoenixConnection connection) throws SQLException;
     void removeConnection(PhoenixConnection connection) throws SQLException;
 
-    long updateStatistics(KeyRange keyRange, byte[] tableName)
-            throws SQLException;
-
     /**
      * @return the {@link KeyValueBuilder} that is valid for the locally installed version of HBase.
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 926407a..3a59d6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -102,9 +102,6 @@ import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.stat.StatisticsCollector;
-import org.apache.phoenix.schema.stat.StatisticsCollectorProtocol;
-import org.apache.phoenix.schema.stat.StatisticsCollectorResponse;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -121,7 +118,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.protobuf.ServiceException;
 
 public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
     private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
@@ -522,9 +518,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
             }
-            if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) {
-                descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null);
-            }
 
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
@@ -1805,42 +1798,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException {
-        HTableInterface ht = null;
-        try {
-            ht = this.getTable(tableName);
-            long noOfRowsScanned = 0;
-            Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse> callable = 
-                    new Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse>() {
-                  @Override
-                public StatisticsCollectorResponse call(StatisticsCollectorProtocol instance) throws IOException {
-                    return instance.collectStat(keyRange);
-                  }
-                };
-                Map<byte[], StatisticsCollectorResponse> result = ht.coprocessorExec(StatisticsCollectorProtocol.class,
-                    keyRange.getLowerRange(), keyRange.getUpperRange(), callable);
-                for (StatisticsCollectorResponse response : result.values()) {
-                    noOfRowsScanned += response.getRowsScanned();
-                }
-                return noOfRowsScanned;
-        } catch (ServiceException e) {
-            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
-        } catch (TableNotFoundException e) {
-            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
-        } catch (Throwable e) {
-            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
-        } finally {
-            if (ht != null) {
-                try {
-                    ht.close();
-                } catch (IOException e) {
-                    throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e);
-                }
-            }
-        }
-    }
-
-    @Override
     public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
             final long clientTS) throws SQLException {
         // clear the meta data cache for the table here

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 4e155e3..70e656a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -181,12 +181,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException {
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
     }
-
-    @Override
-    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
-        // Noop
-        return 0;
-    }
     
     @Override
     public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index fa01f09..8bd2c61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -226,11 +226,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public String getUserName() {
         return getDelegate().getUserName();
     }
-
-    @Override
-    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
-        return getDelegate().updateStatistics(keyRange, tableName);
-    }
     
     @Override
     public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1d22fdc9/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 390e621..ccfbdd9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -85,9 +85,12 @@ import java.util.Set;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -96,6 +99,8 @@ import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -104,6 +109,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.ColumnDef;
@@ -120,7 +126,6 @@ import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -130,7 +135,6 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -281,7 +285,7 @@ public class MetaDataClient {
         return updateCache(schemaName, tableName, false);
     }
     
-    private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] here
+    private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
         Long scn = connection.getSCN();
         boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName);
         // System tables must always have a null tenantId
@@ -467,27 +471,11 @@ public class MetaDataClient {
         PTable table = resolver.getTables().get(0).getTable();
         PName physicalName = table.getPhysicalName();
         byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
-        KeyRange analyzeRange = KeyRange.EVERYTHING_RANGE;
-        if (connection.getTenantId() != null && table.isMultiTenant()) {
-            tenantIdBytes = connection.getTenantId().getBytes();
-            //  TODO remove this inner if once PHOENIX-1259 is fixed.
-            if (table.getBucketNum() == null) {
-                List<List<KeyRange>> tenantIdKeyRanges = Collections.singletonList(Collections.singletonList(KeyRange
-                        .getKeyRange(tenantIdBytes)));
-                byte[] lowerRange = ScanUtil.getMinKey(table.getRowKeySchema(), tenantIdKeyRanges,
-                        ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-                byte[] upperRange = ScanUtil.getMaxKey(table.getRowKeySchema(), tenantIdKeyRanges,
-                        ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-                analyzeRange = KeyRange.getKeyRange(lowerRange, upperRange);
-            }
-        }
         Long scn = connection.getSCN();
         // Always invalidate the cache
         long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
         connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
                 table.getTableName().getBytes(), clientTS);
-        // Clear the cache also. So that for cases like major compaction also we would be able to use the stats
-        updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
         String query = "SELECT CURRENT_DATE(),"+ LAST_STATS_UPDATE_TIME + " FROM " + SYSTEM_CATALOG_SCHEMA
                 + "." + SYSTEM_STATS_TABLE + " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
                 + " IS NULL AND " + REGION_NAME + " IS NULL";
@@ -497,12 +485,26 @@ public class MetaDataClient {
             lastUpdatedTime = rs.getDate(1).getTime() - rs.getDate(2).getTime();
         }
         if (minTimeForStatsUpdate  > lastUpdatedTime) {
+            // Here create the select query.
+            String countQuery = "SELECT /*+ NO_CACHE */ count(*) FROM " + table.getName().getString();
+            PhoenixStatement statement = (PhoenixStatement) connection.createStatement();
+            QueryPlan plan = statement.compileQuery(countQuery);
+            Scan scan = plan.getContext().getScan();
+            // Add all CF in the table
+            scan.getFamilyMap().clear();
+            for (PColumnFamily family : table.getColumnFamilies()) {
+                scan.addFamily(family.getName().getBytes());
+            }
+            scan.setAttribute(BaseScannerRegionObserver.ANALYZE_TABLE, PDataType.TRUE_BYTES);
+            KeyValue kv = plan.iterator().next().getValue(0);
+            ImmutableBytesWritable tempPtr = plan.getContext().getTempPtr();
+            tempPtr.set(kv.getValue());
+            // A single Cell will be returned with the count(*) - we decode that here
+            long rowCount = PDataType.LONG.getCodec().decodeLong(tempPtr, SortOrder.getDefault());
             // We need to update the stats table
-            connection.getQueryServices().updateStatistics(analyzeRange, physicalName.getBytes());
             connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
                     table.getTableName().getBytes(), clientTS);
-            updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
-            return new MutationState(1, connection);
+            return new  MutationState(0, connection, rowCount);
         } else {
             return new MutationState(0, connection);
         }