You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/09/26 07:52:33 UTC

git commit: PHOENIX-1264: Add StatisticsCollector to existing tables on first connection to cluster

Repository: phoenix
Updated Branches:
  refs/heads/master 10efdb1f2 -> 6b0461002


PHOENIX-1264: Add StatisticsCollector to existing tables on first
connection to cluster


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6b046100
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6b046100
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6b046100

Branch: refs/heads/master
Commit: 6b04610022415fcc27ea69fe001cbd464badf355
Parents: 10efdb1
Author: Ramkrishna <ra...@intel.com>
Authored: Fri Sep 26 11:21:40 2014 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Fri Sep 26 11:21:40 2014 +0530

----------------------------------------------------------------------
 ...efaultParallelIteratorsRegionSplitterIT.java |  15 ++
 .../phoenix/end2end/GuidePostsLifeCycleIT.java  |  22 +-
 .../org/apache/phoenix/end2end/KeyOnlyIT.java   |  15 ++
 .../phoenix/end2end/MultiCfQueryExecIT.java     |  14 ++
 .../phoenix/end2end/StatsCollectorIT.java       |  44 ++--
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../UngroupedAggregateRegionObserver.java       |  72 +++++-
 .../DefaultParallelIteratorRegionSplitter.java  |  30 +--
 .../phoenix/query/ConnectionQueryServices.java  |   3 -
 .../query/ConnectionQueryServicesImpl.java      |  51 +---
 .../query/ConnectionlessQueryServicesImpl.java  |   6 -
 .../query/DelegateConnectionQueryServices.java  |   5 -
 .../apache/phoenix/schema/MetaDataClient.java   |  36 ++-
 .../schema/stat/StatisticsCollector.java        | 249 +++++--------------
 .../phoenix/schema/stat/StatisticsScanner.java  |   7 +-
 .../phoenix/schema/stat/StatisticsTable.java    |  49 ++--
 .../phoenix/schema/stat/StatisticsTracker.java  |  62 -----
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +
 .../phoenix/query/QueryServicesTestImpl.java    |   2 +-
 .../src/main/StatisticsCollect.proto            |  20 --
 20 files changed, 272 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
index dd1dc8b..a6ec835 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultParallelIteratorsRegionSplitterIT.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -40,13 +41,18 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 
 /**
  * Tests for {@link DefaultParallelIteratorRegionSplitter}.
@@ -58,6 +64,14 @@ import org.junit.experimental.categories.Category;
 @Category(ClientManagedTimeTest.class)
 public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIteratorsRegionSplitterIT {
     
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
     private static List<KeyRange> getSplits(Connection conn, long ts, final Scan scan)
             throws SQLException {
         TableRef tableRef = getTableRef(conn, ts);
@@ -93,6 +107,7 @@ public class DefaultParallelIteratorsRegionSplitterIT extends BaseParallelIterat
         Scan scan = new Scan();
         
         // number of regions > target query concurrency
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
         scan.setStartRow(K1);
         scan.setStopRow(K12);
         List<KeyRange> keyRanges = getSplits(conn, ts, scan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
index 7645040..3cef492 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GuidePostsLifeCycleIT.java
@@ -28,6 +28,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -40,16 +41,32 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 @Category(HBaseManagedTimeTest.class)
 public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
-
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
     protected static final byte[] KMIN  = new byte[] {'!'};
     protected static final byte[] KMIN2  = new byte[] {'.'};
     protected static final byte[] K1  = new byte[] {'a'};
@@ -106,16 +123,19 @@ public class GuidePostsLifeCycleIT extends BaseHBaseManagedTimeIT {
         upsert(new byte[][] { KMIN, K4, K11 });
         stmt = conn.prepareStatement("ANALYZE STABLE");
         stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery(); 
         keyRanges = getSplits(conn, scan);
         assertEquals(7, keyRanges.size());
         upsert(new byte[][] { KMIN2, K5, K12 });
         stmt = conn.prepareStatement("ANALYZE STABLE");
         stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
         keyRanges = getSplits(conn, scan);
         assertEquals(10, keyRanges.size());
         upsert(new byte[][] { K1, K6, KP });
         stmt = conn.prepareStatement("ANALYZE STABLE");
         stmt.execute();
+        conn.prepareStatement("SELECT COUNT(*) FROM STABLE").executeQuery();
         keyRanges = getSplits(conn, scan);
         assertEquals(13, keyRanges.size());
         conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
index 4b0d07f..f713fff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,15 +45,29 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 @Category(ClientManagedTimeTest.class)
 public class KeyOnlyIT extends BaseClientManagedTimeIT {
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
     @Test
     public void testKeyOnly() throws Exception {
         long ts = nextTimestamp();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
index ebf03d0..f01d985 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java
@@ -32,6 +32,7 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -44,17 +45,30 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 @Category(ClientManagedTimeTest.class)
 public class MultiCfQueryExecIT extends BaseClientManagedTimeIT {
     private static final String MULTI_CF = "MULTI_CF";
     
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
     protected static void initTableValues(long ts) throws Exception {
         ensureTableCreated(getUrl(),MULTI_CF,null, ts-2);
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
index a38abea..e20c11f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java
@@ -1,10 +1,5 @@
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertTrue;
 
@@ -18,9 +13,6 @@ import java.sql.SQLException;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -32,27 +24,19 @@ import com.google.common.collect.Maps;
 
 @Category(HBaseManagedTimeTest.class)
 public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
-    private static String url;
-    private static HBaseTestingUtility util;
-    private static int frequency = 4000;
-
+    //private static String url;
+    private static int frequency = 5000;
+    
     @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        Configuration conf = HBaseConfiguration.create();
-        setUpConfigForMiniCluster(conf);
-        conf.setInt("hbase.client.retries.number", 2);
-        conf.setInt("hbase.client.pause", 5000);
-        conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
-        util = new HBaseTestingUtility(conf);
-        util.startMiniCluster();
-        String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
-        url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
-                + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-        int histogramDepth = 60;
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
-        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Integer.toString(histogramDepth));
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l));
         props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(frequency));
-        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20));
+        props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
     @Test
@@ -62,7 +46,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
         ResultSet rs;
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
-        conn = DriverManager.getConnection(url, props);
+        conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute(
                 "CREATE TABLE t ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
                         + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -99,7 +83,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
         long ts = nextTimestamp();
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
-        conn = DriverManager.getConnection(url, props);
+        conn = DriverManager.getConnection(getUrl(), props);
         conn.createStatement().execute(
                 "CREATE TABLE x ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
                         + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC)) \n");
@@ -148,7 +132,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
         Connection conn;
         PreparedStatement stmt;
         // props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 30));
-        conn = DriverManager.getConnection(url, props);
+        conn = DriverManager.getConnection(getUrl(), props);
         stmt = upsertStmt(conn, tableName);
         stmt.setString(1, "a");
         String[] s = new String[] { "abc", "def", "ghi", "jkll", null, null, "xxx" };
@@ -219,7 +203,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT {
     }
 
     private void flush(String tableName) throws IOException, InterruptedException {
-        util.getHBaseAdmin().flush(tableName.toUpperCase());
+        //utility.getHBaseAdmin().flush(tableName.toUpperCase());
     }
 
     private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b2e2806..1129eef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -60,6 +60,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String VIEW_CONSTANTS = "_ViewConstants";
     public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
     public static final String REVERSE_SCAN = "_ReverseScan";
+    public static final String ANALYZE_TABLE = "_ANALYZETABLE";
 
     /** Exposed for testing */
     public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 95b095e..d39f868 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -34,6 +34,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -47,7 +49,11 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
@@ -63,6 +69,7 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.query.QueryConstants;
@@ -74,6 +81,8 @@ import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.stat.StatisticsCollector;
+import org.apache.phoenix.schema.stat.StatisticsTable;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -94,7 +103,7 @@ import com.google.common.collect.Sets;
  * 
  * @since 0.1
  */
-public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
     // TODO: move all constants into a single class
     public static final String UNGROUPED_AGG = "UngroupedAgg";
     public static final String DELETE_AGG = "DeleteAgg";
@@ -105,6 +114,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public static final String EMPTY_CF = "EmptyCF";
     private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
+    private static final Log LOG = LogFactory.getLog(UngroupedAggregateRegionObserver.class);
+    private StatisticsTable statsTable = null;
     
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -112,6 +123,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // Can't use ClientKeyValueBuilder on server-side because the memstore expects to
         // be able to get a single backing buffer for a KeyValue.
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
+        String name = ((RegionCoprocessorEnvironment)e).getRegion().getTableDesc().getTableName().getNameAsString();
+        this.statsTable = StatisticsTable.getStatisticsTableForCoprocessor(e.getConfiguration(), name);
     }
 
     private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException {
@@ -128,16 +141,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     public static void serializeIntoScan(Scan scan) {
         scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE);
     }
-
+    
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
         int offset = 0;
+        boolean isAnalyze = false;
+        HRegion region = c.getEnvironment().getRegion();
+        TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+        StatisticsCollector stats = null;
+        if(scan.getAttribute(BaseScannerRegionObserver.ANALYZE_TABLE) != null && statsTable != null) {
+            stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+            isAnalyze = true;
+        }
         if (ScanUtil.isLocalIndex(scan)) {
             /*
              * For local indexes, we need to set an offset on row key expressions to skip
              * the region start key.
              */
-            HRegion region = c.getEnvironment().getRegion();
             offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
             ScanUtil.setRowKeyOffset(scan, offset);
         }
@@ -199,7 +219,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         
         int batchSize = 0;
         long ts = scan.getTimeRange().getMax();
-        HRegion region = c.getEnvironment().getRegion();
         List<Mutation> mutations = Collections.emptyList();
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
@@ -214,7 +233,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         boolean hasAny = false;
         MultiKeyValueTuple result = new MultiKeyValueTuple();
         if (logger.isInfoEnabled()) {
-        	logger.info("Starting ungrouped coprocessor scan " + scan);
+        	logger.info("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo());
         }
         long rowCount = 0;
         region.startRegionOperation();
@@ -226,6 +245,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
                 hasMore = innerScanner.nextRaw(results);
+                if(isAnalyze && stats != null) {
+                    stats.collectStatistics(results);
+                }
                 
                 if (!results.isEmpty()) {
                     if (localIndexScan && !isDelete) {
@@ -346,6 +368,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             } while (hasMore);
         } finally {
             try {
+                if (isAnalyze && stats != null) {
+                    stats.updateStatistic(region);
+                    stats.clear();
+                }
                 innerScanner.close();
             } finally {
                 region.closeRegionOperation();
@@ -408,6 +434,42 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         };
         return scanner;
     }
+    
+    @Override
+    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+            long earliestPutTs, InternalScanner s) throws IOException {
+        InternalScanner internalScan = s;
+        TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+        if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)
+                && scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+            StatisticsCollector stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration());
+            internalScan =
+                    stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s);
+        }
+        return internalScan;
+    }
+    
+    
+    @Override
+    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
+            throws IOException {
+        HRegion region = e.getEnvironment().getRegion();
+        TableName table = region.getRegionInfo().getTable();
+        if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+            try {
+                StatisticsCollector stats = new StatisticsCollector(statsTable, e.getEnvironment()
+                        .getConfiguration());
+                stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region);
+                stats.clear();
+            } catch (IOException ioe) { 
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Error while collecting stats during split ",ioe);
+                }
+            }
+        }
+            
+    }
 
     private HRegion getIndexRegion(RegionCoprocessorEnvironment environment) throws IOException {
         HRegion userRegion = environment.getRegion();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
index 227163e..a0ac20c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,23 +112,24 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe
         PTable table = this.tableRef.getTable();
         byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
         List<byte[]> gps = Lists.newArrayList();
-
-        if (table.getColumnFamilies().isEmpty()) {
-            // For sure we can get the defaultCF from the table
-            gps = table.getGuidePosts();
-        } else {
-            try {
-                if (scan.getFamilyMap().size() > 0) {
-                    if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+        if (!ScanUtil.isAnalyzeTable(scan)) {
+            if (table.getColumnFamilies().isEmpty()) {
+                // For sure we can get the defaultCF from the table
+                gps = table.getGuidePosts();
+            } else {
+                try {
+                    if (scan.getFamilyMap().size() > 0) {
+                        if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan
+                            gps = table.getColumnFamily(defaultCF).getGuidePosts();
+                        } else { // Otherwise, just use first CF in use by scan
+                            gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
+                        }
+                    } else {
                         gps = table.getColumnFamily(defaultCF).getGuidePosts();
-                    } else { // Otherwise, just use first CF in use by scan
-                        gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts();
                     }
-                } else {
-                    gps = table.getColumnFamily(defaultCF).getGuidePosts();
+                } catch (ColumnFamilyNotFoundException cfne) {
+                    // Alter table does this
                 }
-            } catch (ColumnFamilyNotFoundException cfne) {
-                // Alter table does this
             }
         }
         List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 0c1f45d..15c8ebe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -94,9 +94,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     void addConnection(PhoenixConnection connection) throws SQLException;
     void removeConnection(PhoenixConnection connection) throws SQLException;
 
-    long updateStatistics(KeyRange keyRange, byte[] tableName)
-            throws SQLException;
-
     /**
      * @return the {@link KeyValueBuilder} that is valid for the locally installed version of HBase.
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dfd56bc..25117ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -88,9 +88,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -124,7 +121,6 @@ import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.stat.StatisticsCollector;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
@@ -143,7 +139,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.HBaseZeroCopyByteString;
-import com.google.protobuf.ServiceException;
 
 
 public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
@@ -595,10 +590,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
             }
-
-            if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) {
-                descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null);
-            }
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
             // don't install on the metadata table until we fix the TODO there.
@@ -1864,47 +1855,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
         }
     }
-
-    @Override
-    public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException {
-        HTableInterface ht = null;
-        try {
-            ht = this.getTable(tableName);
-            Batch.Call<StatCollectService, StatCollectResponse> callable = new Batch.Call<StatCollectService, StatCollectResponse>() {
-                ServerRpcController controller = new ServerRpcController();
-                BlockingRpcCallback<StatCollectResponse> rpcCallback = new BlockingRpcCallback<StatCollectResponse>();
-
-                @Override
-                public StatCollectResponse call(StatCollectService service) throws IOException {
-                    StatCollectRequest.Builder builder = StatCollectRequest.newBuilder();
-                    builder.setStartRow(HBaseZeroCopyByteString.wrap(keyRange.getLowerRange()));
-                    builder.setStopRow(HBaseZeroCopyByteString.wrap(keyRange.getUpperRange()));
-                    service.collectStat(controller, builder.build(), rpcCallback);
-                    if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
-                    return rpcCallback.get();
-                }
-            };
-            Map<byte[], StatCollectResponse> result = ht.coprocessorService(StatCollectService.class,
-                    keyRange.getLowerRange(), keyRange.getUpperRange(), callable);
-            StatCollectResponse next = result.values().iterator().next();
-            return next.getRowsScanned();
-        } catch (ServiceException e) {
-            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
-        } catch (TableNotFoundException e) {
-            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
-        } catch (Throwable e) {
-            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
-        } finally {
-            if (ht != null) {
-                try {
-                    ht.close();
-                } catch (IOException e) {
-                    throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e);
-                }
-            }
-        }
-    }
-
+    
     @Override
     public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
             final long clientTS) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 9fa415c..055bc79 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -188,12 +188,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata, PTableType tableType) throws SQLException {
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
     }
-
-    @Override
-    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
-        // Noop
-        return 0;
-    }
     
     @Override
     public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index fa01f09..8bd2c61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -226,11 +226,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public String getUserName() {
         return getDelegate().getUserName();
     }
-
-    @Override
-    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
-        return getDelegate().updateStatistics(keyRange, tableName);
-    }
     
     @Override
     public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 1f933d8..82eb836 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -130,7 +130,6 @@ import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
-import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -141,7 +140,6 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -479,20 +477,6 @@ public class MetaDataClient {
         PTable table = resolver.getTables().get(0).getTable();
         PName physicalName = table.getPhysicalName();
         byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
-        KeyRange analyzeRange = KeyRange.EVERYTHING_RANGE;
-        if (connection.getTenantId() != null && table.isMultiTenant()) {
-            tenantIdBytes = connection.getTenantId().getBytes();
-            //  TODO remove this inner if once PHOENIX-1259 is fixed.
-            if (table.getBucketNum() == null && table.getIndexType() != IndexType.LOCAL) {
-                List<List<KeyRange>> tenantIdKeyRanges = Collections.singletonList(Collections.singletonList(KeyRange
-                        .getKeyRange(tenantIdBytes)));
-                byte[] lowerRange = ScanUtil.getMinKey(table.getRowKeySchema(), tenantIdKeyRanges,
-                        ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-                byte[] upperRange = ScanUtil.getMaxKey(table.getRowKeySchema(), tenantIdKeyRanges,
-                        ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-                analyzeRange = KeyRange.getKeyRange(lowerRange, upperRange);
-            }
-        }
         Long scn = connection.getSCN();
         // Always invalidate the cache
         long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
@@ -509,12 +493,26 @@ public class MetaDataClient {
             lastUpdatedTime = rs.getDate(1).getTime() - rs.getDate(2).getTime();
         }
         if (minTimeForStatsUpdate  > lastUpdatedTime) {
+            // Here create the select query.
+            String countQuery = "SELECT /*+ NO_CACHE */ count(*) FROM " + table.getName().getString();
+            PhoenixStatement statement = (PhoenixStatement) connection.createStatement();
+            QueryPlan plan = statement.compileQuery(countQuery);
+            Scan scan = plan.getContext().getScan();
+            // Add all CF in the table
+            scan.getFamilyMap().clear();
+            for (PColumnFamily family : table.getColumnFamilies()) {
+                scan.addFamily(family.getName().getBytes());
+            }
+            scan.setAttribute(BaseScannerRegionObserver.ANALYZE_TABLE, PDataType.TRUE_BYTES);
+            Cell kv = plan.iterator().next().getValue(0);
+            ImmutableBytesWritable tempPtr = plan.getContext().getTempPtr();
+            tempPtr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+            // A single Cell will be returned with the count(*) - we decode that here
+            long rowCount = PDataType.LONG.getCodec().decodeLong(tempPtr, SortOrder.getDefault());
             // We need to update the stats table
-            connection.getQueryServices().updateStatistics(analyzeRange, physicalName.getBytes());
             connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
                     table.getTableName().getBytes(), clientTS);
-            updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
-            return new MutationState(1, connection);
+            return new  MutationState(0, connection, rowCount);
         } else {
             return new MutationState(0, connection);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
index 7552698..6b45c5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -21,27 +21,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -50,13 +38,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse.Builder;
-import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PDataType;
@@ -64,91 +46,50 @@ import org.apache.phoenix.schema.PhoenixArray;
 import org.apache.phoenix.util.TimeKeeper;
 
 import com.google.common.collect.Lists;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
+import com.google.common.collect.Maps;
 
 /**
- * An endpoint implementation that allows to collect the stats for a given region and groups the stat per family. This is also an
- * RegionObserver that collects the information on compaction also. The user would be allowed to invoke this endpoint and thus populate the
- * Phoenix stats table with the max key, min key and guide posts for the given region. The stats can be consumed by the stats associated
- * with every PTable and the same can be used to parallelize the queries
+ * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and
+ * guideposts
  */
-public class StatisticsCollector extends BaseRegionObserver implements CoprocessorService, Coprocessor,
-        StatisticsTracker, StatCollectService.Interface {
+public class StatisticsCollector {
 
-    public static void addToTable(HTableDescriptor desc) throws IOException {
-        desc.addCoprocessor(StatisticsCollector.class.getName());
-    }
-
-    private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>();
-    private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>();
+    private Map<String, byte[]> minMap = Maps.newHashMap();
+    private Map<String, byte[]> maxMap = Maps.newHashMap();
     private long guidepostDepth;
     private long byteCount = 0;
-    private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>();
-    private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>();
-    private RegionCoprocessorEnvironment env;
-    protected StatisticsTable stats;
+    private Map<String, List<byte[]>> guidePostsMap = Maps.newHashMap();
+    private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap();
+    protected StatisticsTable statsTable;
     // Ensures that either analyze or compaction happens at any point of time.
-    private ReentrantLock lock = new ReentrantLock();
     private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
 
-    @Override
-    public void collectStat(RpcController controller, StatCollectRequest request, RpcCallback<StatCollectResponse> done) {
-        HRegion region = env.getRegion();
-        boolean heldLock = false;
-        int count = 0;
-        Builder newBuilder = StatCollectResponse.newBuilder();
+    public StatisticsCollector(StatisticsTable statsTable, Configuration conf) throws IOException {
+        // Get the stats table associated with the current table on which the CP is
+        // triggered
+        this.statsTable = statsTable;
+        guidepostDepth =
+                conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
+                    QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
+    }
+
+    public void updateStatistic(HRegion region) {
         try {
-            if (lock.tryLock()) {
-                heldLock = true;
-                // Clear all old stats
-                clear();
-                Scan scan = createScan(env.getConfiguration());
-                if (request.hasStartRow()) {
-                    scan.setStartRow(request.getStartRow().toByteArray());
-                }
-                if (request.hasStopRow()) {
-                    scan.setStopRow(request.getStopRow().toByteArray());
-                }
-                RegionScanner scanner = null;
-                try {
-                    scanner = region.getScanner(scan);
-                    count = scanRegion(scanner, count);
-                } catch (IOException e) {
-                    LOG.error(e);
-                    ResponseConverter.setControllerException(controller, e);
-                } finally {
-                    if (scanner != null) {
-                        try {
-                            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
-                            writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Committing new stats for the region " + region.getRegionInfo());
-                            }
-                            commitStats(mutations);
-                        } catch (IOException e) {
-                            LOG.error(e);
-                            ResponseConverter.setControllerException(controller, e);
-                        } finally {
-                            clear();
-                        }
-                    }
-                }
+            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+            writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Committing new stats for the region " + region.getRegionInfo());
             }
+            commitStats(mutations);
+        } catch (IOException e) {
+            LOG.error(e);
         } finally {
-            if (heldLock) {
-                lock.unlock();
-            }
-            newBuilder.setRowsScanned(count);
-            StatCollectResponse result = newBuilder.build();
-            done.run(result);
+            clear();
         }
     }
-
+    
     private void writeStatsToStatsTable(final HRegion region,
-            final RegionScanner scanner, boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
-        scanner.close();
+            boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
         try {
             // update the statistics table
             for (ImmutableBytesPtr fam : familyMap.keySet()) {
@@ -157,13 +98,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Deleting the stats for the region "+region.getRegionInfo());
                     }
-                    stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
+                    statsTable.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
                             Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
                 }
                 if(LOG.isDebugEnabled()) {
                     LOG.debug("Adding new stats for the region "+region.getRegionInfo());
                 }
-                stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+                statsTable.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
                         Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
             }
         } catch (IOException e) {
@@ -173,7 +114,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
     }
 
     private void commitStats(List<Mutation> mutations) throws IOException {
-        stats.commitStats(mutations);
+        statsTable.commitStats(mutations);
     }
 
     private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException {
@@ -181,7 +122,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
             // update the statistics table
             for (ImmutableBytesPtr fam : familyMap.keySet()) {
                 String tableName = region.getRegionInfo().getTable().getNameAsString();
-                stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+                statsTable.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
                         Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
             }
         } catch (IOException e) {
@@ -196,7 +137,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         while (hasMore) {
             // Am getting duplicates here. Need to avoid that
             hasMore = scanner.next(results);
-            updateStat(results);
+            collectStatistics(results);
             count += results.size();
             results.clear();
             while (!hasMore) {
@@ -212,93 +153,42 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
      * @param results
      *            next batch of {@link KeyValue}s
      */
-    protected void updateStat(final List<Cell> results) {
+    public void collectStatistics(final List<Cell> results) {
         for (Cell c : results) {
             KeyValue kv = KeyValueUtil.ensureKeyValue(c);
             updateStatistic(kv);
         }
     }
 
-    @Override
-    public Service getService() {
-        return StatCollectorProtos.StatCollectService.newReflectiveService(this);
-    }
-
-    @Override
-    public void start(CoprocessorEnvironment env) throws IOException {
-        if (env instanceof RegionCoprocessorEnvironment) {
-            this.env = (RegionCoprocessorEnvironment)env;
-        } else {
-            throw new CoprocessorException("Must be loaded on a table region!");
-        }
-        HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc();
-        // Get the stats table associated with the current table on which the CP is
-        // triggered
-        stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName());
-        guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB,
-                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) throws IOException {
-        if (env instanceof RegionCoprocessorEnvironment) {
-            TableName table = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTable();
-            // Close only if the table is system table
-            if(table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
-                stats.close();
-            }
-        }
-    }
-
-    @Override
-    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-            List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
-            throws IOException {
+    public InternalScanner createCompactionScanner(HRegion region, Store store,
+            List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException {
+        // See if this is for Major compaction
         InternalScanner internalScan = s;
-        TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
-        if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
-            boolean heldLock = false;
-            try {
-                if (lock.tryLock()) {
-                    heldLock = true;
-                    // See if this is for Major compaction
-                    if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
-                        // this is the first CP accessed, so we need to just create a major
-                        // compaction scanner, just
-                        // like in the compactor
-                        if (s == null) {
-                            Scan scan = new Scan();
-                            scan.setMaxVersions(store.getFamily().getMaxVersions());
-                            long smallestReadPoint = store.getSmallestReadPoint();
-                            internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
-                                    smallestReadPoint, earliestPutTs);
-                        }
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Compaction scanner created for stats");
-                        }
-                        InternalScanner scanner = getInternalScanner(c, store, internalScan,
-                                store.getColumnFamilyName());
-                        if (scanner != null) {
-                            internalScan = scanner;
-                        }
-                    }
-                }
-            } finally {
-                if (heldLock) {
-                    lock.unlock();
-                }
+        if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+            // this is the first CP accessed, so we need to just create a major
+            // compaction scanner, just
+            // like in the compactor
+            if (s == null) {
+                Scan scan = new Scan();
+                scan.setMaxVersions(store.getFamily().getMaxVersions());
+                long smallestReadPoint = store.getSmallestReadPoint();
+                internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+                        smallestReadPoint, earliestPutTs);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Compaction scanner created for stats");
+            }
+            InternalScanner scanner = getInternalScanner(region, store, internalScan, store.getColumnFamilyName());
+            if (scanner != null) {
+                internalScan = scanner;
             }
         }
         return internalScan;
     }
-    
 
-    @Override
-    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException {
-        // Invoke collectStat here
-        HRegion region = ctx.getEnvironment().getRegion();
-        TableName table = region.getRegionInfo().getTable();
-        if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+    public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r,
+            HRegion region) {
+        try {
             if (familyMap != null) {
                 familyMap.clear();
             }
@@ -307,14 +197,13 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
             // TODO : Try making this atomic
             List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
             long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
-            Configuration conf = ctx.getEnvironment().getConfiguration();
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Collecting stats for the daughter region "+l.getRegionInfo());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo());
             }
             collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime);
             clear();
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Collecting stats for the daughter region "+r.getRegionInfo());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo());
             }
             collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime);
             clear();
@@ -322,6 +211,9 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
                 LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
             }
             commitStats(mutations);
+        } catch (IOException e) {
+            LOG.error("Error while capturing stats after split of region "
+                    + region.getRegionInfo().getRegionNameAsString(), e);
         }
     }
 
@@ -345,7 +237,7 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
                         }
                         deleteStatsFromStatsTable(parent, mutations, currentTime);
                     }
-                    writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime);
+                    writeStatsToStatsTable(daughter, false, mutations, currentTime);
                 } catch (IOException e) {
                     LOG.error(e);
                     throw e;
@@ -363,13 +255,12 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         return scan;
     }
 
-    protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+    protected InternalScanner getInternalScanner(HRegion region, Store store,
             InternalScanner internalScan, String family) {
-        return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan,
+        return new StatisticsScanner(this, statsTable, region.getRegionInfo(), internalScan,
                 Bytes.toBytes(family));
     }
 
-    @Override
     public void clear() {
         this.maxMap.clear();
         this.minMap.clear();
@@ -377,7 +268,6 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         this.familyMap.clear();
     }
 
-    @Override
     public void updateStatistic(KeyValue kv) {
         byte[] cf = kv.getFamily();
         familyMap.put(new ImmutableBytesPtr(cf), true);
@@ -415,19 +305,16 @@ public class StatisticsCollector extends BaseRegionObserver implements Coprocess
         }
     }
 
-    @Override
     public byte[] getMaxKey(String fam) {
         if (maxMap.get(fam) != null) { return maxMap.get(fam); }
         return null;
     }
 
-    @Override
     public byte[] getMinKey(String fam) {
         if (minMap.get(fam) != null) { return minMap.get(fam); }
         return null;
     }
 
-    @Override
     public byte[] getGuidePosts(String fam) {
         if (!guidePostsMap.isEmpty()) {
             List<byte[]> guidePosts = guidePostsMap.get(fam);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
index 09174b2..86ffca7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
@@ -32,10 +32,10 @@ public class StatisticsScanner implements InternalScanner {
     private InternalScanner delegate;
     private StatisticsTable stats;
     private HRegionInfo region;
-    private StatisticsTracker tracker;
+    private StatisticsCollector tracker;
     private byte[] family;
 
-    public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region,
+    public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegionInfo region,
             InternalScanner delegate, byte[] family) {
         // should there be only one tracker?
         this.tracker = tracker;
@@ -109,9 +109,6 @@ public class StatisticsScanner implements InternalScanner {
             delegate.close();
         } catch (IOException e) {
             LOG.error("Error while closing the scanner");
-            // TODO : We should throw the exception
-            /*if (toThrow == null) { throw e; }
-            throw MultipleIOException.createIOException(Lists.newArrayList(toThrow, e));*/
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
index fcbbee9..e92d61e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -25,15 +25,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
@@ -46,38 +42,34 @@ public class StatisticsTable implements Closeable {
     /** Map of the currently open statistics tables */
     private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>();
     /**
-     * @param env
-     *            Environment wherein the coprocessor is attempting to update the stats table.
+     * @param conf
+     *            Configuration used to update the stats table.
      * @param primaryTableName
      *            name of the primary table on which we should collect stats
      * @return the {@link StatisticsTable} for the given primary table.
      * @throws IOException
      *             if the table cannot be created due to an underlying HTable creation error
      */
-    public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env,
-            byte[] primaryTableName) throws IOException {
+    public synchronized static StatisticsTable getStatisticsTableForCoprocessor(Configuration conf,
+            String primaryTableName) throws IOException {
         StatisticsTable table = tableMap.get(primaryTableName);
         if (table == null) {
             // Map the statistics table to the table with which the statistics are
             // associated. This is a workaround
-            HTablePool pool = new HTablePool(env.getConfiguration(), 1);
+            HTablePool pool = new HTablePool(conf,1);
+            //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
             HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
-            table = new StatisticsTable(hTable, primaryTableName);
-            tableMap.put(Bytes.toString(primaryTableName), table);
+            //h.setAutoFlushTo(true);
+            table = new StatisticsTable(hTable);
+            tableMap.put(primaryTableName, table);
         }
         return table;
     }
 
     private final HTableInterface statisticsTable;
-    private final byte[] sourceTableName;
 
-    private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) {
+    public StatisticsTable(HTableInterface statsTable) {
         this.statisticsTable = statsTable;
-        this.sourceTableName = sourceTableName;
-    }
-
-    public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException {
-        this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName());
     }
 
     /**
@@ -104,7 +96,7 @@ public class StatisticsTable implements Closeable {
      *             if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to
      *             update
      */
-    public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+    public void addStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
             List<Mutation> mutations, long currentTime) throws IOException {
         if (tracker == null) { return; }
 
@@ -119,13 +111,15 @@ public class StatisticsTable implements Closeable {
     public void commitStats(List<Mutation> mutations) throws IOException {
         Object[] res = new Object[mutations.size()];
         try {
-            statisticsTable.batch(mutations, res);
+            if (mutations.size() > 0) {
+                statisticsTable.batch(mutations, res);
+            }
         } catch (InterruptedException e) {
             throw new IOException("Exception while adding deletes and puts");
         }
     }
 
-    private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations,
+    private void formStatsUpdateMutation(StatisticsCollector tracker, String fam, List<Mutation> mutations,
             long currentTime, byte[] prefix) {
         Put put = new Put(prefix, currentTime);
         if (tracker.getGuidePosts(fam) != null) {
@@ -147,22 +141,11 @@ public class StatisticsTable implements Closeable {
         mutations.add(put);
     }
     
-    public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+    public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam,
             List<Mutation> mutations, long currentTime)
             throws IOException {
         byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam),
                 PDataType.VARCHAR.toBytes(regionName));
         mutations.add(new Delete(prefix, currentTime - 1));
     }
-
-    /**
-     * @return the underlying {@link HTableInterface} to which this table is writing
-     */
-    HTableInterface getUnderlyingTable() {
-        return statisticsTable;
-    }
-
-    byte[] getSourceTableName() {
-        return this.sourceTableName;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
deleted file mode 100644
index e1754f3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.schema.stat;
-
-import org.apache.hadoop.hbase.KeyValue;
-
-/**
- * Track a statistic for the column on a given region
- */
-public interface StatisticsTracker {
-
-    /**
-     * Reset the statistic after the completion of the compaction
-     */
-    public void clear();
-
-    /**
-     * Update the current statistics with the next {@link KeyValue} to be written
-     * 
-     * @param kv
-     *            next {@link KeyValue} to be written.
-     */
-    public void updateStatistic(KeyValue kv);
-
-    /**
-     * Return the max key of the family
-     * @param fam
-     * @return
-     */
-    public byte[] getMaxKey(String fam);
-
-    /**
-     * Return the min key of the family
-     * 
-     * @param fam
-     * @return
-     */
-    public byte[] getMinKey(String fam);
-
-    /**
-     * Return the guide posts of the family
-     * 
-     * @param fam
-     * @return
-     */
-    public byte[] getGuidePosts(String fam);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 42b20fe..daef1c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -552,4 +552,8 @@ public class ScanUtil {
 
         return offset + slotPosition;
     }
+
+    public static boolean isAnalyzeTable(Scan scan) {
+        return scan.getAttribute((BaseScannerRegionObserver.ANALYZE_TABLE)) != null;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index a65ca77..6e29c69 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -51,7 +51,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
     private static final String DEFAULT_WAL_EDIT_CODEC = IndexedWALEditCodec.class.getName();
     public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE =  1024L*1024L*4L; // 4 Mb
     public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE =  1024L*1024L*2L; // 2 Mb
-    public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 20;
+    public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 2000;
 
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b046100/phoenix-protocol/src/main/StatisticsCollect.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/StatisticsCollect.proto b/phoenix-protocol/src/main/StatisticsCollect.proto
deleted file mode 100644
index c80a756..0000000
--- a/phoenix-protocol/src/main/StatisticsCollect.proto
+++ /dev/null
@@ -1,20 +0,0 @@
-option java_package = "org.apache.phoenix.coprocessor.generated";
-option java_outer_classname = "StatCollectorProtos";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-option optimize_for = SPEED;
-
-
-message StatCollectRequest {
-  optional bytes startRow = 1;
-  optional bytes stopRow = 2;
-}
-
-message StatCollectResponse {
-  required uint64 rowsScanned = 1;
-}
-
-service StatCollectService {
-  rpc collectStat(StatCollectRequest)
-    returns (StatCollectResponse);
-}
\ No newline at end of file