You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/09/20 01:43:29 UTC

[2/5] PHOENIX-180 Use stats to guide query parallelization (Ramkrishna S Vasudevan)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 7e1e0a5..987d200 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -79,6 +79,8 @@ import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheForTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheForTableResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
@@ -90,6 +92,9 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -123,6 +128,7 @@ import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.stat.StatisticsCollector;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
@@ -141,6 +147,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.HBaseZeroCopyByteString;
+import com.google.protobuf.ServiceException;
 
 
 public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
@@ -153,7 +160,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private final ReadOnlyProps props;
     private final String userName;
     private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
-    private final StatsManager statsManager;
     
     // Cache the latest meta data here for future connections
     // writes guarded by "latestMetaDataLock"
@@ -211,10 +217,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         // TODO: should we track connection wide memory usage or just org-wide usage?
         // If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate
         this.childServices = new ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>(INITIAL_CHILD_SERVICES_CAPACITY);
-        int statsUpdateFrequencyMs = this.getProps().getInt(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS);
-        int maxStatsAgeMs = this.getProps().getInt(QueryServices.MAX_STATS_AGE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_STATS_AGE_MS);
-        this.statsManager = new StatsManagerImpl(this, statsUpdateFrequencyMs, maxStatsAgeMs);
-        
         // find the HBase version and use that to determine the KeyValueBuilder that should be used
         String hbaseVersion = VersionInfo.getVersion();
         this.kvBuilder = KeyValueBuilder.get(hbaseVersion);
@@ -242,11 +244,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public StatsManager getStatsManager() {
-        return this.statsManager;
-    }
-    
-    @Override
     public HTableInterface getTable(byte[] tableName) throws SQLException {
         try {
             return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor());
@@ -307,42 +304,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 sqlE = e;
             } finally {
                 try {
-                    // Clear any client-side caches.  
-                    statsManager.clearStats();
-                } catch (SQLException e) {
+                    childServices.clear();
+                    synchronized (latestMetaDataLock) {
+                        latestMetaData = null;
+                        latestMetaDataLock.notifyAll();
+                    }
+                    if (connection != null) connection.close();
+                } catch (IOException e) {
                     if (sqlE == null) {
-                        sqlE = e;
+                        sqlE = ServerUtil.parseServerException(e);
                     } else {
-                        sqlE.setNextException(e);
+                        sqlE.setNextException(ServerUtil.parseServerException(e));
                     }
                 } finally {
                     try {
-                        childServices.clear();
-                        synchronized (latestMetaDataLock) {
-                            latestMetaData = null;
-                            latestMetaDataLock.notifyAll();
-                        }
-                        if (connection != null) connection.close();
-                    } catch (IOException e) {
+                        super.close();
+                    } catch (SQLException e) {
                         if (sqlE == null) {
-                            sqlE = ServerUtil.parseServerException(e);
+                            sqlE = e;
                         } else {
-                            sqlE.setNextException(ServerUtil.parseServerException(e));
+                            sqlE.setNextException(e);
                         }
                     } finally {
-                        try {
-                            super.close();
-                        } catch (SQLException e) {
-                            if (sqlE == null) {
-                                sqlE = e;
-                            } else {
-                                sqlE.setNextException(e);
-                            }
-                        } finally {
-                            if (sqlE != null) {
-                                throw sqlE;
-                            }
-                        }
+                        if (sqlE != null) { throw sqlE; }
                     }
                 }
             }
@@ -615,7 +599,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null);
             }
-            
+
+            if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) {
+                descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null);
+            }
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also,
             // don't install on the metadata table until we fix the TODO there.
@@ -1531,19 +1518,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             }
                             try {
                                 metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
+                                
+                                // TODO : Get this from a configuration
+                                metaConnection.createStatement().executeUpdate(
+                                        QueryConstants.CREATE_STATS_TABLE_METADATA);
                             } catch (NewerTableAlreadyExistsException ignore) {
-                                // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
-                                // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
                             } catch (TableAlreadyExistsException ignore) {
-                                // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include
-                                // any new columns we've added.
-                                String newColumns =
-                                        MIN_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " 
-                                                + MAX_VALUE + " " + PDataType.LONG.getSqlTypeName() + ", " 
-                                                + CYCLE_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName() + ", " 
-                                                + LIMIT_REACHED_FLAG + " " + PDataType.BOOLEAN.getSqlTypeName();
-                                metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, 
-                                    MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, newColumns); 
                             }
                         } catch (Exception e) {
                             if (e instanceof SQLException) {
@@ -1873,6 +1853,92 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
 
+    @Override
+    public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException {
+        HTableInterface ht = null;
+        try {
+            ht = this.getTable(tableName);
+            Batch.Call<StatCollectService, StatCollectResponse> callable = new Batch.Call<StatCollectService, StatCollectResponse>() {
+                ServerRpcController controller = new ServerRpcController();
+                BlockingRpcCallback<StatCollectResponse> rpcCallback = new BlockingRpcCallback<StatCollectResponse>();
+
+                @Override
+                public StatCollectResponse call(StatCollectService service) throws IOException {
+                    StatCollectRequest.Builder builder = StatCollectRequest.newBuilder();
+                    builder.setStartRow(HBaseZeroCopyByteString.wrap(keyRange.getLowerRange()));
+                    builder.setStopRow(HBaseZeroCopyByteString.wrap(keyRange.getUpperRange()));
+                    service.collectStat(controller, builder.build(), rpcCallback);
+                    if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                    return rpcCallback.get();
+                }
+            };
+            Map<byte[], StatCollectResponse> result = ht.coprocessorService(StatCollectService.class,
+                    keyRange.getLowerRange(), keyRange.getUpperRange(), callable);
+            StatCollectResponse next = result.values().iterator().next();
+            return next.getRowsScanned();
+        } catch (ServiceException e) {
+            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+        } catch (TableNotFoundException e) {
+            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+        } catch (Throwable e) {
+            throw new SQLException("Unable to update the statistics for the table " + tableName, e);
+        } finally {
+            if (ht != null) {
+                try {
+                    ht.close();
+                } catch (IOException e) {
+                    throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName,
+            final long clientTS) throws SQLException {
+        // clear the meta data cache for the table here
+        try {
+            SQLException sqlE = null;
+            HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            try {
+                htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+                        new Batch.Call<MetaDataService, ClearCacheForTableResponse>() {
+                            @Override
+                            public ClearCacheForTableResponse call(MetaDataService instance) throws IOException {
+                                ServerRpcController controller = new ServerRpcController();
+                                BlockingRpcCallback<ClearCacheForTableResponse> rpcCallback = new BlockingRpcCallback<ClearCacheForTableResponse>();
+                                ClearCacheForTableRequest.Builder builder = ClearCacheForTableRequest.newBuilder();
+                                builder.setTenantId(HBaseZeroCopyByteString.wrap(tenantId));
+                                builder.setTableName(HBaseZeroCopyByteString.wrap(tableName));
+                                builder.setSchemaName(HBaseZeroCopyByteString.wrap(schemaName));
+                                builder.setClientTimestamp(clientTS);
+                                instance.clearCacheForTable(controller, builder.build(), rpcCallback);
+                                if (controller.getFailedOn() != null) { throw controller.getFailedOn(); }
+                                return rpcCallback.get();
+                            }
+                        });
+            } catch (IOException e) {
+                throw ServerUtil.parseServerException(e);
+            } catch (Throwable e) {
+                sqlE = new SQLException(e);
+            } finally {
+                try {
+                    htable.close();
+                } catch (IOException e) {
+                    if (sqlE == null) {
+                        sqlE = ServerUtil.parseServerException(e);
+                    } else {
+                        sqlE.setNextException(ServerUtil.parseServerException(e));
+                    }
+                } finally {
+                    if (sqlE != null) { throw sqlE; }
+                }
+            }
+        } catch (Exception e) {
+            throw new SQLException(ServerUtil.parseServerException(e));
+        }
+    }
+
     @SuppressWarnings("deprecation")
     @Override
     public void returnSequences(List<SequenceKey> keys, long timestamp, SQLException[] exceptions) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 61c2ef8..9fa415c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -68,7 +68,6 @@ import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -121,30 +120,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }
 
     @Override
-    public StatsManager getStatsManager() {
-        return new StatsManager() {
-
-            @Override
-            public byte[] getMinKey(TableRef table) {
-                return HConstants.EMPTY_START_ROW;
-            }
-
-            @Override
-            public byte[] getMaxKey(TableRef table) {
-                return HConstants.EMPTY_END_ROW;
-            }
-
-            @Override
-            public void updateStats(TableRef table) throws SQLException {
-            }
-
-            @Override
-            public void clearStats() throws SQLException {
-            }
-        };
-    }
-
-    @Override
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
         return Collections.singletonList(new HRegionLocation(
             new HRegionInfo(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),
@@ -214,6 +189,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null);
     }
 
+    @Override
+    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
+        // Noop
+        return 0;
+    }
+    
+    @Override
+    public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+            throws SQLException {}
     // TODO: share this with ConnectionQueryServicesImpl
     @Override
     public void init(String url, Properties props) throws SQLException {
@@ -249,6 +233,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
                     // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp.
                     // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp.
                 }
+                try {
+                    // TODO : Get this from a configuration
+                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_STATS_TABLE_METADATA);
+                } catch (NewerTableAlreadyExistsException ignore) {
+                    // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed
+                    // timestamp.
+                    // A TableAlreadyExistsException is not thrown, since the table only exists *after* this
+                    // fixed timestamp.
+                }
             } catch (SQLException e) {
                 sqlE = e;
             } finally {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 0b6a399..fa01f09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -65,11 +65,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public StatsManager getStatsManager() {
-        return getDelegate().getStatsManager();
-    }
-
-    @Override
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException {
         return getDelegate().getAllTableRegions(tableName);
     }
@@ -231,4 +226,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     public String getUserName() {
         return getDelegate().getUserName();
     }
+
+    @Override
+    public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException {
+        return getDelegate().updateStatistics(keyRange, tableName);
+    }
+    
+    @Override
+    public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS)
+            throws SQLException {
+        getDelegate().clearCacheForTable(tenantId, schemaName, tableName, clientTS);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index da2d487..bbc653e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -34,24 +34,31 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_KEY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_KEY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG;
@@ -67,6 +74,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -78,8 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
+
 import java.math.BigDecimal;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -111,6 +118,7 @@ public interface QueryConstants {
     
     public enum JoinType {INNER, LEFT_OUTER}
     public final static String SYSTEM_SCHEMA_NAME = "SYSTEM";
+    public final static byte[] SYSTEM_SCHEMA_NAME_BYTES = Bytes.toBytes(SYSTEM_SCHEMA_NAME);
     public final static String PHOENIX_METADATA = "table";
 
     public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
@@ -220,6 +228,22 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
             HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
     
+    public static final String CREATE_STATS_TABLE_METADATA = 
+            "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" +
+            // PK columns
+            PHYSICAL_NAME  + " VARCHAR NOT NULL," +
+            COLUMN_FAMILY + " VARCHAR," +
+            REGION_NAME + " VARCHAR," +
+            GUIDE_POSTS  + " VARBINARY[]," +
+            MIN_KEY + " VARBINARY," + 
+            MAX_KEY + " VARBINARY," +
+            LAST_STATS_UPDATE_TIME+ " DATE, "+
+            "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY ("
+            + PHYSICAL_NAME + ","
+            + COLUMN_FAMILY + ","+ REGION_NAME+"))\n" +
+            HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
+            HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n";
+            
     public static final String CREATE_SEQUENCE_METADATA =
             "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" +                                    
             TENANT_ID + " VARCHAR NULL," +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 9594f33..fd4152b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -68,18 +68,14 @@ public interface QueryServices extends SQLCloseable {
     public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs";
     public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage";
     public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = "phoenix.query.maxServerCacheBytes";
-    public static final String TARGET_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.targetConcurrency";
-    public static final String MAX_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.maxConcurrency";
     public static final String DATE_FORMAT_ATTRIB = "phoenix.query.dateFormat";
     public static final String NUMBER_FORMAT_ATTRIB = "phoenix.query.numberFormat";
     public static final String STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.query.statsUpdateFrequency";
-    public static final String MAX_STATS_AGE_MS_ATTRIB = "phoenix.query.maxStatsAge";
     public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin";
     public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching";
     public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs";
-    public static final String MAX_INTRA_REGION_PARALLELIZATION_ATTRIB  = "phoenix.query.maxIntraRegionParallelization";
     public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB  = "phoenix.query.rowKeyOrderSaltedTable";
     public static final String USE_INDEXES_ATTRIB  = "phoenix.query.useIndexes";
     public static final String IMMUTABLE_ROWS_ATTRIB  = "phoenix.mutate.immutableRows";
@@ -134,7 +130,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String TRACING_STATS_TABLE_NAME_ATTRIB = "phoenix.trace.statsTableName";
 
     public static final String USE_REVERSE_SCAN_ATTRIB = "phoenix.query.useReverseScan";
-    
+
+    public static final String HISTOGRAM_BYTE_DEPTH_CONF_KEY = "phoenix.guidepost.width";
+
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 605d44c..a0bc4da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,16 +24,15 @@ import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY;
 import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MUTATION_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB;
@@ -51,7 +50,6 @@ import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
@@ -75,7 +73,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 public class QueryServicesOptions {
 	public static final int DEFAULT_KEEP_ALIVE_MS = 60000;
 	public static final int DEFAULT_THREAD_POOL_SIZE = 128;
-	public static final int DEFAULT_QUEUE_SIZE = 500;
+	public static final int DEFAULT_QUEUE_SIZE = 5000;
 	public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
 	public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
     public static final String DEFAULT_SPOOL_DIRECTORY = "/tmp";
@@ -146,6 +144,8 @@ public class QueryServicesOptions {
     public static final String DEFAULT_TRACING_STATS_TABLE_NAME = "SYSTEM.TRACING_STATS";
     public static final String DEFAULT_TRACING_FREQ = Tracing.Frequency.NEVER.getKey();
     public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05;
+    public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 1024 * 1024;
+    
     
     public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
 
@@ -186,13 +186,10 @@ public class QueryServicesOptions {
             .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
             .setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, DEFAULT_MAX_SERVER_CACHE_SIZE)
             .setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE)
-            .setIfUnset(TARGET_QUERY_CONCURRENCY_ATTRIB, DEFAULT_TARGET_QUERY_CONCURRENCY)
-            .setIfUnset(MAX_QUERY_CONCURRENCY_ATTRIB, DEFAULT_MAX_QUERY_CONCURRENCY)
             .setIfUnset(DATE_FORMAT_ATTRIB, DEFAULT_DATE_FORMAT)
             .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS)
             .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN)
             .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE)
-            .setIfUnset(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION)
             .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_ROW_KEY_ORDER_SALTED_TABLE)
             .setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES)
             .setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
@@ -204,6 +201,7 @@ public class QueryServicesOptions {
             .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES)
             .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE)
             .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE)
+            .setIfUnset(HISTOGRAM_BYTE_DEPTH_CONF_KEY, DEFAULT_HISTOGRAM_BYTE_DEPTH);
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
@@ -294,14 +292,6 @@ public class QueryServicesOptions {
         return set(SCAN_CACHE_SIZE_ATTRIB, scanFetchSize);
     }
     
-    public QueryServicesOptions setMaxQueryConcurrency(int maxQueryConcurrency) {
-        return set(MAX_QUERY_CONCURRENCY_ATTRIB, maxQueryConcurrency);
-    }
-
-    public QueryServicesOptions setTargetQueryConcurrency(int targetQueryConcurrency) {
-        return set(TARGET_QUERY_CONCURRENCY_ATTRIB, targetQueryConcurrency);
-    }
-    
     public QueryServicesOptions setDateFormat(String dateFormat) {
         return set(DATE_FORMAT_ATTRIB, dateFormat);
     }
@@ -310,6 +300,10 @@ public class QueryServicesOptions {
         return set(STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs);
     }
     
+    public QueryServicesOptions setHistogramDepthBytes(int depth) {
+        return set(HISTOGRAM_BYTE_DEPTH_CONF_KEY, depth);
+    }
+    
     public QueryServicesOptions setCallQueueRoundRobin(boolean isRoundRobin) {
         return set(CALL_QUEUE_PRODUCER_ATTRIB_NAME, isRoundRobin);
     }
@@ -322,10 +316,6 @@ public class QueryServicesOptions {
         return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize);
     }
     
-    public QueryServicesOptions setMaxIntraRegionParallelization(int maxIntraRegionParallelization) {
-        return set(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, maxIntraRegionParallelization);
-    }
-    
     public QueryServicesOptions setRowKeyOrderSaltedTable(boolean rowKeyOrderSaltedTable) {
         return set(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, rowKeyOrderSaltedTable);
     }
@@ -394,11 +384,7 @@ public class QueryServicesOptions {
     public int getMutateBatchSize() {
         return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE);
     }
-    
-    public int getMaxIntraRegionParallelization() {
-        return config.getInt(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION);
-    }
-    
+
     public boolean isUseIndexes() {
         return config.getBoolean(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 56b6604..eafac8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -37,15 +37,19 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -60,8 +64,10 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADAT
 import static org.apache.phoenix.schema.PDataType.VARCHAR;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
@@ -123,6 +129,8 @@ import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -133,6 +141,7 @@ import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -462,6 +471,55 @@ public class MetaDataClient {
         return connection.getQueryServices().updateData(plan);
     }
 
+    public MutationState updateStatistics(UpdateStatisticsStatement updateStatisticsStmt) throws SQLException {
+        // Check before updating the stats if we have reached the configured time to reupdate the stats once again
+        long minTimeForStatsUpdate = connection.getQueryServices().getProps()
+                .getLong(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS);
+        ColumnResolver resolver = FromCompiler.getResolver(updateStatisticsStmt, connection);
+        PTable table = resolver.getTables().get(0).getTable();
+        PName physicalName = table.getPhysicalName();
+        byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
+        KeyRange analyzeRange = KeyRange.EVERYTHING_RANGE;
+        if (connection.getTenantId() != null && table.isMultiTenant()) {
+            tenantIdBytes = connection.getTenantId().getBytes();
+            //  TODO remove this inner if once PHOENIX-1259 is fixed.
+            if (table.getBucketNum() == null && table.getIndexType() != IndexType.LOCAL) {
+                List<List<KeyRange>> tenantIdKeyRanges = Collections.singletonList(Collections.singletonList(KeyRange
+                        .getKeyRange(tenantIdBytes)));
+                byte[] lowerRange = ScanUtil.getMinKey(table.getRowKeySchema(), tenantIdKeyRanges,
+                        ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+                byte[] upperRange = ScanUtil.getMaxKey(table.getRowKeySchema(), tenantIdKeyRanges,
+                        ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+                analyzeRange = KeyRange.getKeyRange(lowerRange, upperRange);
+            }
+        }
+        Long scn = connection.getSCN();
+        // Always invalidate the cache
+        long clientTS = connection.getSCN() == null ? HConstants.LATEST_TIMESTAMP : scn;
+        connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
+                table.getTableName().getBytes(), clientTS);
+        // Clear the cache also. So that for cases like major compaction also we would be able to use the stats
+        updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
+        String query = "SELECT CURRENT_DATE(),"+ LAST_STATS_UPDATE_TIME + " FROM " + SYSTEM_CATALOG_SCHEMA
+                + "." + SYSTEM_STATS_TABLE + " WHERE " + PHYSICAL_NAME + "='" + physicalName.getString() + "' AND " + COLUMN_FAMILY
+                + " IS NULL AND " + REGION_NAME + " IS NULL";
+        ResultSet rs = connection.createStatement().executeQuery(query);
+        long lastUpdatedTime = 0;
+        if (rs.next() && rs.getDate(2) != null) {
+            lastUpdatedTime = rs.getDate(1).getTime() - rs.getDate(2).getTime();
+        }
+        if (minTimeForStatsUpdate  > lastUpdatedTime) {
+            // We need to update the stats table
+            connection.getQueryServices().updateStatistics(analyzeRange, physicalName.getBytes());
+            connection.getQueryServices().clearCacheForTable(tenantIdBytes, table.getSchemaName().getBytes(),
+                    table.getTableName().getBytes(), clientTS);
+            updateCache(table.getSchemaName().getString(), table.getTableName().getString(), true);
+            return new MutationState(1, connection);
+        } else {
+            return new MutationState(0, connection);
+        }
+    }
+
     private MutationState buildIndexAtTimeStamp(PTable index, NamedTableNode dataTableNode) throws SQLException {
         // If our connection is at a fixed point-in-time, we need to open a new
         // connection so that our new index table is visible.
@@ -1440,7 +1498,8 @@ public class MetaDataClient {
         return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false);
     }
 
-    private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType, boolean ifExists, boolean cascade) throws SQLException {
+    private MutationState dropTable(String schemaName, String tableName, String parentTableName, PTableType tableType,
+            boolean ifExists, boolean cascade) throws SQLException {
         connection.rollback();
         boolean wasAutoCommit = connection.getAutoCommit();
         try {
@@ -1465,80 +1524,118 @@ public class MetaDataClient {
 
             MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade);
             MutationCode code = result.getMutationCode();
-            switch(code) {
-                case TABLE_NOT_FOUND:
-                    if (!ifExists) {
-                        throw new TableNotFoundException(schemaName, tableName);
-                    }
-                    break;
-                case NEWER_TABLE_FOUND:
-                    throw new NewerTableAlreadyExistsException(schemaName, tableName);
-                case UNALLOWED_TABLE_MUTATION:
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MUTATE_TABLE)
-                        .setSchemaName(schemaName).setTableName(tableName).build().buildException();
-                default:
-                    connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, result.getMutationTime());
-                                        
-                    if (result.getTable() != null && tableType != PTableType.VIEW) {
-                        connection.setAutoCommit(true);
-                        PTable table = result.getTable();
-                        boolean dropMetaData = result.getTable().getViewIndexId() == null && 
-                                connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
-                        long ts = (scn == null ? result.getMutationTime() : scn);
-                        // Create empty table and schema - they're only used to get the name from
-                        // PName name, PTableType type, long timeStamp, long sequenceNumber, List<PColumn> columns
-                        List<TableRef> tableRefs = Lists.newArrayListWithExpectedSize(2 + table.getIndexes().size());
-                        // All multi-tenant tables have a view index table, so no need to check in that case
-                        if (tableType == PTableType.TABLE && (table.isMultiTenant() || hasViewIndexTable || hasLocalIndexTable)) {
-                            MetaDataUtil.deleteViewIndexSequences(connection, table.getPhysicalName());
-                            // TODO: consider removing this, as the DROP INDEX done for each DROP VIEW command
-                            // would have deleted all the rows already
-                            if (!dropMetaData) {
-                                if (hasViewIndexTable) {
-                                    String viewIndexSchemaName = null;
-                                    String viewIndexTableName = null;
-                                    if(schemaName != null) {
-                                        viewIndexSchemaName = MetaDataUtil.getViewIndexTableName(schemaName);
-                                        viewIndexTableName = tableName;
-                                    } else {
-                                        viewIndexTableName = MetaDataUtil.getViewIndexTableName(tableName);
-                                    }
-                                    PTable viewIndexTable = new PTableImpl(null, viewIndexSchemaName, viewIndexTableName, ts, table.getColumnFamilies());
-                                    tableRefs.add(new TableRef(null, viewIndexTable, ts, false));
-                                } 
-                                if (hasLocalIndexTable) {
-                                    String localIndexSchemaName = null;
-                                    String localIndexTableName = null;
-                                    if(schemaName != null) {
-                                        localIndexSchemaName = MetaDataUtil.getLocalIndexTableName(schemaName);
-                                        localIndexTableName = tableName;
-                                    } else {
-                                        localIndexTableName = MetaDataUtil.getLocalIndexTableName(tableName);
-                                    }
-                                    PTable localIndexTable = new PTableImpl(null, localIndexSchemaName, localIndexTableName, ts, Collections.<PColumnFamily>emptyList());
-                                    tableRefs.add(new TableRef(null, localIndexTable, ts, false));
-                                }
+            switch (code) {
+            case TABLE_NOT_FOUND:
+                if (!ifExists) { throw new TableNotFoundException(schemaName, tableName); }
+                break;
+            case NEWER_TABLE_FOUND:
+                throw new NewerTableAlreadyExistsException(schemaName, tableName);
+            case UNALLOWED_TABLE_MUTATION:
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MUTATE_TABLE)
+
+                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+            default:
+                connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName,
+                        result.getMutationTime());
+
+                if (result.getTable() != null && tableType != PTableType.VIEW) {
+                    connection.setAutoCommit(true);
+                    PTable table = result.getTable();
+                    boolean dropMetaData = result.getTable().getViewIndexId() == null &&
+
+                    connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA);
+                    long ts = (scn == null ? result.getMutationTime() : scn);
+                    // Create empty table and schema - they're only used to get the name from
+                    // PName name, PTableType type, long timeStamp, long sequenceNumber, List<PColumn> columns
+                    List<TableRef> tableRefs = Lists.newArrayListWithExpectedSize(2 + table.getIndexes().size());
+                    // All multi-tenant tables have a view index table, so no need to check in that case
+                    if (tableType == PTableType.TABLE
+                            && (table.isMultiTenant() || hasViewIndexTable || hasLocalIndexTable)) {
+
+                        MetaDataUtil.deleteViewIndexSequences(connection, table.getPhysicalName());
+                        if (hasViewIndexTable) {
+                            String viewIndexSchemaName = null;
+                            String viewIndexTableName = null;
+                            if (schemaName != null) {
+                                viewIndexSchemaName = MetaDataUtil.getViewIndexTableName(schemaName);
+                                viewIndexTableName = tableName;
+                            } else {
+                                viewIndexTableName = MetaDataUtil.getViewIndexTableName(tableName);
                             }
+                            PTable viewIndexTable = new PTableImpl(null, viewIndexSchemaName, viewIndexTableName, ts,
+                                    table.getColumnFamilies());
+                            tableRefs.add(new TableRef(null, viewIndexTable, ts, false));
                         }
-                        if (!dropMetaData) {
-                            // Delete everything in the column. You'll still be able to do queries at earlier timestamps
-                            tableRefs.add(new TableRef(null, table, ts, false));
-                            // TODO: Let the standard mutable secondary index maintenance handle this?
-                            for (PTable index: table.getIndexes()) {
-                                tableRefs.add(new TableRef(null, index, ts, false));
+                        if (hasLocalIndexTable) {
+                            String localIndexSchemaName = null;
+                            String localIndexTableName = null;
+                            if (schemaName != null) {
+                                localIndexSchemaName = MetaDataUtil.getLocalIndexTableName(schemaName);
+                                localIndexTableName = tableName;
+                            } else {
+                                localIndexTableName = MetaDataUtil.getLocalIndexTableName(tableName);
                             }
-                            MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null, Collections.<PColumn>emptyList(), ts);
-                            return connection.getQueryServices().updateData(plan);
+                            PTable localIndexTable = new PTableImpl(null, localIndexSchemaName, localIndexTableName,
+                                    ts, Collections.<PColumnFamily> emptyList());
+                            tableRefs.add(new TableRef(null, localIndexTable, ts, false));
                         }
                     }
-                    break;
+                    tableRefs.add(new TableRef(null, table, ts, false));
+                    // TODO: Let the standard mutable secondary index maintenance handle this?
+                    for (PTable index : table.getIndexes()) {
+                        tableRefs.add(new TableRef(null, index, ts, false));
+                    }
+                    deleteFromStatsTable(tableRefs, ts);
+                    if (!dropMetaData) {
+                        MutationPlan plan = new PostDDLCompiler(connection).compile(tableRefs, null, null,
+                                Collections.<PColumn> emptyList(), ts);
+                        // Delete everything in the column. You'll still be able to do queries at earlier timestamps
+                        return connection.getQueryServices().updateData(plan);
+                    }
                 }
-                 return new MutationState(0,connection);
+                break;
+            }
+            return new MutationState(0, connection);
         } finally {
             connection.setAutoCommit(wasAutoCommit);
         }
     }
 
+    private void deleteFromStatsTable(List<TableRef> tableRefs, long ts) throws SQLException {
+        Properties props = new Properties(connection.getClientInfo());
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        Connection conn = DriverManager.getConnection(connection.getURL(), props);
+        conn.setAutoCommit(true);
+        boolean success = false;
+        SQLException sqlException = null;
+        try {
+            StringBuilder buf = new StringBuilder("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME IN (");
+            for (TableRef ref : tableRefs) {
+                buf.append("'" + ref.getTable().getName().getString() + "',");
+            }
+            buf.setCharAt(buf.length() - 1, ')');
+            conn.createStatement().execute(buf.toString());
+            success = true;
+        } catch (SQLException e) {
+            sqlException = e;
+        } finally {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                if (sqlException == null) {
+                    // If we're not in the middle of throwing another exception
+                    // then throw the exception we got on close.
+                    if (success) {
+                        sqlException = e;
+                    }
+                } else {
+                    sqlException.setNextException(e);
+                }
+            }
+            if (sqlException != null) { throw sqlException; }
+        }
+    }
+
     private MutationCode processMutationResult(String schemaName, String tableName, MetaDataMutationResult result) throws SQLException {
         final MutationCode mutationCode = result.getMutationCode();
         PName tenantId = connection.getTenantId();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
index 24da14d..01c236f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.schema;
 
 import java.util.Collection;
+import java.util.List;
 
 /**
  * 
@@ -51,4 +52,6 @@ public interface PColumnFamily {
     PColumn getColumn(String name) throws ColumnNotFoundException;
     
     int getEstimatedSize();
+    
+    List<byte[]> getGuidePosts();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
index 5ccd50b..15ac8fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.schema;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -34,6 +35,7 @@ public class PColumnFamilyImpl implements PColumnFamily {
     private final Map<String, PColumn> columnByString;
     private final Map<byte[], PColumn> columnByBytes;
     private final int estimatedSize;
+    private List<byte[]> guidePosts = Collections.emptyList();
 
     @Override
     public int getEstimatedSize() {
@@ -41,9 +43,23 @@ public class PColumnFamilyImpl implements PColumnFamily {
     }
     
     public PColumnFamilyImpl(PName name, List<PColumn> columns) {
+       this(name, columns, null);
+    }
+    
+    public PColumnFamilyImpl(PName name, List<PColumn> columns, List<byte[]> guidePosts) {
         Preconditions.checkNotNull(name);
-        long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 4 + SizedUtil.INT_SIZE + name.getEstimatedSize() +
-                SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size());
+        // Include guidePosts also in estimating the size
+        int guidePostsSize = 0;
+        if(guidePosts != null) {
+            guidePostsSize = guidePosts.size();
+            for(byte[] gps : guidePosts) {
+                guidePostsSize += gps.length;
+            }
+            Collections.sort(guidePosts, Bytes.BYTES_COMPARATOR);
+            this.guidePosts = guidePosts;
+        }
+        long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 5 + SizedUtil.INT_SIZE + name.getEstimatedSize() +
+                SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size()) + SizedUtil.sizeOfArrayList(guidePostsSize);
         this.name = name;
         this.columns = ImmutableList.copyOf(columns);
         ImmutableMap.Builder<String, PColumn> columnByStringBuilder = ImmutableMap.builder();
@@ -85,4 +101,9 @@ public class PColumnFamilyImpl implements PColumnFamily {
         }
         return column;
     }
+
+    @Override
+    public List<byte[]> getGuidePosts() {
+        return guidePosts;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index cbf0dad..374b10c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.schema.stat.PTableStats;
 
 
 /**
@@ -253,10 +252,11 @@ public interface PTable {
     int newKey(ImmutableBytesWritable key, byte[][] values);
 
     /**
-     * Return the statistics table associated with this PTable.
+     * Return the statistics table associated with this PTable. A list of 
+     * guide posts are return 
      * @return the statistics table.
      */
-    PTableStats getTableStats();
+    List<byte[]> getGuidePosts();
 
     RowKeySchema getRowKeySchema();
 
@@ -316,4 +316,5 @@ public interface PTable {
     
     int getEstimatedSize();
     IndexType getIndexType();
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index abe637d..41faaf2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -26,11 +26,10 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -96,8 +95,6 @@ public class PTableImpl implements PTable {
     private ListMultimap<String,PColumn> columnsByName;
     private PName pkName;
     private Integer bucketNum;
-    // Statistics associated with this table.
-    private PTableStats stats;
     private RowKeySchema rowKeySchema;
     // Indexes associated with this table.
     private List<PTable> indexes;
@@ -116,6 +113,7 @@ public class PTableImpl implements PTable {
     private Short viewIndexId;
     private int estimatedSize;
     private IndexType indexType;
+    private List<byte[]> guidePosts = Collections.emptyList();
     
     public PTableImpl() {
         this.indexes = Collections.emptyList();
@@ -213,6 +211,17 @@ public class PTableImpl implements PTable {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName,
                 indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType);
     }
+    
+    public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
+            PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
+            List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
+            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
+            boolean multiTenant, ViewType viewType, Short viewIndexId, IndexType indexType, PTableStats stats)
+            throws SQLException {
+        return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
+                bucketNum, columns, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
+                viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType, stats);
+    }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
             PName pkName, Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows,
@@ -220,6 +229,16 @@ public class PTableImpl implements PTable {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 new PTableStatsImpl(), dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType, viewIndexId, indexType);
     }
+    
+    private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
+            long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns,
+            PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
+            PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType,
+            Short viewIndexId, IndexType indexType, PTableStats stats) throws SQLException {
+        init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
+                stats, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression,
+                disableWAL, multiTenant, viewType, viewIndexId, indexType);
+    }
 
     @Override
     public boolean isMultiTenant() {
@@ -240,7 +259,7 @@ public class PTableImpl implements PTable {
     private void init(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
             PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
-            ViewType viewType, Short viewIndexId, IndexType indexType) throws SQLException {
+            ViewType viewType, Short viewIndexId, IndexType indexType ) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -330,16 +349,40 @@ public class PTableImpl implements PTable {
                 columnsInFamily.add(column);
             }
         }
-        
         this.rowKeySchema = builder.build();
         estimatedSize += rowKeySchema.getEstimatedSize();
         Iterator<Map.Entry<PName,List<PColumn>>> iterator = familyMap.entrySet().iterator();
         PColumnFamily[] families = new PColumnFamily[familyMap.size()];
+        if (families.length == 0) {
+            if(stats != null) {
+                byte[] defaultFamilyNameBytes = null;
+                if(defaultFamilyName == null) {
+                    defaultFamilyNameBytes = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+                } else {
+                    defaultFamilyNameBytes = defaultFamilyName.getBytes();
+                }
+                if (stats.getGuidePosts().get(defaultFamilyNameBytes) != null) {
+                    guidePosts = stats.getGuidePosts().get(defaultFamilyNameBytes);
+                    if (guidePosts != null) {
+                        Collections.sort(guidePosts, Bytes.BYTES_COMPARATOR);
+                        estimatedSize += SizedUtil.sizeOfArrayList(guidePosts.size());
+                        for (byte[] gps : guidePosts) {
+                            estimatedSize += gps.length;
+                        }
+                    }
+                }
+            }
+        }
         ImmutableMap.Builder<String, PColumnFamily> familyByString = ImmutableMap.builder();
-        ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = ImmutableSortedMap
+                .orderedBy(Bytes.BYTES_COMPARATOR);
+        List<byte[]> famGuidePosts = null;
         for (int i = 0; i < families.length; i++) {
             Map.Entry<PName,List<PColumn>> entry = iterator.next();
-            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue());
+            if (stats != null) {
+                famGuidePosts = stats.getGuidePosts().get(entry.getKey().getBytes());
+            }
+            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue(), famGuidePosts);
             families[i] = family;
             familyByString.put(family.getName().getString(), family);
             familyByBytes.put(family.getName().getBytes(), family);
@@ -350,8 +393,6 @@ public class PTableImpl implements PTable {
         this.familyByString = familyByString.build();
         estimatedSize += SizedUtil.sizeOfArrayList(families.length);
         estimatedSize += SizedUtil.sizeOfMap(families.length) * 2;
-        
-        this.stats = stats;
         this.indexes = indexes == null ? Collections.<PTable>emptyList() : indexes;
         for (PTable index : this.indexes) {
             estimatedSize += index.getEstimatedSize();
@@ -693,8 +734,8 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public PTableStats getTableStats() {
-        return stats;
+    public List<byte[]> getGuidePosts() {
+        return guidePosts;
     }
 
     @Override
@@ -851,14 +892,15 @@ public class PTableImpl implements PTable {
       for (PTableProtos.PTable curPTableProto : table.getIndexesList()) {
         indexes.add(createFromProto(curPTableProto));
       }
+      
       boolean isImmutableRows = table.getIsImmutableRows();
-      Map<String, byte[][]> guidePosts = new HashMap<String, byte[][]>();
+      TreeMap<byte[], List<byte[]>> tableGuidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
       for (PTableProtos.PTableStats pTableStatsProto : table.getGuidePostsList()) {
-        byte[][] value = new byte[pTableStatsProto.getValuesCount()][];
-        for (int j = 0; j < pTableStatsProto.getValuesCount(); j++) {
-          value[j] = pTableStatsProto.getValues(j).toByteArray();
-        }
-        guidePosts.put(pTableStatsProto.getKey(), value);
+          List<byte[]> value = Lists.newArrayListWithExpectedSize(pTableStatsProto.getValuesCount());
+            for (int j = 0; j < pTableStatsProto.getValuesCount(); j++) {
+                value.add(pTableStatsProto.getValues(j).toByteArray());
+            }
+            tableGuidePosts.put(pTableStatsProto.getKeyBytes().toByteArray(), value);
       }
       PName dataTableName = null;
       if (table.hasDataTableNameBytes()) {
@@ -886,7 +928,7 @@ public class PTableImpl implements PTable {
         }
       }
       
-      PTableStats stats = new PTableStatsImpl(guidePosts);
+      PTableStats stats = new PTableStatsImpl(tableGuidePosts);
       try {
         PTableImpl result = new PTableImpl();
         result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
@@ -937,24 +979,37 @@ public class PTableImpl implements PTable {
         PColumn column = columns.get(i);
         builder.addColumns(PColumnImpl.toProto(column));
       }
+
       List<PTable> indexes = table.getIndexes();
       for (PTable curIndex : indexes) {
         builder.addIndexes(toProto(curIndex));
       }
       builder.setIsImmutableRows(table.isImmutableRows());
 
-      // build stats
-      Map<String, byte[][]> statsMap = table.getTableStats().getGuidePosts();
-      if(statsMap != null) {
-        for (Entry<String, byte[][]> entry : statsMap.entrySet()) {
-          PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder();
-          statsBuilder.setKey(entry.getKey());
-          for (byte[] curVal : entry.getValue()) {
-            statsBuilder.addValues(HBaseZeroCopyByteString.wrap(curVal));
-          }
-          builder.addGuidePosts(statsBuilder.build());
-        }
-      }
+        // build stats for the table
+      if (table.getColumnFamilies().isEmpty() && !table.getGuidePosts().isEmpty()) {
+         List<byte[]> stats = table.getGuidePosts();
+          if (stats != null) {
+             PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder();
+             statsBuilder.setKey(Bytes.toString(SchemaUtil.getEmptyColumnFamily(table)));
+             for (byte[] stat : stats) {
+                 statsBuilder.addValues(HBaseZeroCopyByteString.wrap(stat));
+             }
+             builder.addGuidePosts(statsBuilder.build());
+         }
+      } else {
+            for (PColumnFamily fam : table.getColumnFamilies()) {
+                PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder();
+                if (fam.getGuidePosts() != null) {
+                    statsBuilder.setKey(fam.getName().getString());
+                    for (byte[] stat : fam.getGuidePosts()) {
+                        statsBuilder.addValues(HBaseZeroCopyByteString.wrap(stat));
+                    }
+                    builder.addGuidePosts(statsBuilder.build());
+                }
+            }
+       }
+
       if (table.getParentName() != null) {
         builder.setDataTableNameBytes(HBaseZeroCopyByteString.wrap(table.getParentTableName().getBytes()));
       }
@@ -980,4 +1035,5 @@ public class PTableImpl implements PTable {
     public PTableKey getKey() {
         return key;
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
index 5d867ef..dfda457 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStats.java
@@ -17,28 +17,19 @@
  */
 package org.apache.phoenix.schema.stat;
 
-import java.util.Map;
+import java.util.List;
+import java.util.TreeMap;
 
-import org.apache.hadoop.hbase.HRegionInfo;
 
-
-/**
- * Interface for Phoenix table statistics. Statistics is collected on the server
- * side and can be used for various purpose like splitting region for scanning, etc.
- * 
- * The table is defined on the client side, but it is populated on the server side. The client
- * should not populate any data to the statistics object.
+/*
+ * The table is defined on the client side, but it is populated on the server side. The client should not populate any data to the
+ * statistics object.
  */
 public interface PTableStats {
-
     /**
-     * Given the region info, returns an array of bytes that is the current estimate of key
-     * distribution inside that region. The keys should split that region into equal chunks.
-     * 
-     * @param region
-     * @return array of keys
+     * Returns a tree map of the guide posts collected against a column family
+     * @return
      */
-    byte[][] getRegionGuidePosts(HRegionInfo region);
+    TreeMap<byte[], List<byte[]>> getGuidePosts();
 
-    Map<String, byte[][]> getGuidePosts();
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
index 8690a17..3e8f1e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/PTableStatsImpl.java
@@ -16,40 +16,31 @@
  * limitations under the License.
  */
 package org.apache.phoenix.schema.stat;
+ import java.util.List;
+import java.util.TreeMap;
 
-import java.util.Map;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-
-import com.google.common.collect.ImmutableMap;
-
-
-/**
+import org.apache.hadoop.hbase.util.Bytes;
+ 
+ /**
  * Implementation for PTableStats.
  */
 public class PTableStatsImpl implements PTableStats {
 
-    // The map for guide posts should be immutable. We only take the current snapshot from outside
-    // method call and store it.
-    private Map<String, byte[][]> regionGuidePosts;
+    public static final PTableStats NO_STATS = new PTableStatsImpl();
 
-    public PTableStatsImpl() { }
+    private TreeMap<byte[], List<byte[]>> guidePosts = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
 
-    public PTableStatsImpl(Map<String, byte[][]> stats) {
-        regionGuidePosts = ImmutableMap.copyOf(stats);
+    public PTableStatsImpl() {
+        this(new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR));
     }
 
-    @Override
-    public byte[][] getRegionGuidePosts(HRegionInfo region) {
-        return regionGuidePosts.get(region.getRegionNameAsString());
+    public PTableStatsImpl(TreeMap<byte[], List<byte[]>> guidePosts) {
+        this.guidePosts = guidePosts;
     }
 
     @Override
-    public Map<String, byte[][]> getGuidePosts(){
-      if(regionGuidePosts != null) {
-        return ImmutableMap.copyOf(regionGuidePosts);
-      }
-      
-      return null;
+    public TreeMap<byte[], List<byte[]>> getGuidePosts() {
+        return guidePosts;
     }
+
 }